flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From de...@apache.org
Subject flume git commit: FLUME-3152 Add Flume Metric for Backup Checkpoint Errors
Date Wed, 23 Aug 2017 16:27:58 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 66327aa20 -> 4d79aa003


FLUME-3152 Add Flume Metric for Backup Checkpoint Errors

This change adds a new metric (channel.file.checkpoint.backup.write.error)
to the File Channel. It gets incremented if an exception happens
during backup checkpoints writes.

This closes #156

Reviewers: Denes Arvay

(Ferenc Szabo via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d79aa00
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d79aa00
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d79aa00

Branch: refs/heads/trunk
Commit: 4d79aa003aa02e8d513a1ae1406795d758143397
Parents: 66327aa
Author: Ferenc Szabo <fszabo@cloudera.com>
Authored: Mon Aug 21 14:29:38 2017 +0200
Committer: Denes Arvay <denes@apache.org>
Committed: Wed Aug 23 18:18:39 2017 +0200

----------------------------------------------------------------------
 .../flume/channel/file/CheckpointRebuilder.java |  3 +-
 .../file/EventQueueBackingStoreFactory.java     | 41 +++++----
 .../file/EventQueueBackingStoreFile.java        | 24 ++++--
 .../file/EventQueueBackingStoreFileV2.java      |  8 +-
 .../file/EventQueueBackingStoreFileV3.java      | 12 +--
 .../java/org/apache/flume/channel/file/Log.java |  4 +-
 .../instrumentation/FileChannelCounter.java     | 16 +++-
 .../FileChannelCounterMBean.java                |  7 ++
 .../flume/channel/file/TestCheckpoint.java      |  3 +-
 .../channel/file/TestCheckpointRebuilder.java   |  3 +-
 .../file/TestEventQueueBackingStoreFactory.java | 87 +++++++++++++-------
 .../file/TestFileChannelErrorMetrics.java       | 67 +++++++++++++++
 .../flume/channel/file/TestFlumeEventQueue.java | 19 +++--
 13 files changed, 220 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index a0ecdeb..8fbf3c8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,7 +241,7 @@ public class CheckpointRebuilder {
     } else {
       EventQueueBackingStore backingStore =
           EventQueueBackingStoreFactory.get(checkpointFile,
-              capacity, "channel");
+              capacity, "channel", new FileChannelCounter("Main"));
       FlumeEventQueue queue = new FlumeEventQueue(backingStore,
           new File(checkpointDir, "inflighttakes"),
           new File(checkpointDir, "inflightputs"),

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index dcd6f98..7f8b3f6 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flume.channel.file;
 
 import com.google.common.io.Files;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,19 +32,22 @@ class EventQueueBackingStoreFactory {
 
   private EventQueueBackingStoreFactory() {}
 
-  static EventQueueBackingStore get(File checkpointFile, int capacity,
-                                    String name) throws Exception {
-    return get(checkpointFile, capacity, name, true);
+  static EventQueueBackingStore get(
+      File checkpointFile, int capacity, String name, FileChannelCounter counter
+  ) throws Exception {
+    return get(checkpointFile, capacity, name, counter, true);
   }
 
-  static EventQueueBackingStore get(File checkpointFile, int capacity,
-                                    String name, boolean upgrade) throws Exception {
-    return get(checkpointFile, null, capacity, name, upgrade, false, false);
+  static EventQueueBackingStore get(
+      File checkpointFile, int capacity, String name, FileChannelCounter counter, boolean
upgrade
+  ) throws Exception {
+    return get(checkpointFile, null, capacity, name, counter, upgrade, false, false);
   }
 
-  static EventQueueBackingStore get(File checkpointFile, File backupCheckpointDir,
-                                    int capacity, String name, boolean upgrade,
-                                    boolean shouldBackup, boolean compressBackup) throws
Exception {
+  static EventQueueBackingStore get(
+      File checkpointFile, File backupCheckpointDir, int capacity, String name,
+      FileChannelCounter counter, boolean upgrade, boolean shouldBackup, boolean compressBackup
+  ) throws Exception {
     File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     RandomAccessFile checkpointFileHandle = null;
     try {
@@ -69,21 +73,21 @@ class EventQueueBackingStoreFactory {
           throw new IOException("Cannot create " + checkpointFile);
         }
         return new EventQueueBackingStoreFileV3(checkpointFile,
-            capacity, name, backupCheckpointDir, shouldBackup, compressBackup);
+            capacity, name, counter, backupCheckpointDir, shouldBackup, compressBackup);
       }
       // v3 due to meta file, version will be checked by backing store
       if (metaDataExists) {
         return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
-            name, backupCheckpointDir, shouldBackup, compressBackup);
+            name, counter, backupCheckpointDir, shouldBackup, compressBackup);
       }
       checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
       int version = (int) checkpointFileHandle.readLong();
       if (Serialization.VERSION_2 == version) {
         if (upgrade) {
           return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
-              shouldBackup, compressBackup);
+              shouldBackup, compressBackup, counter);
         }
-        return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+        return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, counter);
       }
       LOG.error("Found version " + Integer.toHexString(version) + " in " +
           checkpointFile);
@@ -100,12 +104,13 @@ class EventQueueBackingStoreFactory {
     }
   }
 
