chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r760991 - in /hadoop/chukwa/trunk: ./ bin/ conf/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ src/java/org/apache/hadoop/chukwa/datacollection/sender/ src/java/org/a...
Date Wed, 01 Apr 2009 18:14:44 GMT
Author: asrabkin
Date: Wed Apr  1 18:14:42 2009
New Revision: 760991

URL: http://svn.apache.org/viewvc?rev=760991&view=rev
Log:
CHUKWA-12.  Add instrumentation Api for Chukwa components.  (Contributed by Jerome Boulon)

Added:
    hadoop/chukwa/trunk/conf/hadoop-metrics.properties.template
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentActivityMBean.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentMetrics.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueActivityMBean.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueMetrics.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderActivityMBean.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderMetrics.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/metrics/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/metrics/spi/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/bin/VERSION
    hadoop/chukwa/trunk/bin/agent.sh
    hadoop/chukwa/trunk/bin/chukwa-config.sh
    hadoop/chukwa/trunk/build.xml
    hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
    hadoop/chukwa/trunk/conf/chukwa-hadoop-metrics-log4j.properties
    hadoop/chukwa/trunk/conf/hadoop-metrics.properties
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Apr  1 18:14:42 2009
@@ -3,6 +3,9 @@
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    
+    CHUKWA-12.  Add instrumentation Api for Chukwa components.  (Jerome Boulon via asrabkin)
+
     CHUKWA-14.  Added permalink support to HICC GUI. (Eric Yang)
 
     CHUKWA-45.  Added docs to folder structure. (Corinne Chandel via asrabkin)

Modified: hadoop/chukwa/trunk/bin/VERSION
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/VERSION?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/VERSION (original)
+++ hadoop/chukwa/trunk/bin/VERSION Wed Apr  1 18:14:42 2009
@@ -1 +1 @@
-0.1.1
+0.1.2

Modified: hadoop/chukwa/trunk/bin/agent.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/agent.sh?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/agent.sh (original)
+++ hadoop/chukwa/trunk/bin/agent.sh Wed Apr  1 18:14:42 2009
@@ -30,7 +30,7 @@
 }
 
 trap stop SIGHUP SIGINT SIGTERM
-echo "hadoop jar for agent is " ${HADOOP_JAR}
+echo "hadoop jar for agent is " ${HADOOP_20_JAR}
 
 if [ "X$1" = "Xstop" ]; then
    stop
@@ -41,7 +41,6 @@
   chmod 777 ${CHUKWA_LOG_DIR}/metrics
 fi
 
-
-${JAVA_HOME}/bin/java -Xms32M -Xmx64M -DAPP=agent -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -classpath ${CLASSPATH}:${CHUKWA_AGENT}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent $@ &
+${JAVA_HOME}/bin/java -Xms32M -Xmx64M -DAPP=agent -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_AGENT}:${CHUKWA_CORE}:${HADOOP_20_JAR}:${COMMON} org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent $@ &
 
 wait $!

Modified: hadoop/chukwa/trunk/bin/chukwa-config.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/chukwa-config.sh?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/chukwa-config.sh (original)
+++ hadoop/chukwa/trunk/bin/chukwa-config.sh Wed Apr  1 18:14:42 2009
@@ -100,6 +100,9 @@
 export COMMON=`echo ${COMMON} | sed 'y/ /:/'`
 export CHUKWA_CORE=${CHUKWA_HOME}/chukwa-core-${CHUKWA_VERSION}.jar
 export CHUKWA_AGENT=${CHUKWA_HOME}/chukwa-agent-${CHUKWA_VERSION}.jar
+export HADOOP_20_JAR=${CHUKWA_HOME}/hadoopjars/hadoop-0.20-dev-core.jar
+export HADOOP_18_JAR=${CHUKWA_HOME}/hadoopjars/hadoop-0.18.0-core.jar
+
 export CURRENT_DATE=`date +%Y%m%d%H%M`
 
 if [ -z ${HADOOP_JAR} ]; then

Modified: hadoop/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/build.xml?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/build.xml (original)
+++ hadoop/chukwa/trunk/build.xml Wed Apr  1 18:14:42 2009
@@ -105,7 +105,7 @@
                           <exclude name="**/*core*.jar" />
                   </fileset>
                   <fileset dir="${hadoop.home.jars.dir}">   <!-- ASR -->
-                          <include name="**/*core*.jar" />
+                          <include name="**/hadoop-0.20*.jar" />
                   </fileset>
                   <path refid="contrib-classpath"/>
           </path>
@@ -324,8 +324,9 @@
 		<javac srcdir="src/java" destdir="${build.classes}" debug="${javac.debug}">
                         <compilerarg value="-Xlint"/>
 			<classpath refid="classpath" />
-                        <exclude name="org/apache/hadoop/*" />
+                        <exclude name="org/apache/hadoop/mapred/**" />
                         <include name="org/apache/hadoop/chukwa/datacollection/**" />
+                        <include name="org/apache/hadoop/metrics/spi/**" />
 		</javac>
 	</target>
 
@@ -471,16 +472,25 @@
 			<fileset dir="${build.dir}">
 				<include name="org/apache/hadoop/chukwa/conf/**/*.class"/>
 			</fileset>
-			<fileset dir="${build.dir}">
+                        <fileset dir="${build.classes}">
+                                <include name="org/apache/hadoop/metrics/**/*.class"/>
+                        </fileset>
+
+			<fileset dir="${build.classes}">
 				<include name="org/apache/hadoop/chukwa/util/**/*.class"/>
 			</fileset>
