flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject flume git commit: FLUME-3092. Extend the FileChannel's monitoring metrics
Date Tue, 16 May 2017 19:54:27 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk ed433ae1b -> fdc53f338


FLUME-3092. Extend the FileChannel's monitoring metrics

This patch adds the following new metrics to the FileChannel's counters:
- eventPutErrorCount: incremented if an IOException occurs during put operation.
- eventTakeErrorCount: incremented if an IOException or CorruptEventException occurs
  during take operation.
- checkpointWriteErrorCount: incremented if an exception occurs during checkpoint write.
- unhealthy: this flag is set to 1 if the channel failed to start (i.e. an exception
  occurred during the replay), so the channel is not capable of normal operation;
  it is 0 while the channel is starting up or after it has started successfully.
- closed flag: the numeric representation (1: closed, 0: open) of the negated open flag.

Closes #131.

Reviewers: Attila Simon, Mike Percy

(Denes Arvay via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fdc53f33
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fdc53f33
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fdc53f33

Branch: refs/heads/trunk
Commit: fdc53f338931b96addf06f3f2be59da128656ec0
Parents: ed433ae
Author: Denes Arvay <denes@cloudera.com>
Authored: Tue May 9 16:23:31 2017 +0200
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue May 16 12:50:10 2017 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/FileChannel.java  |  55 +++--
 .../java/org/apache/flume/channel/file/Log.java |  19 +-
 .../instrumentation/FileChannelCounter.java     |  49 +++-
 .../FileChannelCounterMBean.java                |  39 +++
 .../flume/channel/file/TestFileChannelBase.java |  13 +-
 .../file/TestFileChannelErrorMetrics.java       | 247 +++++++++++++++++++
 .../org/apache/flume/channel/file/TestLog.java  |  16 ++
 7 files changed, 412 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index eca4620..3194592 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -39,7 +39,6 @@ import org.apache.flume.channel.file.Log.Builder;
 import org.apache.flume.channel.file.encryption.EncryptionConfiguration;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.apache.flume.channel.file.encryption.KeyProviderFactory;
-import org.apache.flume.instrumentation.ChannelCounter;
 import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -270,6 +269,7 @@ public class FileChannel extends BasicChannelSemantics {
     if (channelCounter == null) {
       channelCounter = new FileChannelCounter(getName());
     }
+    channelCounter.setUnhealthy(0);
   }
 
   @Override
@@ -277,25 +277,7 @@ public class FileChannel extends BasicChannelSemantics {
     LOG.info("Starting {}...", this);
     channelCounter.start();
     try {
-      Builder builder = new Log.Builder();
-      builder.setCheckpointInterval(checkpointInterval);
-      builder.setMaxFileSize(maxFileSize);
-      builder.setMinimumRequiredSpace(minimumRequiredSpace);
-      builder.setQueueSize(capacity);
-      builder.setCheckpointDir(checkpointDir);
-      builder.setLogDirs(dataDirs);
-      builder.setChannelName(getName());
-      builder.setUseLogReplayV1(useLogReplayV1);
-      builder.setUseFastReplay(useFastReplay);
-      builder.setEncryptionKeyProvider(encryptionKeyProvider);
-      builder.setEncryptionKeyAlias(encryptionActiveKey);
-      builder.setEncryptionCipherProvider(encryptionCipherProvider);
-      builder.setUseDualCheckpoints(useDualCheckpoints);
-      builder.setCompressBackupCheckpoint(compressBackupCheckpoint);
-      builder.setBackupCheckpointDir(backupCheckpointDir);
-      builder.setFsyncPerTransaction(fsyncPerTransaction);
-      builder.setFsyncInterval(fsyncInterval);
-      builder.setCheckpointOnClose(checkpointOnClose);
+      Builder builder = createLogBuilder();
       log = builder.build();
       log.replay();
       setOpen(true);
@@ -307,6 +289,7 @@ public class FileChannel extends BasicChannelSemantics {
           + channelNameDescriptor);
     } catch (Throwable t) {
       setOpen(false);
+      channelCounter.setUnhealthy(1);
       startupError = t;
       LOG.error("Failed to start the file channel " + channelNameDescriptor, t);
       if (t instanceof Error) {
@@ -320,6 +303,31 @@ public class FileChannel extends BasicChannelSemantics {
     super.start();
   }
 
+  @VisibleForTesting
+  Builder createLogBuilder() {
+    Builder builder = new Log.Builder();
+    builder.setCheckpointInterval(checkpointInterval);
+    builder.setMaxFileSize(maxFileSize);
+    builder.setMinimumRequiredSpace(minimumRequiredSpace);
+    builder.setQueueSize(capacity);
+    builder.setCheckpointDir(checkpointDir);
+    builder.setLogDirs(dataDirs);
+    builder.setChannelName(getName());
+    builder.setUseLogReplayV1(useLogReplayV1);
+    builder.setUseFastReplay(useFastReplay);
+    builder.setEncryptionKeyProvider(encryptionKeyProvider);
+    builder.setEncryptionKeyAlias(encryptionActiveKey);
+    builder.setEncryptionCipherProvider(encryptionCipherProvider);
+    builder.setUseDualCheckpoints(useDualCheckpoints);
+    builder.setCompressBackupCheckpoint(compressBackupCheckpoint);
+    builder.setBackupCheckpointDir(backupCheckpointDir);
+    builder.setFsyncPerTransaction(fsyncPerTransaction);
+    builder.setFsyncInterval(fsyncInterval);
+    builder.setCheckpointOnClose(checkpointOnClose);
+    builder.setChannelCounter(channelCounter);
+    return builder;
+  }
+
   @Override
   public synchronized void stop() {
     LOG.info("Stopping {}...", this);
@@ -449,12 +457,12 @@ public class FileChannel extends BasicChannelSemantics {
     private final FlumeEventQueue queue;
     private final Semaphore queueRemaining;
     private final String channelNameDescriptor;
-    private final ChannelCounter channelCounter;
+    private final FileChannelCounter channelCounter;
     private final boolean fsyncPerTransaction;
 
     public FileBackedTransaction(Log log, long transactionID,
                                  int transCapacity, int keepAlive, Semaphore queueRemaining,
-                                 String name, boolean fsyncPerTransaction, ChannelCounter
+                                 String name, boolean fsyncPerTransaction, FileChannelCounter
                                      counter) {
       this.log = log;
       queue = log.getFlumeEventQueue();
@@ -503,6 +511,7 @@ public class FileChannel extends BasicChannelSemantics {
         queue.addWithoutCommit(ptr, transactionID);
         success = true;
       } catch (IOException e) {
+        channelCounter.incrementEventPutErrorCount();
         throw new ChannelException("Put failed due to IO error "
             + channelNameDescriptor, e);
       } finally {
@@ -549,6 +558,7 @@ public class FileChannel extends BasicChannelSemantics {
               Event event = log.get(ptr);
               return event;
             } catch (IOException e) {
+              channelCounter.incrementEventTakeErrorCount();
               throw new ChannelException("Take failed due to IO error "
                   + channelNameDescriptor, e);
             } catch (NoopRecordException e) {
@@ -556,6 +566,7 @@ public class FileChannel extends BasicChannelSemantics {
                   "tool found. Will retrieve next event", e);
               takeList.remove(ptr);
             } catch (CorruptEventException ex) {
+              channelCounter.incrementEventTakeErrorCount();
               if (fsyncPerTransaction) {
                 throw new ChannelException(ex);
               }

http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 5f59d97..1662a5b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -30,6 +30,7 @@ import org.apache.flume.Event;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,6 +139,8 @@ public class Log {
 
   private final List<File> pendingDeletes = Lists.newArrayList();
 
+  private final FileChannelCounter channelCounter;
+
   static class Builder {
     private long bCheckpointInterval;
     private long bMinimumRequiredSpace;
@@ -161,6 +164,8 @@ public class Log {
 
     private boolean checkpointOnClose = true;
 
+    private FileChannelCounter channelCounter;
+
     boolean isFsyncPerTransaction() {
       return fsyncPerTransaction;
     }
@@ -262,13 +267,18 @@ public class Log {
       return this;
     }
 
+    Builder setChannelCounter(FileChannelCounter channelCounter) {
+      this.channelCounter = channelCounter;
+      return this;
+    }
+
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
           bUseDualCheckpoints, bCompressBackupCheckpoint, bCheckpointDir,
           bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
           bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
           bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
-          fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
+          fsyncPerTransaction, fsyncInterval, checkpointOnClose, channelCounter, bLogDirs);
     }
   }
 
@@ -280,7 +290,8 @@ public class Log {
               @Nullable String encryptionKeyAlias,
               @Nullable String encryptionCipherProvider,
               long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
-              int fsyncInterval, boolean checkpointOnClose, File... logDirs)
+              int fsyncInterval, boolean checkpointOnClose, FileChannelCounter channelCounter,
+              File... logDirs)
       throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
         "checkpointInterval <= 0");
@@ -304,6 +315,7 @@ public class Log {
     Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
     Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
         "channel name should be specified");
+    Preconditions.checkNotNull(channelCounter, "ChannelCounter must be not null");
 
     this.channelNameDescriptor = "[channel=" + name + "]";
     this.useLogReplayV1 = useLogReplayV1;
@@ -361,6 +373,7 @@ public class Log {
     this.fsyncPerTransaction = fsyncPerTransaction;
     this.fsyncInterval = fsyncInterval;
     this.checkpointOnClose = checkpointOnClose;
+    this.channelCounter = channelCounter;
 
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     workerExecutor = Executors.newSingleThreadScheduledExecutor(new
@@ -1221,8 +1234,10 @@ public class Log {
           log.writeCheckpoint();
         }
       } catch (IOException e) {
+        log.channelCounter.incrementCheckpointWriteErrorCount();
         LOG.error("Error doing checkpoint", e);
       } catch (Throwable e) {
+        log.channelCounter.incrementCheckpointWriteErrorCount();
         LOG.error("General error in checkpoint worker", e);
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
index 1cd1ba8..40470a8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
@@ -23,9 +23,15 @@ import org.apache.flume.instrumentation.ChannelCounter;
 public class FileChannelCounter extends ChannelCounter implements FileChannelCounterMBean {
 
   private boolean open;
+  private int unhealthy;
+
+  private static final String EVENT_PUT_ERROR_COUNT = "channel.file.event.put.error";
+  private static final String EVENT_TAKE_ERROR_COUNT = "channel.file.event.take.error";
+  private static final String CHECKPOINT_WRITE_ERROR_COUNT = "channel.file.checkpoint.write.error";
 
   public FileChannelCounter(String name) {
-    super(name);
+    super(name, new String[] {
+        EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT, CHECKPOINT_WRITE_ERROR_COUNT });
   }
 
   @Override
@@ -36,4 +42,45 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou
   public void setOpen(boolean open) {
     this.open = open;
   }
+
+  @Override
+  public int getClosed() {
+    return open ? 0 : 1;
+  }
+
+  @Override
+  public int getUnhealthy() {
+    return unhealthy;
+  }
+
+  public void setUnhealthy(int unhealthy) {
+    this.unhealthy = unhealthy;
+  }
+
+  @Override
+  public long getEventPutErrorCount() {
+    return get(EVENT_PUT_ERROR_COUNT);
+  }
+
+  public void incrementEventPutErrorCount() {
+    increment(EVENT_PUT_ERROR_COUNT);
+  }
+
+  @Override
+  public long getEventTakeErrorCount() {
+    return get(EVENT_TAKE_ERROR_COUNT);
+  }
+
+  public void incrementEventTakeErrorCount() {
+    increment(EVENT_TAKE_ERROR_COUNT);
+  }
+
+  @Override
+  public long getCheckpointWriteErrorCount() {
+    return get(CHECKPOINT_WRITE_ERROR_COUNT);
+  }
+
+  public void incrementCheckpointWriteErrorCount() {
+    increment(CHECKPOINT_WRITE_ERROR_COUNT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
index a193c0c..175b1f4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
@@ -18,9 +18,48 @@
  */
 package org.apache.flume.channel.file.instrumentation;
 
+import org.apache.flume.Event;
 import org.apache.flume.instrumentation.ChannelCounterMBean;
 
 public interface FileChannelCounterMBean extends ChannelCounterMBean {
 
   boolean isOpen();
+
+  /**
+   * The numeric representation (0/1) of the negated value of the open flag.
+   */
+  int getClosed();
+
+  /**
+   * A value of 0 represents that the channel is in a healthy state: it is either starting
+   * up (i.e. the replay is running) or already started up successfully.
+   * A value of 1 represents that the channel is in a permanently failed state, which means that
+   * the startup was unsuccessful due to an exception during the replay.
+   * Once the channel started up successfully the *ErrorCount (or the ratio of the *AttemptCount
+   * and *SuccessCount) counters should be used to check whether it is functioning properly.
+   *
+   * Note: this flag doesn't report the channel as unhealthy if the configuration failed because the
+   * ChannelCounter might not have been instantiated/started yet.
+   */
+  int getUnhealthy();
+
+  /**
+   * A count of the number of IOExceptions encountered while trying to put() onto the channel.
+   * @see org.apache.flume.channel.file.FileChannel.FileBackedTransaction#doPut(Event)
+   */
+  long getEventPutErrorCount();
+
+  /**
+   * A count of the number of errors encountered while trying to take() from the channel,
+   * including IOExceptions and corruption-related errors.
+   * @see org.apache.flume.channel.file.FileChannel.FileBackedTransaction#doTake()
+   */
+  long getEventTakeErrorCount();
+
+  /**
+   * A count of the number of errors encountered while trying to write the checkpoints. This
+   * includes any Throwables.
+   * @see org.apache.flume.channel.file.Log.BackgroundWorker#run()
+   */
+  long getCheckpointWriteErrorCount();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
index 9901b69..6f5981a 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Context;
 import org.junit.After;
@@ -32,6 +33,7 @@ import com.google.common.io.Files;
 
 public class TestFileChannelBase {
 
+  private final int dataDirCount;
   protected FileChannel channel;
   protected File baseDir;
   protected File checkpointDir;
@@ -41,6 +43,15 @@ public class TestFileChannelBase {
   protected File uncompressedBackupCheckpoint;
   protected File compressedBackupCheckpoint;
 
+  public TestFileChannelBase() {
+    this(3); // By default the tests run with multiple data directories
+  }
+
+  public TestFileChannelBase(int dataDirCount) {
+    Preconditions.checkArgument(dataDirCount > 0, "Invalid dataDirCount");
+    this.dataDirCount = dataDirCount;
+  }
+
   @Before
   public void setup() throws Exception {
     baseDir = Files.createTempDir();
@@ -51,7 +62,7 @@ public class TestFileChannelBase {
       "checkpoint.snappy");
     Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
     Assert.assertTrue(backupDir.mkdirs() || backupDir.isDirectory());
-    dataDirs = new File[3];
+    dataDirs = new File[dataDirCount];
     dataDir = "";
     for (int i = 0; i < dataDirs.length; i++) {
       dataDirs[i] = new File(baseDir, "data" + (i + 1));

http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
new file mode 100644
index 0000000..d0237db
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+
+public class TestFileChannelErrorMetrics extends TestFileChannelBase {
+
+  public TestFileChannelErrorMetrics() {
+    // use only 1 data directory in order to make it simpler to edit the data files
+    // in testCorruptEventTaken() and testUnhealthy() methods
+    super(1);
+  }
+
+  /**
+   * This tests multiple successful and failed put and take operations
+   * and checks the values of the channel's counters.
+   */
+  @Test
+  public void testEventTakePutErrorCount() throws Exception {
+    final long usableSpaceRefreshInterval = 1;
+    FileChannel channel = Mockito.spy(createFileChannel());
+    Mockito.when(channel.createLogBuilder()).then(new Answer<Log.Builder>() {
+      @Override
+      public Log.Builder answer(InvocationOnMock invocation) throws Throwable {
+        Log.Builder ret = (Log.Builder) invocation.callRealMethod();
+        ret.setUsableSpaceRefreshInterval(usableSpaceRefreshInterval);
+        return ret;
+      }
+    });
+    channel.start();
+
+    FileChannelCounter channelCounter = channel.getChannelCounter();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test1".getBytes()));
+    channel.put(EventBuilder.withBody("test2".getBytes()));
+    tx.commit();
+    tx.close();
+    assertEquals(2, channelCounter.getEventPutAttemptCount());
+    assertEquals(2, channelCounter.getEventPutSuccessCount());
+    assertEquals(0, channelCounter.getEventPutErrorCount());
+
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.take();
+    tx.commit();
+    tx.close();
+    assertEquals(1, channelCounter.getEventTakeAttemptCount());
+    assertEquals(1, channelCounter.getEventTakeSuccessCount());
+    assertEquals(0, channelCounter.getEventTakeErrorCount());
+
+    FileUtils.deleteDirectory(baseDir);
+    Thread.sleep(2 * usableSpaceRefreshInterval);
+
+    tx = channel.getTransaction();
+    tx.begin();
+    ChannelException putException = null;
+    try {
+      channel.put(EventBuilder.withBody("test".getBytes()));
+    } catch (ChannelException ex) {
+      putException = ex;
+    }
+    assertNotNull(putException);
+    assertTrue(putException.getCause() instanceof IOException);
+    assertEquals(3, channelCounter.getEventPutAttemptCount());
+    assertEquals(2, channelCounter.getEventPutSuccessCount());
+    assertEquals(1, channelCounter.getEventPutErrorCount());
+
+    ChannelException takeException = null;
+    try {
+      channel.take(); // This is guaranteed to throw an error if the above put() threw an error.
+    } catch (ChannelException ex) {
+      takeException = ex;
+    }
+    assertNotNull(takeException);
+    assertTrue(takeException.getCause() instanceof IOException);
+    assertEquals(2, channelCounter.getEventTakeAttemptCount());
+    assertEquals(1, channelCounter.getEventTakeSuccessCount());
+    assertEquals(1, channelCounter.getEventTakeErrorCount());
+  }
+
+  /**
+   * Test the FileChannelCounter.eventTakeErrorCount value if the data file
+   * contains an invalid record thus CorruptEventException is thrown during
+   * the take() operation.
+   * The first byte of the record (= the first byte of the file in this case)
+   * is the operation byte, changing it to an unexpected value will cause the
+   * CorruptEventException to be thrown.
+   */
+  @Test
+  public void testCorruptEventTaken() throws Exception {
+    FileChannel channel = createFileChannel(
+        Collections.singletonMap(FileChannelConfiguration.FSYNC_PER_TXN, "false"));
+    channel.start();
+
+    FileChannelCounter channelCounter = channel.getChannelCounter();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    tx.commit();
+    tx.close();
+
+    byte[] data = FileUtils.readFileToByteArray(new File(dataDirs[0], "log-1"));
+    data[0] = LogFile.OP_EOF; // change the first (operation) byte to unexpected value
+    FileUtils.writeByteArrayToFile(new File(dataDirs[0], "log-1"), data);
+
+    tx = channel.getTransaction();
+    tx.begin();
+
+    try {
+      channel.take();
+    } catch (Throwable t) {
+      // If fsyncPerTransaction is false then Log.get throws the CorruptEventException
+      // without wrapping it to IOException (which is the case when fsyncPerTransaction is true)
+      // but in this case it is swallowed in FileBackedTransaction.doTake()
+      // The eventTakeErrorCount should be increased regardless of this.
+      Assert.fail("No exception should be thrown as fsyncPerTransaction is false");
+    }
+
+    assertEquals(1, channelCounter.getEventTakeAttemptCount());
+    assertEquals(0, channelCounter.getEventTakeSuccessCount());
+    assertEquals(1, channelCounter.getEventTakeErrorCount());
+  }
+
+  @Test
+  public void testCheckpointWriteErrorCount() throws Exception {
+    int checkpointInterval = 1500;
+    final FileChannel channel = createFileChannel(Collections.singletonMap(
+        FileChannelConfiguration.CHECKPOINT_INTERVAL, String.valueOf(checkpointInterval)));
+    channel.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    tx.commit();
+    tx.close();
+
+    final long beforeCheckpointWrite = System.currentTimeMillis();
+
+    // first checkpoint should be written successfully -> the counter should remain 0
+    assertEventuallyTrue("checkpoint should have been written", new BooleanPredicate() {
+      @Override
+      public boolean get() {
+        return new File(checkpointDir, "checkpoint").lastModified() > beforeCheckpointWrite;
+      }
+    }, checkpointInterval * 3);
+    assertEquals(0, channel.getChannelCounter().getCheckpointWriteErrorCount());
+
+    FileUtils.deleteDirectory(baseDir);
+
+    // the channel's directory has been deleted so the checkpoint write should have failed
+    assertEventuallyTrue("checkpointWriterErrorCount should be 1", new BooleanPredicate() {
+      @Override
+      public boolean get() {
+        return channel.getChannelCounter().getCheckpointWriteErrorCount() == 1;
+      }
+    }, checkpointInterval * 3);
+  }
+
+  /**
+   * Test the value of the FileChannelCounter.unhealthy flag after normal startup.
+   * It is expected to be 0
+   */
+  @Test
+  public void testHealthy() throws Exception {
+    FileChannel channel = createFileChannel();
+    assertEquals(0, channel.getChannelCounter().getUnhealthy());
+    assertEquals(1, channel.getChannelCounter().getClosed());
+    assertFalse(channel.getChannelCounter().isOpen());
+
+    channel.start();
+    assertEquals(0, channel.getChannelCounter().getUnhealthy());
+    assertEquals(0, channel.getChannelCounter().getClosed());
+    assertTrue(channel.getChannelCounter().isOpen());
+  }
+
+  /**
+   * Test the value of the FileChannelCounter.unhealthy flag after a failed startup.
+   * It is expected to be 1
+   */
+  @Test
+  public void testUnhealthy() throws Exception {
+    FileChannel channel = createFileChannel();
+    assertEquals(0, channel.getChannelCounter().getUnhealthy());
+    assertEquals(1, channel.getChannelCounter().getClosed());
+    assertFalse(channel.getChannelCounter().isOpen());
+
+    FileUtils.write(new File(dataDirs[0], "log-1"), "invalid data file content");
+
+    channel.start();
+    assertEquals(1, channel.getChannelCounter().getUnhealthy());
+    assertEquals(1, channel.getChannelCounter().getClosed());
+    assertFalse(channel.getChannelCounter().isOpen());
+  }
+
+  private interface BooleanPredicate {
+    boolean get();
+  }
+
+  private static void assertEventuallyTrue(String description, BooleanPredicate expression,
+                                           long timeoutMillis)
+      throws InterruptedException {
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() < start + timeoutMillis) {
+      if (expression.get()) break;
+      Thread.sleep(timeoutMillis / 10);
+    }
+    assertTrue(description, expression.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/fdc53f33/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index f7f0950..e85274d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -21,6 +21,7 @@ package org.apache.flume.channel.file;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -63,6 +64,7 @@ public class TestLog {
                            .setLogDirs(dataDirs)
                            .setCheckpointOnClose(false)
                            .setChannelName("testlog")
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     log.replay();
   }
@@ -141,6 +143,7 @@ public class TestLog {
                            .setCheckpointDir(checkpointDir)
                            .setLogDirs(dataDirs)
                            .setChannelName("testlog")
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     log.replay();
     takeAndVerify(eventPointerIn, eventIn);
@@ -163,6 +166,7 @@ public class TestLog {
                            .setCheckpointDir(checkpointDir)
                            .setLogDirs(dataDirs)
                            .setChannelName("testlog")
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
@@ -180,6 +184,7 @@ public class TestLog {
                            .setLogDirs(dataDirs)
                            .setChannelName("testlog")
                            .setMinimumRequiredSpace(Long.MAX_VALUE)
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     try {
       log.replay();
@@ -219,6 +224,7 @@ public class TestLog {
                            .setChannelName("testlog")
                            .setMinimumRequiredSpace(minimumRequiredSpace)
                            .setUsableSpaceRefreshInterval(1L)
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     log.replay();
     File filler = new File(checkpointDir, "filler");
@@ -259,6 +265,7 @@ public class TestLog {
                      .setCheckpointDir(checkpointDir)
                      .setLogDirs(dataDirs)
                      .setChannelName("testlog")
+                     .setChannelCounter(new FileChannelCounter("testlog"))
                      .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
@@ -298,6 +305,7 @@ public class TestLog {
                      .setLogDirs(dataDirs)
                      .setChannelName("testlog")
                      .setUseLogReplayV1(useLogReplayV1)
+                     .setChannelCounter(new FileChannelCounter("testlog"))
                      .build();
     log.replay();
     takeAndVerify(eventPointerIn, eventIn);
@@ -314,6 +322,7 @@ public class TestLog {
                      .setCheckpointDir(checkpointDir)
                      .setLogDirs(dataDirs)
                      .setChannelName("testlog")
+                     .setChannelCounter(new FileChannelCounter("testlog"))
                      .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
@@ -332,6 +341,7 @@ public class TestLog {
                      .setCheckpointDir(checkpointDir)
                      .setLogDirs(dataDirs)
                      .setChannelName("testlog")
+                     .setChannelCounter(new FileChannelCounter("testlog"))
                      .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
@@ -350,6 +360,7 @@ public class TestLog {
                      .setCheckpointDir(checkpointDir)
                      .setLogDirs(dataDirs)
                      .setChannelName("testlog")
+                     .setChannelCounter(new FileChannelCounter("testlog"))
                      .build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
@@ -402,6 +413,7 @@ public class TestLog {
                            .setLogDirs(dataDirs)
                            .setChannelName("testlog")
                            .setUseFastReplay(useFastReplay)
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     log.replay();
     FlumeEvent eventIn = TestUtils.newPersistableEvent();
@@ -430,6 +442,7 @@ public class TestLog {
                            .setLogDirs(dataDirs)
                            .setChannelName("testlog")
                            .setUseFastReplay(useFastReplay)
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     try {
       log.replay();
@@ -455,6 +468,7 @@ public class TestLog {
                            .setCheckpointDir(checkpointDir)
                            .setLogDirs(dataDirs)
                            .setChannelName("testlog")
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
   }
@@ -477,6 +491,7 @@ public class TestLog {
                            .setLogDirs(dataDirs)
                            .setChannelName("testlog")
                            .setUseFastReplay(true)
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer);
   }
@@ -529,6 +544,7 @@ public class TestLog {
                            .setLogDirs(dataDirs)
                            .setCheckpointOnClose(true)
                            .setChannelName("testLog")
+                           .setChannelCounter(new FileChannelCounter("testlog"))
                            .build();
     log.replay();
 


Mime
View raw message