-  private static EventQueueBackingStore upgrade(File checkpointFile, int capacity, String
name,
-                                                File backupCheckpointDir, boolean shouldBackup,
-                                                boolean compressBackup) throws Exception
{
+  private static EventQueueBackingStore upgrade(
+      File checkpointFile, int capacity, String name, File backupCheckpointDir,
+      boolean shouldBackup, boolean compressBackup, FileChannelCounter counter
+  ) throws Exception {
     LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
     EventQueueBackingStoreFileV2 backingStoreV2 =
-        new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+        new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, counter);
     String backupName = checkpointFile.getName() + "-backup-"
         + System.currentTimeMillis();
     Files.copy(checkpointFile,
@@ -113,7 +118,7 @@ class EventQueueBackingStoreFactory {
     File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
         metaDataFile);
-    return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
+    return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, counter,
         backupCheckpointDir, shouldBackup, compressBackup);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 73f1d4c..445d912 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.SetMultimap;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +61,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
   protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap();
   protected final MappedByteBuffer mappedBuffer;
   protected final RandomAccessFile checkpointFileHandle;
+  private final FileChannelCounter fileChannelCounter;
   protected final File checkpointFile;
   private final Semaphore backupCompletedSema = new Semaphore(1);
   protected final boolean shouldBackup;
@@ -67,17 +69,18 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
   private final File backupDir;
   private final ExecutorService checkpointBackUpExecutor;
 
-  protected EventQueueBackingStoreFile(int capacity, String name,
-                                       File checkpointFile) throws IOException,
-      BadCheckpointException {
-    this(capacity, name, checkpointFile, null, false, false);
+  protected EventQueueBackingStoreFile(
+      int capacity, String name, FileChannelCounter fileChannelCounter, File checkpointFile
+  ) throws IOException, BadCheckpointException {
+    this(capacity, name, fileChannelCounter, checkpointFile, null, false, false);
   }
 
-  protected EventQueueBackingStoreFile(int capacity, String name,
-                                       File checkpointFile, File checkpointBackupDir,
-                                       boolean backupCheckpoint, boolean compressBackup)
-      throws IOException, BadCheckpointException {
+  protected EventQueueBackingStoreFile(
+      int capacity, String name, FileChannelCounter fileChannelCounter, File checkpointFile,
+      File checkpointBackupDir, boolean backupCheckpoint, boolean compressBackup
+  ) throws IOException, BadCheckpointException {
     super(capacity, name);
+    this.fileChannelCounter = fileChannelCounter;
     this.checkpointFile = checkpointFile;
     this.shouldBackup = backupCheckpoint;
     this.compressBackup = compressBackup;
@@ -294,6 +297,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
         try {
           backupCheckpoint(backupDir);
         } catch (Throwable throwable) {
+          fileChannelCounter.incrementCheckpointBackupWriteErrorCount();
           error = true;
           LOG.error("Backing up of checkpoint directory failed.", throwable);
         } finally {
@@ -432,7 +436,9 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore
{
     }
     int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L);
     EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile)
-        EventQueueBackingStoreFactory.get(file, capacity, "debug", false);
+        EventQueueBackingStoreFactory.get(
+            file, capacity, "debug", new FileChannelCounter("Main"), false
+        );
     System.out.println("File Reference Counts"
         + backingStore.logFileIDReferenceCounts);
     System.out.println("Queue Capacity " + backingStore.getCapacity());

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
index 71183aa..3711a78 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Preconditions;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 
 final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
 
@@ -33,9 +34,10 @@ final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile
{
   private static final int INDEX_ACTIVE_LOG = 5;
   private static final int MAX_ACTIVE_LOGS = 1024;
 
-  EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name)
-      throws IOException, BadCheckpointException {
-    super(capacity, name, checkpointFile);
+  EventQueueBackingStoreFileV2(
+      File checkpointFile, int capacity, String name, FileChannelCounter counter
+  ) throws IOException, BadCheckpointException {
+    super(capacity, name, counter, checkpointFile);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index f1a892a..da5a082 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel.file;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,16 +37,17 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile
{
   private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFileV3.class);
   private final File metaDataFile;
 
-  EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
-                               String name) throws IOException, BadCheckpointException {
-    this(checkpointFile, capacity, name, null, false, false);
+  EventQueueBackingStoreFileV3(
+      File checkpointFile, int capacity, String name, FileChannelCounter counter
+  ) throws IOException, BadCheckpointException {
+    this(checkpointFile, capacity, name, counter, null, false, false);
   }
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
-                               String name, File checkpointBackupDir,
+                               String name, FileChannelCounter counter, File checkpointBackupDir,
                                boolean backupCheckpoint, boolean compressBackup)
       throws IOException, BadCheckpointException {
-    super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint,
+    super(capacity, name, counter, checkpointFile, checkpointBackupDir, backupCheckpoint,
         compressBackup);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 1662a5b..efc8d14 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -450,7 +450,7 @@ public class Log {
         backingStore =
             EventQueueBackingStoreFactory.get(checkpointFile,
                 backupCheckpointDir, queueCapacity, channelNameDescriptor,
-                true, this.useDualCheckpoints,
+                channelCounter, true, this.useDualCheckpoints,
                 this.compressBackupCheckpoint);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
             inflightPutsFile, queueSetDir);
@@ -487,7 +487,7 @@ public class Log {
         }
         backingStore = EventQueueBackingStoreFactory.get(
             checkpointFile, backupCheckpointDir, queueCapacity,
-            channelNameDescriptor, true, useDualCheckpoints,
+            channelNameDescriptor, channelCounter, true, useDualCheckpoints,
             compressBackupCheckpoint);
         queue = new FlumeEventQueue(backingStore, inflightTakesFile,
             inflightPutsFile, queueSetDir);

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
index 40470a8..6cec3da 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
@@ -28,10 +28,15 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou
   private static final String EVENT_PUT_ERROR_COUNT = "channel.file.event.put.error";
   private static final String EVENT_TAKE_ERROR_COUNT = "channel.file.event.take.error";
   private static final String CHECKPOINT_WRITE_ERROR_COUNT = "channel.file.checkpoint.write.error";
+  private static final String CHECKPOINT_BACKUP_WRITE_ERROR_COUNT
+      = "channel.file.checkpoint.backup.write.error";
 
   public FileChannelCounter(String name) {
     super(name, new String[] {
-        EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT, CHECKPOINT_WRITE_ERROR_COUNT });
+        EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT,
+        CHECKPOINT_WRITE_ERROR_COUNT, CHECKPOINT_BACKUP_WRITE_ERROR_COUNT
+        }
+    );
   }
 
   @Override
@@ -83,4 +88,13 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou
   public void incrementCheckpointWriteErrorCount() {
     increment(CHECKPOINT_WRITE_ERROR_COUNT);
   }
+
+  @Override
+  public long getCheckpointBackupWriteErrorCount() {
+    return get(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT);
+  }
+
+  public void incrementCheckpointBackupWriteErrorCount() {
+    increment(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
index 175b1f4..9386094 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
@@ -62,4 +62,11 @@ public interface FileChannelCounterMBean extends ChannelCounterMBean {
    * @see org.apache.flume.channel.file.Log.BackgroundWorker#run()
    */
   long getCheckpointWriteErrorCount();
+
+  /**
+   * A count of the number of errors encountered while trying to write the backup checkpoints.
This
+   * includes any Throwables.
+   * @see org.apache.flume.channel.file.EventQueueBackingStoreFile#startBackupThread()
+   */
+  long getCheckpointBackupWriteErrorCount();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index cd1dcd9..1e00ee2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import junit.framework.Assert;
 
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +52,7 @@ public class TestCheckpoint {
   @Test
   public void testSerialization() throws Exception {
     EventQueueBackingStore backingStore =
-        new EventQueueBackingStoreFileV2(file, 1, "test");
+        new EventQueueBackingStoreFileV2(file, 1, "test", new FileChannelCounter("test"));
     FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
     FlumeEventQueue queueIn = new FlumeEventQueue(backingStore,
         inflightTakes, inflightPuts, queueSet);

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
index c6c6ad3..6c91661 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -70,7 +71,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
     Assert.assertTrue(inflightPutsFile.delete());
     EventQueueBackingStore backingStore =
         EventQueueBackingStoreFactory.get(checkpointFile, 50,
-            "test");
+            "test", new FileChannelCounter("test"));
     FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile,
           inflightPutsFile, queueSetDir);
     CheckpointRebuilder checkpointRebuilder =

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index 0939454..7aebb03 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -23,6 +23,7 @@ import com.google.common.io.Files;
 import com.google.protobuf.InvalidProtocolBufferException;
 import junit.framework.Assert;
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.junit.After;
 import org.junit.Before;
@@ -75,29 +76,39 @@ public class TestEventQueueBackingStoreFactory {
 
   @Test
   public void testWithNoFlag() throws Exception {
-    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test"),
-           Serialization.VERSION_3, pointersInTestCheckpoint);
+    verify(
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")),
+        Serialization.VERSION_3, pointersInTestCheckpoint
+    );
   }
 
   @Test
   public void testWithFlag() throws Exception {
-    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", true),
-           Serialization.VERSION_3, pointersInTestCheckpoint);
+    verify(
+        EventQueueBackingStoreFactory.get(
+            checkpoint, 10, "test", new FileChannelCounter("test"), true
+        ),
+        Serialization.VERSION_3, pointersInTestCheckpoint
+    );
   }
 
   @Test
   public void testNoUprade() throws Exception {
-    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
-           Serialization.VERSION_2, pointersInTestCheckpoint);
+    verify(
+        EventQueueBackingStoreFactory.get(
+            checkpoint, 10, "test", new FileChannelCounter("test"), false
+        ),
+        Serialization.VERSION_2, pointersInTestCheckpoint
+    );
   }
 
   @Test(expected = BadCheckpointException.class)
   public void testDecreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
     EventQueueBackingStore backingStore =
-        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
     backingStore.close();
-    EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
+    EventQueueBackingStoreFactory.get(checkpoint, 9, "test", new FileChannelCounter("test"));
     Assert.fail();
   }
 
@@ -105,17 +116,21 @@ public class TestEventQueueBackingStoreFactory {
   public void testIncreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
     EventQueueBackingStore backingStore =
-        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
     backingStore.close();
-    EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
+    EventQueueBackingStoreFactory.get(checkpoint, 11, "test", new FileChannelCounter("test"));
     Assert.fail();
   }
 
   @Test
   public void testNewCheckpoint() throws Exception {
     Assert.assertTrue(checkpoint.delete());
-    verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
-           Serialization.VERSION_3, Collections.<Long>emptyList());
+    verify(
+        EventQueueBackingStoreFactory.get(
+            checkpoint, 10, "test", new FileChannelCounter("test"), false
+        ),
+        Serialization.VERSION_3, Collections.<Long>emptyList()
+    );
   }
 
   @Test(expected = BadCheckpointException.class)
@@ -123,13 +138,15 @@ public class TestEventQueueBackingStoreFactory {
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     try {
       EventQueueBackingStore backingStore =
-          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
       backingStore.close();
       writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
       writer.writeLong(94L);
       writer.getFD().sync();
 
-      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(
+          checkpoint, 10, "test", new FileChannelCounter("test")
+      );
     } finally {
       writer.close();
     }
@@ -141,12 +158,14 @@ public class TestEventQueueBackingStoreFactory {
 
     try {
       EventQueueBackingStore backingStore =
-          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
       backingStore.close();
       writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * Serialization.SIZE_OF_LONG);
       writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
       writer.getFD().sync();
-      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(
+          checkpoint, 10, "test", new FileChannelCounter("test")
+      );
     } finally {
       writer.close();
     }
@@ -157,12 +176,14 @@ public class TestEventQueueBackingStoreFactory {
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     try {
       EventQueueBackingStore backingStore =
-          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
       backingStore.close();
       writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
       writer.writeLong(2L);
       writer.getFD().sync();
-      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(
+          checkpoint, 10, "test", new FileChannelCounter("test")
+      );
     } finally {
       writer.close();
     }
@@ -173,7 +194,7 @@ public class TestEventQueueBackingStoreFactory {
     FileOutputStream os = null;
     try {
       EventQueueBackingStore backingStore =
-          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
       backingStore.close();
       Assert.assertTrue(checkpoint.exists());
       Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -184,7 +205,9 @@ public class TestEventQueueBackingStoreFactory {
       os = new FileOutputStream(Serialization.getMetaDataFile(checkpoint));
       meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
       os.flush();
-      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(
+          checkpoint, 10, "test", new FileChannelCounter("test")
+      );
     } finally {
       os.close();
     }
@@ -195,12 +218,14 @@ public class TestEventQueueBackingStoreFactory {
     RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
     try {
       EventQueueBackingStore backingStore =
-          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
       backingStore.close();
       writer.seek(EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID * Serialization.SIZE_OF_LONG);
       writer.writeLong(2L);
       writer.getFD().sync();
-      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(
+          checkpoint, 10, "test", new FileChannelCounter("test")
+      );
     } finally {
       writer.close();
     }
@@ -211,7 +236,7 @@ public class TestEventQueueBackingStoreFactory {
     FileOutputStream os = null;
     try {
       EventQueueBackingStore backingStore =
-          EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+          EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
       backingStore.close();
       Assert.assertTrue(checkpoint.exists());
       Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -223,7 +248,9 @@ public class TestEventQueueBackingStoreFactory {
           Serialization.getMetaDataFile(checkpoint));
       meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os);
       os.flush();
-      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(
+          checkpoint, 10, "test", new FileChannelCounter("test")
+      );
     } finally {
       os.close();
     }
@@ -232,7 +259,7 @@ public class TestEventQueueBackingStoreFactory {
   @Test(expected = BadCheckpointException.class)
   public void testTruncateMeta() throws Exception {
     EventQueueBackingStore backingStore =
-        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
     backingStore.close();
     Assert.assertTrue(checkpoint.exists());
     File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -241,13 +268,15 @@ public class TestEventQueueBackingStoreFactory {
     writer.setLength(0);
     writer.getFD().sync();
     writer.close();
-    backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    backingStore = EventQueueBackingStoreFactory.get(
+        checkpoint, 10, "test", new FileChannelCounter("test")
+    );
   }
 
   @Test(expected = InvalidProtocolBufferException.class)
   public void testCorruptMeta() throws Throwable {
     EventQueueBackingStore backingStore =
-        EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+        EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
     backingStore.close();
     Assert.assertTrue(checkpoint.exists());
     File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -258,7 +287,9 @@ public class TestEventQueueBackingStoreFactory {
     writer.getFD().sync();
     writer.close();
     try {
-      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+      backingStore = EventQueueBackingStoreFactory.get(
+          checkpoint, 10, "test", new FileChannelCounter("test")
+      );
     } catch (BadCheckpointException ex) {
       throw ex.getCause();
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
index d0237db..e2d1ee6 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
@@ -31,7 +31,10 @@ import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertFalse;
@@ -230,6 +233,70 @@ public class TestFileChannelErrorMetrics extends TestFileChannelBase
{
     assertFalse(channel.getChannelCounter().isOpen());
   }
 
+  @Test
+  public void testCheckpointBackupWriteErrorShouldIncreaseCounter()
+      throws IOException, InterruptedException {
+    FileChannelCounter fileChannelCounter = new FileChannelCounter("test");
+    File checkpointFile = File.createTempFile("checkpoint", ".tmp");
+    File backupDir = Files.createTempDirectory("checkpoint").toFile();
+    backupDir.deleteOnExit();
+    checkpointFile.deleteOnExit();
+    EventQueueBackingStoreFileV3 backingStoreFileV3 = new EventQueueBackingStoreFileV3(
+        checkpointFile, 1, "test", fileChannelCounter, backupDir,true, false
+    );
+
+    // Exception will be thrown by state check if beforeCheckpoint is not called
+    backingStoreFileV3.checkpoint();
+    // wait for other thread to reach the error state
+    assertEventuallyTrue("checkpoint backup write failure should increase counter to 1",
+        new BooleanPredicate() {
+          @Override
+          public boolean get() {
+            return fileChannelCounter.getCheckpointBackupWriteErrorCount() == 1;
+          }
+        },
+        100
+    );
+  }
+
+  @Test
+  public void testCheckpointBackupWriteErrorShouldIncreaseCounter2()
+      throws Exception {
+    int checkpointInterval = 1500;
+    Map config = new HashMap();
+    config.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, String.valueOf(checkpointInterval));
+    config.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+    final FileChannel channel = createFileChannel(Collections.unmodifiableMap(config));
+    channel.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    tx.commit();
+    tx.close();
+    final long beforeCheckpointWrite = System.currentTimeMillis();
+    // first checkpoint should be written successfully -> the counter should remain 0
+    assertEventuallyTrue("checkpoint backup should have been written", new BooleanPredicate()
{
+      @Override
+      public boolean get() {
+        return new File(backupDir, "checkpoint").lastModified() > beforeCheckpointWrite;
+      }
+    }, checkpointInterval * 3);
+    assertEquals(0, channel.getChannelCounter().getCheckpointBackupWriteErrorCount());
+    FileUtils.deleteDirectory(backupDir);
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test2".getBytes()));
+    tx.commit();
+    tx.close();
+    // the backup directory has been deleted so the backup checkpoint write should have been
failed
+    assertEventuallyTrue("checkpointBackupWriteErrorCount should be 1", new BooleanPredicate()
{
+      @Override
+      public boolean get() {
+        return channel.getChannelCounter().getCheckpointBackupWriteErrorCount() >= 1;
+      }
+    }, checkpointInterval * 3);
+  }
+
   private interface BooleanPredicate {
     boolean get();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index f1700f9..9c7352e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,7 +97,7 @@ public class TestFlumeEventQueue {
           public EventQueueBackingStore get() throws Exception {
             Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
             return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
-                                                    "test");
+                                                    "test", new FileChannelCounter("test"));
           }
         }
       },
@@ -105,7 +106,9 @@ public class TestFlumeEventQueue {
           @Override
           public EventQueueBackingStore get() throws Exception {
             Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
-            return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, "test");
+            return new EventQueueBackingStoreFileV3(
+                getCheckpoint(), 1000, "test", new FileChannelCounter("test")
+            );
           }
         }
       }
@@ -135,7 +138,9 @@ public class TestFlumeEventQueue {
     backingStore.close();
     File checkpoint = backingStoreSupplier.getCheckpoint();
     Assert.assertTrue(checkpoint.delete());
-    backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
+    backingStore = new EventQueueBackingStoreFileV2(
+        checkpoint, 1, "test", new FileChannelCounter("test")
+    );
     queue = new FlumeEventQueue(backingStore,
                                 backingStoreSupplier.getInflightTakes(),
                                 backingStoreSupplier.getInflightPuts(),
@@ -149,7 +154,9 @@ public class TestFlumeEventQueue {
     backingStore.close();
     File checkpoint = backingStoreSupplier.getCheckpoint();
     Assert.assertTrue(checkpoint.delete());
-    backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
+    backingStore = new EventQueueBackingStoreFileV2(
+        checkpoint, 0, "test", new FileChannelCounter("test")
+    );
     queue = new FlumeEventQueue(backingStore,
                                 backingStoreSupplier.getInflightTakes(),
                                 backingStoreSupplier.getInflightPuts(),
@@ -161,7 +168,9 @@ public class TestFlumeEventQueue {
     backingStore.close();
     File checkpoint = backingStoreSupplier.getCheckpoint();
     Assert.assertTrue(checkpoint.delete());
-    backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
+    backingStore = new EventQueueBackingStoreFileV2(
+        checkpoint, -1, "test", new FileChannelCounter("test")
+    );
     queue = new FlumeEventQueue(backingStore,
                                 backingStoreSupplier.getInflightTakes(),
                                 backingStoreSupplier.getInflightPuts(),


Mime
View raw message