-			<fileset dir="${build.dir}">
+			<fileset dir="${build.classes}">
 				<include name="org/apache/hadoop/chukwa/inputtools/**/*.class"/>
 			</fileset>
-			<fileset dir="${build.dir}">
+			<fileset dir="${build.classes}">
+                                <exclude name="org/apache/hadoop/chukwa/datacollection/collector/**/*.class"/>
+                                <exclude name="org/apache/hadoop/chukwa/datacollection/writer/**/*.class"/>
 				<include name="org/apache/hadoop/chukwa/datacollection/**/*.class"/>
 			</fileset>
 
+                        <fileset dir="${basedir}/src/java">
+                                <include name="org/apache/hadoop/metrics/**/*.java"/>
+                        </fileset>
 			<fileset dir="${basedir}/src/java">
 				<include name="org/apache/hadoop/chukwa/client/**/*.java"/>
 			</fileset>
@@ -523,15 +533,6 @@
 
 	<target name="chukwa-hadoop_jar" depends="compile" description="Create chukwa_hadoop jar for use with getting hadoop to use chukwa">
                 <mkdir dir="${build.dir}/conf"/>
-                <copy todir="${build.dir}/conf">
-                    <fileset dir="${basedir}/conf">
-                        <include name="chukwa-hadoop-metrics-log4j.properties"/>
-                    </fileset>
-                    <filterset>
-                        <filter token="CHUKWA_LOG_DIR" value="${CHUKWA_LOG_DIR}"/>
-                    </filterset>
-                </copy>
-
 		<jar jarfile="${build.dir}/chukwa-hadoop-${chukwaVersion}-client.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/inputtools/log4j/**/*.class">
 			<fileset dir="${basedir}/src/java">
 				<include name="org/apache/hadoop/mapred/**/*.java"/>

Modified: hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template (original)
+++ hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml.template Wed Apr  1 18:14:42 2009
@@ -12,6 +12,30 @@
     <description>Reduce count </description>
   </property>
 
+  <property>
+    <name>jvm</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HadoopMetricsProcessor</value>
+    <description>Parser class for </description>
+  </property>
+
+  <property>
+    <name>mapred</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HadoopMetricsProcessor</value>
+    <description>Parser class for </description>
+  </property>
+
+  <property>
+    <name>rpc</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HadoopMetricsProcessor</value>
+    <description>Parser class for </description>
+  </property>
+
+  <property>
+    <name>dfs</name>
+    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HadoopMetricsProcessor</value>
+    <description>Parser class for </description>
+  </property>
+
   
   <property>
     <name>SysLog</name>
@@ -32,12 +56,6 @@
   </property>
 
   <property>
-    <name>HadoopMetricsProcessor</name>
-    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HadoopMetricsProcessor</value>
-    <description>Parser class for </description>
-  </property>
-
-  <property>
     <name>Iostat</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Iostat</value>
     <description>Parser class for </description>
@@ -87,12 +105,6 @@
    </property>
   
    <property>
-    <name>YWatch</name>
-    <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.YWatch</value>
-    <description>Parser class for </description>
-   </property>
-  
-   <property>
     <name>DbLoader</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor</value>
     <description>Parser class for </description>

Modified: hadoop/chukwa/trunk/conf/chukwa-hadoop-metrics-log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/chukwa-hadoop-metrics-log4j.properties?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/chukwa-hadoop-metrics-log4j.properties (original)
+++ hadoop/chukwa/trunk/conf/chukwa-hadoop-metrics-log4j.properties Wed Apr  1 18:14:42 2009
@@ -1,31 +0,0 @@
-log4j.appender.chukwa.rpc.recordType=HadoopMetricsProcessor
-log4j.appender.chukwa.rpc.chukwaClientHostname=localhost
-log4j.appender.chukwa.rpc.chukwaClientPortNum=9093
-log4j.appender.chukwa.rpc.DatePattern=.yyyy-MM-dd
-log4j.appender.chukwa.rpc.layout=org.apache.log4j.PatternLayout
-log4j.appender.chukwa.rpc.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-log4j.appender.chukwa.rpc.Dir=@CHUKWA_LOG_DIR@/metrics
-
-log4j.appender.chukwa.jvm.recordType=HadoopMetricsProcessor
-log4j.appender.chukwa.jvm.chukwaClientHostname=localhost
-log4j.appender.chukwa.jvm.chukwaClientPortNum=9093
-log4j.appender.chukwa.jvm.DatePattern=.yyyy-MM-dd
-log4j.appender.chukwa.jvm.layout=org.apache.log4j.PatternLayout
-log4j.appender.chukwa.jvm.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-log4j.appender.chukwa.jvm.Dir=@CHUKWA_LOG_DIR@/metrics
-
-log4j.appender.chukwa.dfs.recordType=HadoopMetricsProcessor
-log4j.appender.chukwa.dfs.chukwaClientHostname=localhost
-log4j.appender.chukwa.dfs.chukwaClientPortNum=9093
-log4j.appender.chukwa.dfs.DatePattern=.yyyy-MM-dd
-log4j.appender.chukwa.dfs.layout=org.apache.log4j.PatternLayout
-log4j.appender.chukwa.dfs.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-log4j.appender.chukwa.dfs.Dir=@CHUKWA_LOG_DIR@/metrics
-
-log4j.appender.chukwa.mapred.recordType=HadoopMetricsProcessor
-log4j.appender.chukwa.mapred.chukwaClientHostname=localhost
-log4j.appender.chukwa.mapred.chukwaClientPortNum=9093
-log4j.appender.chukwa.mapred.DatePattern=.yyyy-MM-dd
-log4j.appender.chukwa.mapred.layout=org.apache.log4j.PatternLayout
-log4j.appender.chukwa.mapred.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
-log4j.appender.chukwa.mapred.Dir=@CHUKWA_LOG_DIR@/metrics

