Return-Path:
X-Original-To: apmail-flume-commits-archive@www.apache.org
Delivered-To: apmail-flume-commits-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id B0BFCF665
for ;
Fri, 26 Apr 2013 08:53:32 +0000 (UTC)
Received: (qmail 304 invoked by uid 500); 26 Apr 2013 08:53:32 -0000
Delivered-To: apmail-flume-commits-archive@flume.apache.org
Received: (qmail 236 invoked by uid 500); 26 Apr 2013 08:53:31 -0000
Mailing-List: contact commits-help@flume.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@flume.apache.org
Delivered-To: mailing list commits@flume.apache.org
Received: (qmail 207 invoked by uid 99); 26 Apr 2013 08:53:31 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org)
(140.211.11.114)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Apr 2013 08:53:31 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534)
id E504F881CCE; Fri, 26 Apr 2013 08:53:30 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: mpercy@apache.org
To: commits@flume.apache.org
Message-Id: <6466fdec0eb1450eb332df1d76e20f51@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: git commit: FLUME-1940. Log a snapshot of Flume metrics on shutdown.
Date: Fri, 26 Apr 2013 08:53:30 +0000 (UTC)
Updated Branches:
refs/heads/trunk 1da1e5324 -> 215d75eb1
FLUME-1940. Log a snapshot of Flume metrics on shutdown.
(Israel Ekpo via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/215d75eb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/215d75eb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/215d75eb
Branch: refs/heads/trunk
Commit: 215d75eb15362f34cd3246107cbea7ace247af10
Parents: 1da1e53
Author: Mike Percy
Authored: Fri Apr 26 01:52:39 2013 -0700
Committer: Mike Percy
Committed: Fri Apr 26 01:52:39 2013 -0700
----------------------------------------------------------------------
.../instrumentation/MonitoredCounterGroup.java | 121 ++++++++++++++-
1 files changed, 116 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/215d75eb/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
index 502fe9e..c5c2956 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
@@ -19,9 +19,11 @@
package org.apache.flume.instrumentation;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -30,11 +32,23 @@ import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Used for keeping track of internal metrics using atomic longs.
+ *
+ * This is used by a variety of component types such as Sources, Channels,
+ * Sinks, SinkProcessors, ChannelProcessors, Interceptors and Serializers.
+ */
public abstract class MonitoredCounterGroup {
- private static final Logger LOG =
+ private static final Logger logger =
LoggerFactory.getLogger(MonitoredCounterGroup.class);
+ // Key for component's start time in MonitoredCounterGroup.counterMap
+ private static final String COUNTER_GROUP_START_TIME = "start.time";
+
+ // key for component's stop time in MonitoredCounterGroup.counterMap
+ private static final String COUNTER_GROUP_STOP_TIME = "stop.time";
+
private final Type type;
private final String name;
private final Map counterMap;
@@ -62,6 +76,13 @@ public abstract class MonitoredCounterGroup {
}
+ /**
+ * Starts the component
+ *
+ * Initializes the values for the stop time as well as all the keys in the
+ * internal map to zero and sets the start time to the current time in
+ * milliseconds since midnight January 1, 1970 UTC
+ */
public void start() {
register();
@@ -70,7 +91,7 @@ public abstract class MonitoredCounterGroup {
counterMap.get(counter).set(0L);
}
startTime.set(System.currentTimeMillis());
- LOG.info("Component type: " + type + ", name: " + name + " started");
+ logger.info("Component type: " + type + ", name: " + name + " started");
}
/**
@@ -86,24 +107,83 @@ public abstract class MonitoredCounterGroup {
ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
registered = true;
- LOG.info("Monitoried counter group for type: " + type + ", name: " + name
+ logger.info("Monitoried counter group for type: " + type + ", name: " + name
+ ", registered successfully.");
} catch (Exception ex) {
- LOG.error("Failed to register monitored counter group for type: "
+ logger.error("Failed to register monitored counter group for type: "
+ type + ", name: " + name, ex);
}
}
}
+ /**
+ * Shuts down the component
+ *
+ * Used to indicate that the component is shutting down.
+ *
+ * Sets the stop time and then prints out the metrics from
+ * the internal map of keys to values for the following components:
+ *
+ * - ChannelCounter
+ * - ChannelProcessorCounter
+ * - SinkCounter
+ * - SinkProcessorCounter
+ * - SourceCounter
+ */
public void stop() {
+
+ // Sets the stopTime for the component as the current time in milliseconds
stopTime.set(System.currentTimeMillis());
- LOG.info("Component type: " + type + ", name: " + name + " stopped");
+
+ // Prints out a message indicating that this component has been stopped
+ logger.info("Component type: " + type + ", name: " + name + " stopped");
+
+ // Retrieve the type for this counter group
+ final String typePrefix = type.name().toLowerCase();
+
+ // Print out the startTime for this component
+ logger.info("Shutdown Metric for type: " + type + ", "
+ + "name: " + name + ". "
+ + typePrefix + "." + COUNTER_GROUP_START_TIME
+ + " == " + startTime);
+
+ // Print out the stopTime for this component
+ logger.info("Shutdown Metric for type: " + type + ", "
+ + "name: " + name + ". "
+ + typePrefix + "." + COUNTER_GROUP_STOP_TIME
+ + " == " + stopTime);
+
+ // Retrieve and sort counter group map keys
+ final List mapKeys = new ArrayList(counterMap.keySet());
+
+ Collections.sort(mapKeys);
+
+ // Cycle through and print out all the key value pairs in counterMap
+ for (final String counterMapKey : mapKeys) {
+
+ // Retrieves the value from the original counterMap.
+ final long counterMapValue = get(counterMapKey);
+
+ logger.info("Shutdown Metric for type: " + type + ", "
+ + "name: " + name + ". "
+ + counterMapKey + " == " + counterMapValue);
+ }
}
+ /**
+ * Returns when this component was first started
+ *
+ * @return The start time in milliseconds since midnight January 1, 1970 UTC
+ */
public long getStartTime() {
return startTime.get();
}
+ /**
+ * Returns when this component was stopped
+ *
+ * @return The stop time in milliseconds since midnight January 1, 1970 UTC
+ */
public long getStopTime() {
return stopTime.get();
}
@@ -129,22 +209,53 @@ public abstract class MonitoredCounterGroup {
}
+ /**
+ * Retrieves the current value for this key
+ *
+ * @param counter The key for this metric
+ * @return The current value for this key
+ */
protected long get(String counter) {
return counterMap.get(counter).get();
}
+ /**
+ * Sets the value for this key to the given value
+ *
+ * @param counter The key for this metric
+ * @param value The new value for this key
+ */
protected void set(String counter, long value) {
counterMap.get(counter).set(value);
}
+ /**
+ * Atomically adds the delta to the current value for this key
+ *
+ * @param counter The key for this metric
+ * @param delta The amount to add to the current value for this key
+ * @return The updated value for this key
+ */
protected long addAndGet(String counter, long delta) {
return counterMap.get(counter).addAndGet(delta);
}
+ /**
+ * Atomically increments the current value for this key by one
+ *
+ * @param counter The key for this metric
+ * @return The updated value for this key
+ */
protected long increment(String counter) {
return counterMap.get(counter).incrementAndGet();
}
+ /**
+ * Component Enum Constants
+ *
+ * Used by each component's constructor to distinguish which type the
+ * component is.
+ */
public static enum Type {
SOURCE,
CHANNEL_PROCESSOR,