flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1940. Log a snapshot of Flume metrics on shutdown.
Date Fri, 26 Apr 2013 08:53:34 GMT
Updated Branches:
  refs/heads/flume-1.4 7652ad86d -> 8655387c2


FLUME-1940. Log a snapshot of Flume metrics on shutdown.

(Israel Ekpo via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8655387c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8655387c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8655387c

Branch: refs/heads/flume-1.4
Commit: 8655387c2249d3e1fcbc5dbea26525de75648d2c
Parents: 7652ad8
Author: Mike Percy <mpercy@apache.org>
Authored: Fri Apr 26 01:52:39 2013 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Fri Apr 26 01:53:12 2013 -0700

----------------------------------------------------------------------
 .../instrumentation/MonitoredCounterGroup.java     |  121 ++++++++++++++-
 1 files changed, 116 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/8655387c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
index 502fe9e..c5c2956 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
@@ -19,9 +19,11 @@
 package org.apache.flume.instrumentation;
 
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -30,11 +32,23 @@ import javax.management.ObjectName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Used for keeping track of internal metrics using {@link AtomicLong} counters.
+ * <p>
+ * This is used by a variety of component types such as Sources, Channels,
+ * Sinks, SinkProcessors, ChannelProcessors, Interceptors and Serializers.
+ */
 public abstract class MonitoredCounterGroup {
 
-  private static final Logger LOG =
+  private static final Logger logger =
       LoggerFactory.getLogger(MonitoredCounterGroup.class);
 
+  // Key for component's start time in MonitoredCounterGroup.counterMap
+  private static final String COUNTER_GROUP_START_TIME = "start.time";
+
+  // Key for component's stop time in MonitoredCounterGroup.counterMap
+  private static final String COUNTER_GROUP_STOP_TIME = "stop.time";
+
   private final Type type;
   private final String name;
   private final Map<String, AtomicLong> counterMap;
@@ -62,6 +76,13 @@ public abstract class MonitoredCounterGroup {
 
   }
 
+  /**
+   * Starts the component
+   *
+   * Initializes the values for the stop time as well as all the keys in the
+   * internal map to zero and sets the start time to the current time in
+   * milliseconds since midnight January 1, 1970 UTC.
+   */
   public void start() {
 
     register();
@@ -70,7 +91,7 @@ public abstract class MonitoredCounterGroup {
       counterMap.get(counter).set(0L);
     }
     startTime.set(System.currentTimeMillis());
-    LOG.info("Component type: " + type + ", name: " + name + " started");
+    logger.info("Component type: " + type + ", name: " + name + " started");
   }
 
   /**
@@ -86,24 +107,83 @@ public abstract class MonitoredCounterGroup {
 
         ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
         registered = true;
-        LOG.info("Monitoried counter group for type: " + type + ", name: " + name
+        logger.info("Monitoried counter group for type: " + type + ", name: " + name
                 + ", registered successfully.");
       } catch (Exception ex) {
-        LOG.error("Failed to register monitored counter group for type: "
+        logger.error("Failed to register monitored counter group for type: "
                 + type + ", name: " + name, ex);
       }
     }
   }
 
+  /**
+   * Shuts down the component.
+   *
+   * Used to indicate that the component is shutting down.
+   *
+   * Sets the stop time and then prints out the metrics from
+   * the internal map of keys to values for the following components:
+   *
+   * - ChannelCounter
+   * - ChannelProcessorCounter
+   * - SinkCounter
+   * - SinkProcessorCounter
+   * - SourceCounter
+   */
   public void stop() {
+
+    // Sets the stopTime for the component as the current time in milliseconds
     stopTime.set(System.currentTimeMillis());
-    LOG.info("Component type: " + type + ", name: " + name + " stopped");
+
+    // Prints out a message indicating that this component has been stopped
+    logger.info("Component type: " + type + ", name: " + name + " stopped");
+
+    // Retrieve the type for this counter group
+    final String typePrefix = type.name().toLowerCase();
+
+    // Print out the startTime for this component
+    logger.info("Shutdown Metric for type: " + type + ", "
+      + "name: " + name + ". "
+      + typePrefix + "." + COUNTER_GROUP_START_TIME
+      + " == " + startTime);
+
+    // Print out the stopTime for this component
+    logger.info("Shutdown Metric for type: " + type + ", "
+      + "name: " + name + ". "
+      + typePrefix + "." + COUNTER_GROUP_STOP_TIME
+      + " == " + stopTime);
+
+    // Retrieve and sort counter group map keys
+    final List<String> mapKeys = new ArrayList<String>(counterMap.keySet());
+
+    Collections.sort(mapKeys);
+
+    // Cycle through and print out all the key value pairs in counterMap
+    for (final String counterMapKey : mapKeys) {
+
+      // Retrieves the value from the original counterMap.
+      final long counterMapValue = get(counterMapKey);
+
+      logger.info("Shutdown Metric for type: " + type + ", "
+        + "name: " + name + ". "
+        + counterMapKey + " == " + counterMapValue);
+    }
   }
 
+  /**
+   * Returns the time at which this component was most recently started.
+   *
+   * @return The start time, in milliseconds since the Unix epoch
+   */
   public long getStartTime() {
     return startTime.get();
   }
 
+  /**
+   * Returns the time at which this component was most recently stopped.
+   *
+   * @return The stop time, in milliseconds since the Unix epoch
+   */
   public long getStopTime() {
     return stopTime.get();
   }
@@ -129,22 +209,53 @@ public abstract class MonitoredCounterGroup {
   }
 
 
+  /**
+   * Retrieves the current value for this key
+   *
+   * @param counter The key for this metric
+   * @return The current value for this key
+   */
   protected long get(String counter) {
     return counterMap.get(counter).get();
   }
 
+  /**
+   * Sets the value for this key to the given value
+   *
+   * @param counter The key for this metric
+   * @param value The new value for this key
+   */
   protected void set(String counter, long value) {
     counterMap.get(counter).set(value);
   }
 
+  /**
+   * Atomically adds the delta to the current value for this key
+   *
+   * @param counter The key for this metric
+   * @param delta The amount to add to the current value
+   * @return The updated value for this key
+   */
   protected long addAndGet(String counter, long delta) {
     return counterMap.get(counter).addAndGet(delta);
   }
 
+  /**
+   * Atomically increments the current value for this key by one
+   *
+   * @param counter The key for this metric
+   * @return The updated value for this key
+   */
   protected long increment(String counter) {
     return counterMap.get(counter).incrementAndGet();
   }
 
+  /**
+   * Component Enum Constants
+   *
+   * Used by each component's constructor to distinguish which type the
+   * component is.
+   */
   public static enum Type {
     SOURCE,
     CHANNEL_PROCESSOR,


Mime
View raw message