Modified: hadoop/chukwa/trunk/conf/hadoop-metrics.properties
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/hadoop-metrics.properties?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/hadoop-metrics.properties (original)
+++ hadoop/chukwa/trunk/conf/hadoop-metrics.properties Wed Apr  1 18:14:42 2009
@@ -1,11 +0,0 @@
-dfs.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
-dfs.period=60
-
-jvm.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
-jvm.period=60
-
-mapred.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
-mapred.period=60
-
-rpc.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
-rpc.period=60

Added: hadoop/chukwa/trunk/conf/hadoop-metrics.properties.template
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/hadoop-metrics.properties.template?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/conf/hadoop-metrics.properties.template (added)
+++ hadoop/chukwa/trunk/conf/hadoop-metrics.properties.template Wed Apr  1 18:14:42 2009
@@ -0,0 +1,31 @@
+#chukwaAgent.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+#chukwaAgent.period=60
+#chukwaAgent.directory=/tmp/
+ 
+#chukwaHttpSender.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+#chukwaHttpSender.period=60
+#chukwaHttpSender.directory=/tmp/
+
+#chunkQueue.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+#chunkQueue.period=60
+#chunkQueue.directory=/tmp/
+
+#dfs.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+#dfs.period=60
+#dfs.directory=/tmp/
+
+#jvm.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+#jvm.period=60
+#jvm.directory=/tmp/
+#jvm.uuid=true
+
+#mapred.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+#mapred.period=60
+#mapred.directory=/tmp/
+#mapred.uuid=true
+
+#rpc.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+#rpc.period=60
+#rpc.directory=/tmp/
+#rpc.uuid=true
+

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Wed Apr  1 18:14:42 2009
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
+import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
 import org.apache.hadoop.chukwa.datacollection.connector.*;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
@@ -41,7 +42,7 @@
  */
 public class ChukwaAgent {
   // boolean WRITE_CHECKPOINTS = true;
-
+  static final AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "chukwaAgent");;
   static Logger log = Logger.getLogger(ChukwaAgent.class);
   static ChukwaAgent agent = null;
   private static PidFile pFile = null;
@@ -308,6 +309,8 @@
           adaptor.start(adaptorID, dataType, params, offset, DataFactory
               .getInstance().getEventQueue());
           log.info("started a new adaptor, id = " + adaptorID);
+          ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByNumber.size());
+          ChukwaAgent.agentMetrics.addedAdaptor.inc();
           return adaptorID;
 
         } catch (Exception e) {
@@ -483,7 +486,9 @@
     } else {
       adaptorPositions.remove(toStop);
     }
