flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bes...@apache.org
Subject flume git commit: FLUME-3025. Expose FileChannel.open on JMX
Date Mon, 21 Nov 2016 21:42:10 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk de941e7b8 -> 21a64db1b


FLUME-3025. Expose FileChannel.open on JMX

This patch exposes the FileChannel's open flag on JMX to make it possible to detect when it
wasn't able to start up. In those cases the open flag is false.

This closes #85

Reviewers: Attila Simon, Bessenyei Balázs Donát

(Denes Arvay via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/21a64db1
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/21a64db1
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/21a64db1

Branch: refs/heads/trunk
Commit: 21a64db1b0ebcb812e5c510668b580e84ff37f35
Parents: de941e7
Author: Denes Arvay <denes@cloudera.com>
Authored: Mon Nov 21 21:28:22 2016 +0000
Committer: Bessenyei Balázs Donát <bessbd@apache.org>
Committed: Mon Nov 21 21:41:31 2016 +0000

----------------------------------------------------------------------
 .../apache/flume/channel/file/FileChannel.java  | 30 ++++++++++++---
 .../instrumentation/FileChannelCounter.java     | 39 ++++++++++++++++++++
 .../FileChannelCounterMBean.java                | 26 +++++++++++++
 .../flume/channel/file/TestFileChannel.java     | 14 +++++++
 4 files changed, 103 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/21a64db1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 9d82e43..eca4620 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -40,6 +40,7 @@ import org.apache.flume.channel.file.encryption.EncryptionConfiguration;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.apache.flume.channel.file.encryption.KeyProviderFactory;
 import org.apache.flume.instrumentation.ChannelCounter;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,7 +94,7 @@ public class FileChannel extends BasicChannelSemantics {
   private final ThreadLocal<FileBackedTransaction> transactions =
       new ThreadLocal<FileBackedTransaction>();
   private String channelNameDescriptor = "[channel=unknown]";
-  private ChannelCounter channelCounter;
+  private FileChannelCounter channelCounter;
   private boolean useLogReplayV1;
   private boolean useFastReplay = false;
   private KeyProvider encryptionKeyProvider;
@@ -267,13 +268,14 @@ public class FileChannel extends BasicChannelSemantics {
     }
 
     if (channelCounter == null) {
-      channelCounter = new ChannelCounter(getName());
+      channelCounter = new FileChannelCounter(getName());
     }
   }
 
   @Override
   public synchronized void start() {
     LOG.info("Starting {}...", this);
+    channelCounter.start();
     try {
       Builder builder = new Log.Builder();
       builder.setCheckpointInterval(checkpointInterval);
@@ -296,7 +298,7 @@ public class FileChannel extends BasicChannelSemantics {
       builder.setCheckpointOnClose(checkpointOnClose);
       log = builder.build();
       log.replay();
-      open = true;
+      setOpen(true);
 
       int depth = getDepth();
       Preconditions.checkState(queueRemaining.tryAcquire(depth),
@@ -304,7 +306,7 @@ public class FileChannel extends BasicChannelSemantics {
       LOG.info("Queue Size after replay: " + depth + " "
           + channelNameDescriptor);
     } catch (Throwable t) {
-      open = false;
+      setOpen(false);
       startupError = t;
       LOG.error("Failed to start the file channel " + channelNameDescriptor, t);
       if (t instanceof Error) {
@@ -312,7 +314,6 @@ public class FileChannel extends BasicChannelSemantics {
       }
     }
     if (open) {
-      channelCounter.start();
       channelCounter.setChannelSize(getDepth());
       channelCounter.setChannelCapacity(capacity);
     }
@@ -373,7 +374,7 @@ public class FileChannel extends BasicChannelSemantics {
 
   void close() {
     if (open) {
-      open = false;
+      setOpen(false);
       try {
         log.close();
       } catch (Exception e) {
@@ -401,6 +402,18 @@ public class FileChannel extends BasicChannelSemantics {
   }
 
   /**
+   * This method makes sure that <code>this.open</code> and <code>channelCounter.open</code>
+   * are in sync.
+   * Only for internal use, call from synchronized methods only. It also assumes that
+   * <code>channelCounter</code> is not null.
+   * @param open
+   */
+  private void setOpen(boolean open) {
+    this.open = open;
+    channelCounter.setOpen(this.open);
+  }
+
+  /**
    * Did this channel recover a backup of the checkpoint to restart?
    *
    * @return true if the channel recovered using a backup.
@@ -418,6 +431,11 @@ public class FileChannel extends BasicChannelSemantics {
     return log;
   }
 
+  @VisibleForTesting
+  FileChannelCounter getChannelCounter() {
+    return channelCounter;
+  }
+
   /**
    * Transaction backed by a file. This transaction supports either puts
    * or takes but not both.

http://git-wip-us.apache.org/repos/asf/flume/blob/21a64db1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
new file mode 100644
index 0000000..1cd1ba8
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file.instrumentation;
+
+import org.apache.flume.instrumentation.ChannelCounter;
+
+public class FileChannelCounter extends ChannelCounter implements FileChannelCounterMBean {
+
+  private boolean open;
+
+  public FileChannelCounter(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return open;
+  }
+
+  public void setOpen(boolean open) {
+    this.open = open;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/21a64db1/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
new file mode 100644
index 0000000..a193c0c
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file.instrumentation;
+
+import org.apache.flume.instrumentation.ChannelCounterMBean;
+
+public interface FileChannelCounterMBean extends ChannelCounterMBean {
+
+  boolean isOpen();
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/21a64db1/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index bfc2d0d..8efe991 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -26,6 +26,7 @@ import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.file.FileChannel.FileBackedTransaction;
 import org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.After;
@@ -632,4 +633,17 @@ public class TestFileChannel extends TestFileChannelBase {
     }
   }
 
+  @Test
+  public void testFileChannelCounterIsOpen() {
+    FileChannel channel = createFileChannel();
+    FileChannelCounter counter = channel.getChannelCounter();
+    Assert.assertEquals(counter.isOpen(), false);
+
+    channel.start();
+    Assert.assertEquals(counter.isOpen(), true);
+
+    channel.stop();
+    Assert.assertEquals(counter.isOpen(), false);
+  }
+
 }


Mime
View raw message