Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B0BFCF665 for ; Fri, 26 Apr 2013 08:53:32 +0000 (UTC) Received: (qmail 304 invoked by uid 500); 26 Apr 2013 08:53:32 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 236 invoked by uid 500); 26 Apr 2013 08:53:31 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 207 invoked by uid 99); 26 Apr 2013 08:53:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Apr 2013 08:53:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E504F881CCE; Fri, 26 Apr 2013 08:53:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mpercy@apache.org To: commits@flume.apache.org Message-Id: <6466fdec0eb1450eb332df1d76e20f51@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-1940. Log a snapshot of Flume metrics on shutdown. Date: Fri, 26 Apr 2013 08:53:30 +0000 (UTC) Updated Branches: refs/heads/trunk 1da1e5324 -> 215d75eb1 FLUME-1940. Log a snapshot of Flume metrics on shutdown. (Israel Ekpo via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/215d75eb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/215d75eb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/215d75eb Branch: refs/heads/trunk Commit: 215d75eb15362f34cd3246107cbea7ace247af10 Parents: 1da1e53 Author: Mike Percy Authored: Fri Apr 26 01:52:39 2013 -0700 Committer: Mike Percy Committed: Fri Apr 26 01:52:39 2013 -0700 ---------------------------------------------------------------------- .../instrumentation/MonitoredCounterGroup.java | 121 ++++++++++++++- 1 files changed, 116 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/215d75eb/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java index 502fe9e..c5c2956 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java @@ -19,9 +19,11 @@ package org.apache.flume.instrumentation; import java.lang.management.ManagementFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -30,11 +32,23 @@ import javax.management.ObjectName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Used for keeping track of internal metrics using atomic integers

+ * + * This is used by a variety of component types such as Sources, Channels, + * Sinks, SinkProcessors, ChannelProcessors, Interceptors and Serializers. + */ public abstract class MonitoredCounterGroup { - private static final Logger LOG = + private static final Logger logger = LoggerFactory.getLogger(MonitoredCounterGroup.class); + // Key for component's start time in MonitoredCounterGroup.counterMap + private static final String COUNTER_GROUP_START_TIME = "start.time"; + + // key for component's stop time in MonitoredCounterGroup.counterMap + private static final String COUNTER_GROUP_STOP_TIME = "stop.time"; + private final Type type; private final String name; private final Map counterMap; @@ -62,6 +76,13 @@ public abstract class MonitoredCounterGroup { } + /** + * Starts the component + * + * Initializes the values for the stop time as well as all the keys in the + * internal map to zero and sets the start time to the current time in + * milliseconds since midnight January 1, 1970 UTC + */ public void start() { register(); @@ -70,7 +91,7 @@ public abstract class MonitoredCounterGroup { counterMap.get(counter).set(0L); } startTime.set(System.currentTimeMillis()); - LOG.info("Component type: " + type + ", name: " + name + " started"); + logger.info("Component type: " + type + ", name: " + name + " started"); } /** @@ -86,24 +107,83 @@ public abstract class MonitoredCounterGroup { ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName); registered = true; - LOG.info("Monitoried counter group for type: " + type + ", name: " + name + logger.info("Monitoried counter group for type: " + type + ", name: " + name + ", registered successfully."); } catch (Exception ex) { - LOG.error("Failed to register monitored counter group for type: " + logger.error("Failed to register monitored counter group for type: " + type + ", name: " + name, ex); } } } + /** + * Shuts Down the Component + * + * Used to indicate that the component is shutting down. + * + * Sets the stop time and then prints out the metrics from + * the internal map of keys to values for the following components: + * + * - ChannelCounter + * - ChannelProcessorCounter + * - SinkCounter + * - SinkProcessorCounter + * - SourceCounter + */ public void stop() { + + // Sets the stopTime for the component as the current time in milliseconds stopTime.set(System.currentTimeMillis()); - LOG.info("Component type: " + type + ", name: " + name + " stopped"); + + // Prints out a message indicating that this component has been stopped + logger.info("Component type: " + type + ", name: " + name + " stopped"); + + // Retrieve the type for this counter group + final String typePrefix = type.name().toLowerCase(); + + // Print out the startTime for this component + logger.info("Shutdown Metric for type: " + type + ", " + + "name: " + name + ". " + + typePrefix + "." + COUNTER_GROUP_START_TIME + + " == " + startTime); + + // Print out the stopTime for this component + logger.info("Shutdown Metric for type: " + type + ", " + + "name: " + name + ". " + + typePrefix + "." + COUNTER_GROUP_STOP_TIME + + " == " + stopTime); + + // Retrieve and sort counter group map keys + final List mapKeys = new ArrayList(counterMap.keySet()); + + Collections.sort(mapKeys); + + // Cycle through and print out all the key value pairs in counterMap + for (final String counterMapKey : mapKeys) { + + // Retrieves the value from the original counterMap. + final long counterMapValue = get(counterMapKey); + + logger.info("Shutdown Metric for type: " + type + ", " + + "name: " + name + ". " + + counterMapKey + " == " + counterMapValue); + } } + /** + * Returns when this component was first started + * + * @return + */ public long getStartTime() { return startTime.get(); } + /** + * Returns when this component was stopped + * + * @return + */ public long getStopTime() { return stopTime.get(); } @@ -129,22 +209,53 @@ public abstract class MonitoredCounterGroup { } + /** + * Retrieves the current value for this key + * + * @param counter The key for this metric + * @return The current value for this key + */ protected long get(String counter) { return counterMap.get(counter).get(); } + /** + * Sets the value for this key to the given value + * + * @param counter The key for this metric + * @param value The new value for this key + */ protected void set(String counter, long value) { counterMap.get(counter).set(value); } + /** + * Atomically adds the delta to the current value for this key + * + * @param counter The key for this metric + * @param delta + * @return The updated value for this key + */ protected long addAndGet(String counter, long delta) { return counterMap.get(counter).addAndGet(delta); } + /** + * Atomically increments the current value for this key by one + * + * @param counter The key for this metric + * @return The updated value for this key + */ protected long increment(String counter) { return counterMap.get(counter).incrementAndGet(); } + /** + * Component Enum Constants + * + * Used by each component's constructor to distinguish which type the + * component is. + */ public static enum Type { SOURCE, CHANNEL_PROCESSOR,