-
+    ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByNumber.size());
+    ChukwaAgent.agentMetrics.removedAdaptor.inc();
+    
     try {
       if (gracefully) {
         offset = toStop.shutdown();

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Wed Apr  1 18:14:42 2009
@@ -24,6 +24,7 @@
 import java.util.Queue;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
 import org.apache.log4j.Logger;
 
 /**
@@ -35,7 +36,7 @@
  */
 public class MemLimitQueue implements ChunkQueue {
   static Logger log = Logger.getLogger(WaitingQueue.class);
-
+  static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("ChukwaAgent", "chunkQueue");;
   private Queue<Chunk> queue = new LinkedList<Chunk>();
   private long dataSize = 0;
   private final long MAX_MEM_USAGE;
@@ -52,13 +53,18 @@
     synchronized (this) {
       while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
         try {
+          metrics.fullQueue.set(1);
           this.wait();
           log.info("MemLimitQueue is full [" + dataSize + "]");
         } catch (InterruptedException e) {
         }
       }
+      metrics.fullQueue.set(0);
       dataSize += chunk.getData().length;
       queue.add(chunk);
+      metrics.addedChunk.inc();
+      metrics.queueSize.set(queue.size());
+      metrics.dataSize.set(dataSize);
       this.notifyAll();
     }
 
@@ -79,11 +85,14 @@
       int size = 0;
       while (!queue.isEmpty() && (size < maxSize)) {
         Chunk e = this.queue.remove();
+        metrics.removedChunk.inc();
         int chunkSize = e.getData().length;
         size += chunkSize;
         dataSize -= chunkSize;
+        metrics.dataSize.set(dataSize);
         events.add(e);
       }
+      metrics.queueSize.set(queue.size());
       this.notifyAll();
     }
 

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentActivityMBean.java?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentActivityMBean.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentActivityMBean.java Wed Apr  1 18:14:42 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent.metrics;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+public class AgentActivityMBean extends MetricsDynamicMBeanBase {
+  final private ObjectName mbeanName;
+
+  public AgentActivityMBean(final MetricsRegistry mr, final String serviceName) {
+    super(mr, "Agent layer statistics");
+    mbeanName = MBeanUtil.registerMBean(serviceName,
+          "AgentActivity", this);
+  }
+
+
+  public void shutdown() {
+    if (mbeanName != null)
+      MBeanUtil.unregisterMBean(mbeanName);
+  }
+
+
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentMetrics.java?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentMetrics.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/AgentMetrics.java Wed Apr  1 18:14:42 2009
@@ -0,0 +1,57 @@
+package org.apache.hadoop.chukwa.datacollection.agent.metrics;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+
+public class AgentMetrics implements Updater {
+  public static final AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "chukwaAgent");;
+  
+  public MetricsRegistry registry = new MetricsRegistry();
+  private MetricsRecord metricsRecord;
+  private AgentActivityMBean agentActivityMBean;
+
+  public MetricsIntValue adaptorCount =
+    new MetricsIntValue("adaptorCount", registry,"number of new adaptor");
+
+  public MetricsTimeVaryingInt addedAdaptor =
+    new MetricsTimeVaryingInt("addedAdaptor", registry,"number of added adaptor");
+  
+  public MetricsTimeVaryingInt removedAdaptor =
+    new MetricsTimeVaryingInt("removedAdaptor", registry,"number of removed adaptor");
+  
+  /** Creates a new instance of AgentMetrics */
+  public AgentMetrics(String processName, String recordName) {
+      MetricsContext context = MetricsUtil.getContext(recordName);
+      metricsRecord = MetricsUtil.createRecord(context, recordName);
+      metricsRecord.setTag("process", processName);
+      agentActivityMBean = new AgentActivityMBean(registry, recordName);
+      context.registerUpdater(this);
+      
+  }
+
+
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      for (MetricsBase m : registry.getMetricsList()) {
+        m.pushMetric(metricsRecord);
+      }
+    }
+    metricsRecord.update();
+  }
+
+  public void shutdown() {
+    if (agentActivityMBean != null)
+      agentActivityMBean.shutdown();
+  }
+
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueActivityMBean.java?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueActivityMBean.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueActivityMBean.java Wed Apr  1 18:14:42 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent.metrics;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+public class ChunkQueueActivityMBean extends MetricsDynamicMBeanBase {
+  final private ObjectName mbeanName;
+
+  public ChunkQueueActivityMBean(final MetricsRegistry mr, final String serviceName) {
+    super(mr, "ChunkQueue layer statistics");
+    mbeanName = MBeanUtil.registerMBean(serviceName,
+          "QueueActivity", this);
+  }
+
+
+  public void shutdown() {
+    if (mbeanName != null)
+      MBeanUtil.unregisterMBean(mbeanName);
+  }
+
+
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueMetrics.java?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueMetrics.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/metrics/ChunkQueueMetrics.java Wed Apr  1 18:14:42 2009
@@ -0,0 +1,64 @@
+package org.apache.hadoop.chukwa.datacollection.agent.metrics;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsLongValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+
+public class ChunkQueueMetrics implements Updater {
+
+  public MetricsRegistry registry = new MetricsRegistry();
+  private MetricsRecord metricsRecord;
+  private ChunkQueueActivityMBean mbean;
+
+
+  public MetricsIntValue queueSize =
+    new MetricsIntValue("queueSize", registry,"Queue size");
+  
+  public MetricsLongValue dataSize =
+    new MetricsLongValue("dataSize", registry,"Data size");
+  
+  public MetricsTimeVaryingInt addedChunk =
+    new MetricsTimeVaryingInt("addedChunk", registry,"number of added chunk");
+  
+  public MetricsTimeVaryingInt removedChunk =
+    new MetricsTimeVaryingInt("removedChunk", registry,"number of removed chunk");
+  
+  public MetricsIntValue fullQueue =
+    new MetricsIntValue("fullQueue", registry,"Queue is full");
+  
+  
+  /** Creates a new instance of QueueMetrics */
+  public ChunkQueueMetrics(String processName, String recordName) {
+      MetricsContext context = MetricsUtil.getContext(recordName);
+      metricsRecord = MetricsUtil.createRecord(context, recordName);
+      mbean = new ChunkQueueActivityMBean(registry, recordName);
+      context.registerUpdater(this);
+      
+  }
+
+
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      for (MetricsBase m : registry.getMetricsList()) {
+        m.pushMetric(metricsRecord);
+      }
+    }
+    metricsRecord.update();
+  }
+
+  public void shutdown() {
+    if (mbean != null)
+      mbean.shutdown();
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Wed Apr  1 18:14:42 2009
@@ -40,6 +40,7 @@
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.log4j.Logger;
@@ -61,6 +62,8 @@
   final int WAIT_FOR_COLLECTOR_REBOOT;
   // FIXME: this should really correspond to the timer in RetryListOfCollectors
 
+  static final HttpSenderMetrics metrics = new HttpSenderMetrics("ChukwaAgent", "chukwaHttpSender");
+  
   static Logger log = Logger.getLogger(ChukwaHttpSender.class);
   static HttpClient client = null;
   static MultiThreadedHttpConnectionManager connectionManager = null;
@@ -201,10 +204,13 @@
         return commitResults;
       } catch (Throwable e) {
         log.error("Http post exception", e);
+        ChukwaHttpSender.metrics.httpThrowable.inc();
         log
             .info("Checking list of collectors to see if another collector has been specified for rollover");
         if (collectors.hasNext()) {
+          ChukwaHttpSender.metrics.collectorRollover.inc();
           currCollector = collectors.next();
+
           log
               .info("Found a new collector to roll over to, retrying HTTP Post to collector "
                   + currCollector);
@@ -255,11 +261,18 @@
     log.info(">>>>>> HTTP post to " + dest + " length = "
         + data.getContentLength());
     // Send POST request
-
+    ChukwaHttpSender.metrics.httpPost.inc();
+    
     // client.setTimeout(15*1000);
     int statusCode = client.executeMethod(method);
 
     if (statusCode != HttpStatus.SC_OK) {
+      ChukwaHttpSender.metrics.httpException.inc();
+      
+      if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
+        ChukwaHttpSender.metrics.httpTimeOutException.inc();
+      }
+      
       log.error(">>>>>> HTTP post response statusCode: " + statusCode
           + ", statusLine: " + method.getStatusLine());
       // do something aggressive here
@@ -284,12 +297,4 @@
       }
     }
   }
-
-  public static void main(String[] argv) throws InterruptedException {
-    // HttpConnectorClient cc = new HttpConnectorClient();
-    // do something smarter than to hide record headaches, like force them to
-    // create and add records to a chunk
-    // cc.addChunk("test-source", "test-streamName", "test-application",
-    // "test-dataType", new byte[]{1,2,3,4,5});
-  }
 }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderActivityMBean.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderActivityMBean.java?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderActivityMBean.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderActivityMBean.java Wed Apr  1 18:14:42 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.sender.metrics;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+public class HttpSenderActivityMBean extends MetricsDynamicMBeanBase {
+  final private ObjectName mbeanName;
+
+
+
+  public HttpSenderActivityMBean(final MetricsRegistry mr, final String serviceName) {
+    super(mr, "Http Sender layer statistics");
+    mbeanName = MBeanUtil.registerMBean(serviceName,
+          "HttpSenderActivity", this);
+  }
+
+
+  public void shutdown() {
+    if (mbeanName != null)
+      MBeanUtil.unregisterMBean(mbeanName);
+  }
+
+
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderMetrics.java?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderMetrics.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/metrics/HttpSenderMetrics.java Wed Apr  1 18:14:42 2009
@@ -0,0 +1,61 @@
+package org.apache.hadoop.chukwa.datacollection.sender.metrics;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+
+public class HttpSenderMetrics implements Updater {
+
+  public MetricsRegistry registry = new MetricsRegistry();
+  private MetricsRecord metricsRecord;
+  private HttpSenderActivityMBean mbean;
+  
+  
+  public MetricsTimeVaryingInt collectorRollover =
+    new MetricsTimeVaryingInt("collectorRollover", registry,"number of collector rollovert");
+  
+  public MetricsTimeVaryingInt httpPost =
+    new MetricsTimeVaryingInt("httpPost", registry,"number of HTTP post");
+  
+  public MetricsTimeVaryingInt httpException =
+    new MetricsTimeVaryingInt("httpException", registry,"number of HTTP Exception");
+
+  public MetricsTimeVaryingInt httpThrowable =
+    new MetricsTimeVaryingInt("httpThrowable", registry,"number of HTTP Throwable exception");
+  
+  public MetricsTimeVaryingInt httpTimeOutException =
+    new MetricsTimeVaryingInt("httpTimeOutException", registry,"number of HTTP TimeOutException");
+  
+  /** Creates a new instance of HttpSenderMetrics */
+  public HttpSenderMetrics(String processName, String recordName) {
+      MetricsContext context = MetricsUtil.getContext(recordName);
+      metricsRecord = MetricsUtil.createRecord(context, recordName);
+      metricsRecord.setTag("process", processName);
+      mbean = new HttpSenderActivityMBean(registry, recordName);
+      context.registerUpdater(this);
+  }
+
+
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      for (MetricsBase m : registry.getMetricsList()) {
+        m.pushMetric(metricsRecord);
+      }
+    }
+    metricsRecord.update();
+  }
+
+  public void shutdown() {
+    if (mbean != null)
+      mbean.shutdown();
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java?rev=760991&r1=760990&r2=760991&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java Wed Apr  1 18:14:42 2009
@@ -18,44 +18,42 @@
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
 
-import java.io.*;
-import java.util.Enumeration;
-import java.util.logging.LogManager;
-import java.util.Properties;
-import org.apache.hadoop.mapred.TaskLogAppender;
+import java.io.File;
+import java.io.IOException;
+
 import org.apache.hadoop.metrics.ContextFactory;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
 import org.apache.hadoop.metrics.spi.OutputRecord;
-import org.apache.log4j.Appender;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.json.JSONException;
 import org.json.JSONObject;
 
 public class Log4JMetricsContext extends AbstractMetricsContext {
-
-  Logger out = null; // Logger.getLogger(Log4JMetricsContext.class);
+  Logger log = Logger.getLogger(Log4JMetricsContext.class);
+  Logger out = null; 
   static final Object lock = new Object();
 
   /* Configuration attribute names */
-  // protected static final String FILE_NAME_PROPERTY = "fileName";
+  protected static final String  OUTPUT_DIR_PROPERTY = "directory";
   protected static final String PERIOD_PROPERTY = "period";
-  private static final String metricsLogDir = System
-      .getProperty("hadoop.log.dir");
-  private static final String user = System.getProperty("user.name");
+  protected static final String ADD_UUID_PROPERTY = "uuid";
+  
 
+  protected static final String user = System.getProperty("user.name");
+  
+  protected String outputDir = null;
+  protected int period = 0;
+  protected boolean needUUID = false;
+  
   /** Creates a new instance of FileContext */
   public Log4JMetricsContext() {
   }
 
   public void init(String contextName, ContextFactory factory) {
     super.init(contextName, factory);
-    /*
-     * String fileName = getAttribute(FILE_NAME_PROPERTY); if (fileName != null)
-     * { file = new File(fileName); }
-     */
-
+   
     String periodStr = getAttribute(PERIOD_PROPERTY);
     if (periodStr != null) {
       int period = 0;
@@ -67,6 +65,25 @@
         throw new MetricsException("Invalid period: " + periodStr);
       }
       setPeriod(period);
+      this.period = period;
+      log.info("Log4JMetricsContext." + contextName + ".period=" + period);
+    }
+    
+    outputDir = getAttribute(OUTPUT_DIR_PROPERTY);
+    if (outputDir == null) {
+      log.warn("Log4JMetricsContext." + contextName + "."+ OUTPUT_DIR_PROPERTY + " is null");
+      throw new MetricsException("Invalid output directory: " + outputDir);
+    }
+    File fOutputDir = new File(outputDir);
+    if (!fOutputDir.exists()) {
+      fOutputDir.mkdirs();
+    }
+    log.info("Log4JMetricsContext." + contextName + "." + OUTPUT_DIR_PROPERTY +"=" + outputDir);
+    
+    String uuid = getAttribute(ADD_UUID_PROPERTY);
+    if (uuid != null && uuid.equalsIgnoreCase("true")) {
+      needUUID = true;
+      log.info("Log4JMetricsContext." + contextName + "." + ADD_UUID_PROPERTY +" has been activated."); 
     }
   }
 
@@ -76,56 +93,42 @@
     if (out == null) {
       synchronized (lock) {
         if (out == null) {
-          String logName = null;
-          java.util.Properties properties = new java.util.Properties();
-          properties.load(this.getClass().getClassLoader().getResourceAsStream(
-              "chukwa-hadoop-metrics-log4j.properties"));
-          Logger logger = Logger.getLogger(Log4JMetricsContext.class);
-          logger.setAdditivity(false);
-          PatternLayout layout = new PatternLayout(properties
-              .getProperty("log4j.appender.chukwa." + contextName
-                  + ".layout.ConversionPattern"));
+          PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n");
+          
           org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender appender = new org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender();
-          appender.setName("chukwa." + contextName);
+          appender.setName("chukwa.metrics." + contextName);
           appender.setLayout(layout);
           appender.setAppend(true);
-          if (properties.getProperty("log4j.appender.chukwa." + contextName
-              + ".Dir") != null) {
-            logName = properties.getProperty("log4j.appender.chukwa."
-                + contextName + ".Dir")
-                + File.separator
-                + "chukwa-"
-                + user
-                + "-"
-                + contextName
-                + "-"
-                + System.currentTimeMillis() + ".log";
-
-            appender.setFile(logName);
+          if (needUUID) {
+            appender.setFile(outputDir + File.separator + "chukwa-" + user
+                + "-" + contextName + "-" + System.currentTimeMillis()
+                + ".log");
           } else {
-            logName = metricsLogDir+File.separator+"chukwa-"+user+"-"
-                +contextName + "-" + System.currentTimeMillis()+ ".log";
-            appender.setFile(logName);
+            appender.setFile(outputDir + File.separator + "chukwa-" + user
+                + "-" + contextName 
+                + ".log");
           }
-          appender.activateOptions();
-          appender.setRecordType(properties
-              .getProperty("log4j.appender.chukwa." + contextName
-                  + ".recordType"));
-          appender.setChukwaClientHostname(properties
-              .getProperty("log4j.appender.chukwa." + contextName
-                  + ".chukwaClientHostname"));
-          appender.setChukwaClientPortNum(Integer.parseInt(properties
-              .getProperty("log4j.appender.chukwa." + contextName
-                  + ".chukwaClientPortNum")));
-          appender.setDatePattern(properties
-              .getProperty("log4j.appender.chukwa." + contextName
-                  + ".DatePattern"));
+
+          try {
+            File fooLogFile = new File(appender.getFile());
+            if (!fooLogFile.exists()) {
+              fooLogFile.createNewFile();
+            }
+            fooLogFile.setReadable(true, false);
+            fooLogFile.setWritable(true, false);
+            fooLogFile = null;
+          }catch (Exception e) {
+            log.warn("Exception while trying to set file permission," , e);
+          }
+          
+          appender.setRecordType( contextName);
+          appender.setDatePattern(".yyyy-MM-dd");
+          
+          Logger logger = Logger.getLogger("chukwa.metrics." + contextName);
+          logger.setAdditivity(false);
           logger.addAppender(appender);
+          appender.activateOptions();
           out = logger;
-          // FIXME: Hack to make the log file readable by chukwa user.
-          if (System.getProperty("os.name").intern() == "Linux".intern()) {
-            Runtime.getRuntime().exec("chmod 666 " + logName);
-          }
         }
       }
     }
@@ -135,6 +138,7 @@
       json.put("contextName", contextName);
       json.put("recordName", recordName);
       json.put("chukwa_timestamp", System.currentTimeMillis());
+      json.put("period", period);
       for (String tagName : outRec.getTagNames()) {
         json.put(tagName, outRec.getTag(tagName));
       }
@@ -142,8 +146,7 @@
         json.put(metricName, outRec.getMetric(metricName));
       }
     } catch (JSONException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      log.warn("exception in Log4jMetricsContext:" , e);
     }
     out.info(json.toString());
   }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java?rev=760991&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java Wed Apr  1 18:14:42 2009
@@ -0,0 +1,439 @@
+/*
+ * AbstractMetricsContext.java
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics.spi;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsException;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.Updater;
+
+/**
+ * The main class of the Service Provider Interface.  This class should be
+ * extended in order to integrate the Metrics API with a specific metrics
+ * client library. <p/>
+ *
+ * This class implements the internal table of metric data, and the timer
+ * on which data is to be sent to the metrics system.  Subclasses must
+ * override the abstract <code>emitRecord</code> method in order to transmit
+ * the data. <p/>
+ */
+public abstract class AbstractMetricsContext implements MetricsContext {
+    
+  private int period = MetricsContext.DEFAULT_PERIOD;
+  private Timer timer = null;
+  private long lastRun = 0L;    
+  private Set<Updater> updaters = new HashSet<Updater>(1);
+  private volatile boolean isMonitoring = false;
+    
+  private ContextFactory factory = null;
+  private String contextName = null;
+    
+  static class TagMap extends TreeMap<String,Object> {
+    private static final long serialVersionUID = 3546309335061952993L;
+    TagMap() {
+      super();
+    }
+    TagMap(TagMap orig) {
+      super(orig);
+    }
+    /**
+     * Returns true if this tagmap contains every tag in other.
+     */
+    public boolean containsAll(TagMap other) {
+      for (Map.Entry<String,Object> entry : other.entrySet()) {
+        Object value = get(entry.getKey());
+        if (value == null || !value.equals(entry.getValue())) {
+          // either key does not exist here, or the value is different
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+  
+  static class MetricMap extends TreeMap<String,Number> {
+    private static final long serialVersionUID = -7495051861141631609L;
+  }
+            
+  static class RecordMap extends HashMap<TagMap,MetricMap> {
+    private static final long serialVersionUID = 259835619700264611L;
+  }
+    
+  private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
+    
+
+  /**
+   * Creates a new instance of AbstractMetricsContext
+   */
+  protected AbstractMetricsContext() {
+  }
+    
+  /**
+   * Initializes the context.
+   */
+  public void init(String contextName, ContextFactory factory) 
+  {
+    this.contextName = contextName;
+    this.factory = factory;
+  }
+    
+  /**
+   * Convenience method for subclasses to access factory attributes.
+   */
+  protected String getAttribute(String attributeName) {
+    String factoryAttribute = contextName + "." + attributeName;
+    return (String) factory.getAttribute(factoryAttribute);  
+  }
+    
+  /**
+   * Returns an attribute-value map derived from the factory attributes
+   * by finding all factory attributes that begin with 
+   * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
+   * those attributes with the contextName and tableName stripped off.
+   */
+  protected Map<String,String> getAttributeTable(String tableName) {
+    String prefix = contextName + "." + tableName + ".";
+    Map<String,String> result = new HashMap<String,String>();
+    for (String attributeName : factory.getAttributeNames()) {
+      if (attributeName.startsWith(prefix)) {
+        String name = attributeName.substring(prefix.length());
+        String value = (String) factory.getAttribute(attributeName);
+        result.put(name, value);
+      }
+    }
+    return result;
+  }
+    
+  /**
+   * Returns the context name.
+   */
+  public String getContextName() {
+    return contextName;
+  }
+    
+  /**
+   * Returns the factory by which this context was created.
+   */
+  public ContextFactory getContextFactory() {
+    return factory;
+  }
+    
+  /**
+   * Starts or restarts monitoring, the emitting of metrics records.
+   */
+  public synchronized void startMonitoring()
+    throws IOException {
+    if (!isMonitoring) {
+      startTimer();
+      isMonitoring = true;
+    }
+  }
+    
+  /**
+   * Stops monitoring.  This does not free buffered data. 
+   * @see #close()
+   */
+  public synchronized void stopMonitoring() {
+    if (isMonitoring) {
+      stopTimer();
+      isMonitoring = false;
+    }
+  }
+    
+  /**
+   * Returns true if monitoring is currently in progress.
+   */
+  public boolean isMonitoring() {
+    return isMonitoring;
+  }
+    
+  /**
+   * Stops monitoring and frees buffered data, returning this
+   * object to its initial state.  
+   */
+  public synchronized void close() {
+    stopMonitoring();
+    clearUpdaters();
+  } 
+    
+  /**
+   * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
+   * Throws an exception if the metrics implementation is configured with a fixed
+   * set of record names and <code>recordName</code> is not in that set.
+   * 
+   * @param recordName the name of the record
+   * @throws MetricsException if recordName conflicts with configuration data
+   */
+  public final synchronized MetricsRecord createRecord(String recordName) {
+    if (bufferedData.get(recordName) == null) {
+      bufferedData.put(recordName, new RecordMap());
+    }
+    return newRecord(recordName);
+  }
+    
+  /**
+   * Subclasses should override this if they subclass MetricsRecordImpl.
+   * @param recordName the name of the record
+   * @return newly created instance of MetricsRecordImpl or subclass
+   */
+  protected MetricsRecord newRecord(String recordName) {
+    return new MetricsRecordImpl(recordName, this);
+  }
+    
+  /**
+   * Registers a callback to be called at time intervals determined by
+   * the configuration.
+   *
+   * @param updater object to be run periodically; it should update
+   * some metrics records 
+   */
+  public synchronized void registerUpdater(final Updater updater) {
+    if (!updaters.contains(updater)) {
+      updaters.add(updater);
+    }
+  }
+    
+  /**
+   * Removes a callback, if it exists.
+   *
+   * @param updater object to be removed from the callback list
+   */
+  public synchronized void unregisterUpdater(Updater updater) {
+    updaters.remove(updater);
+  }
+    
+  private synchronized void clearUpdaters() {
+    updaters.clear();
+  }
+    
+  /**
+   * Starts timer if it is not already started
+   */
+  private synchronized void startTimer() {
+    if (timer == null) {
+      timer = new Timer("Timer thread for monitoring " + getContextName(), 
+                        true);
+      TimerTask task = new TimerTask() {
+          public void run() {
+            try {
+              timerEvent();
+            }
+            catch (IOException ioe) {
+              ioe.printStackTrace();
+            }
+          }
+        };
+      long millis = period * 1000;
+      timer.scheduleAtFixedRate(task, millis, millis);
+    }
+  }
+    
+  /**
+   * Stops timer if it is running
+   */
+  private synchronized void stopTimer() {
+    if (timer != null) {
+      timer.cancel();
+      timer = null;
+    }
+  }
+    
+  /**
+   * Timer callback.
+   */
+  private void timerEvent() throws IOException {
+    if (isMonitoring) {
+      Collection<Updater> myUpdaters;
+      synchronized (this) {
+        myUpdaters = new ArrayList<Updater>(updaters);
+      }     
+      // Run all the registered updates without holding a lock
+      // on this context
+      for (Updater updater : myUpdaters) {
+        try {
+          updater.doUpdates(this);
+        }
+        catch (Throwable throwable) {
+          throwable.printStackTrace();
+        }
+      }
+      emitRecords();
+    }
+  }
+    
+  /**
+   *  Emits the records.
+   */
+  private synchronized void emitRecords() throws IOException {
+    for (String recordName : bufferedData.keySet()) {
+      RecordMap recordMap = bufferedData.get(recordName);
+      synchronized (recordMap) {
+        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
+        for (Entry<TagMap, MetricMap> entry : entrySet) {
+          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
+          emitRecord(contextName, recordName, outRec);
+        }
+      }
+    }
+    flush();
+  }
+
+  /**
+   * Sends a record to the metrics system.
+   */
+  protected abstract void emitRecord(String contextName, String recordName, 
+                                     OutputRecord outRec) throws IOException;
+    
+  /**
+   * Called each period after all records have been emitted, this method does nothing.
+   * Subclasses may override it in order to perform some kind of flush.
+   */
+  protected void flush() throws IOException {
+  }
+    
+  /**
+   * Called by MetricsRecordImpl.update().  Creates or updates a row in
+   * the internal table of metric data.
+   */
+  protected void update(MetricsRecordImpl record) {
+    long now = System.currentTimeMillis();
+    
+    String recordName = record.getRecordName();
+    TagMap tagTable = record.getTagTable();
+    Map<String,MetricValue> metricUpdates = record.getMetricTable();
+        
+    RecordMap recordMap = getRecordMap(recordName);
+    synchronized (recordMap) {
+      MetricMap metricMap = recordMap.get(tagTable);
+      if (metricMap == null) {
+        metricMap = new MetricMap();
+        TagMap tagMap = new TagMap(tagTable); // clone tags
+        recordMap.put(tagMap, metricMap);
+      }
+
+      Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
+      for (Entry<String, MetricValue> entry : entrySet) {
+        String metricName = entry.getKey ();
+        MetricValue updateValue = entry.getValue ();
+        Number updateNumber = updateValue.getNumber();
+        Number currentNumber = metricMap.get(metricName);
+        if (currentNumber == null || updateValue.isAbsolute()) {
+          metricMap.put(metricName, updateNumber);
+        }
+        else {
+          Number newNumber = sum(updateNumber, currentNumber);
+          metricMap.put(metricName, newNumber);
+          if (lastRun != 0) {
+            long duration = now -lastRun;
+            if (duration != 0) {
+              double rate = newNumber.doubleValue() * 60.0 / duration;
+              metricMap.put(metricName+"_rate", rate);
+            } else {
+              metricMap.put(metricName+"_rate", 0.0);
+            }
+          }
+          lastRun = now;
+        }
+      }
+    }
+  }
+    
+  private synchronized RecordMap getRecordMap(String recordName) {
+    return bufferedData.get(recordName);
+  }
+    
+  /**
+   * Adds two numbers, coercing the second to the type of the first.
+   *
+   */
+  private Number sum(Number a, Number b) {
+    if (a instanceof Integer) {
+      return Integer.valueOf(a.intValue() + b.intValue());
+    }
+    else if (a instanceof Float) {
+      return new Float(a.floatValue() + b.floatValue());
+    }
+    else if (a instanceof Short) {
+      return Short.valueOf((short)(a.shortValue() + b.shortValue()));
+    }
+    else if (a instanceof Byte) {
+      return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
+    }
+    else if (a instanceof Long) {
+      return Long.valueOf((a.longValue() + b.longValue()));
+    }
+    else {
+      // should never happen
+      throw new MetricsException("Invalid number type");
+    }
+            
+  }
+    
+  /**
+   * Called by MetricsRecordImpl.remove().  Removes all matching rows in
+   * the internal table of metric data.  A row matches if it has the same
+   * tag names and values as record, but it may also have additional
+   * tags.
+   */    
+  protected void remove(MetricsRecordImpl record) {
+    String recordName = record.getRecordName();
+    TagMap tagTable = record.getTagTable();
+        
+    RecordMap recordMap = getRecordMap(recordName);
+    synchronized (recordMap) {
+      Iterator<TagMap> it = recordMap.keySet().iterator();
+      while (it.hasNext()) {
+        TagMap rowTags = it.next();
+        if (rowTags.containsAll(tagTable)) {
+          it.remove();
+        }
+      }
+    }
+  }
+    
+  /**
+   * Returns the timer period.
+   */
+  public int getPeriod() {
+    return period;
+  }
+    
+  /**
+   * Sets the timer period
+   */
+  protected void setPeriod(int period) {
+    this.period = period;
+  }
+}



Mime
View raw message