chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r794981 - in /hadoop/chukwa/trunk: ./ contrib/xtrace/ contrib/xtrace/src/ contrib/xtrace/src/java/ contrib/xtrace/src/java/edu/ contrib/xtrace/src/java/edu/berkeley/ contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/ contrib/xtrace/test/ c...
Date: Fri, 17 Jul 2009 06:28:29 GMT
Author: asrabkin
Date: Fri Jul 17 06:28:28 2009
New Revision: 794981

URL: http://svn.apache.org/viewvc?rev=794981&view=rev
Log:
CHUKWA-352. Xtrace in contrib.

Added:
    hadoop/chukwa/trunk/contrib/xtrace/
    hadoop/chukwa/trunk/contrib/xtrace/build.xml
    hadoop/chukwa/trunk/contrib/xtrace/src/
    hadoop/chukwa/trunk/contrib/xtrace/src/java/
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java
    hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java
    hadoop/chukwa/trunk/contrib/xtrace/test/
    hadoop/chukwa/trunk/contrib/xtrace/test/src/
    hadoop/chukwa/trunk/contrib/xtrace/test/src/java/
    hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/
    hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/
    hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/
    hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrAdaptor.java
    hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=794981&r1=794980&r2=794981&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Jul 17 06:28:28 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-352. Xtrace in contrib. (asrabkin)
+
     CHUKWA-346. Simplified sink archiver. (asrabkin)
 
     CHUKWA-343. Static HDFS Heatmap visualization.   (Jiaqi Tan via asrabkin)

Added: hadoop/chukwa/trunk/contrib/xtrace/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/build.xml?rev=794981&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/build.xml (added)
+++ hadoop/chukwa/trunk/contrib/xtrace/build.xml Fri Jul 17 06:28:28 2009
@@ -0,0 +1,341 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="chukwa-xtrace" default="main" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+	<property name="name" value="chukwa-xtrace" />
+
+	<import file="../build-contrib.xml" optional="true" />
+	<available file="../build-contrib.xml" property="present" />
+
+    <property name="src.dir" value="${basedir}/src" />
+    <property name="build.dir" value="${basedir}/build" />
+    <property name="build.classes" value="${build.dir}/classes/" />
+    <property name="conf.dir" value="${basedir}/conf" />
+    <property name="docs.src" value="${basedir}/src/docs" />
+    <property name="docs.dir" value="${basedir}/docs" />
+    <property name="changes.src" value="${docs.src}/changes" />
+    <property name="tools.dir" value="${basedir}/tools" />
+    <property name="opt.dir" value="${basedir}/opt" />
+    <property name="javac.debug" value="on" />
+    <property name="javac.version" value="1.6" />
+    <property name="test.src.dir" value="${basedir}/test/src/java/" />
+    <property name="test.lib.dir" value="${basedir}/src/test/lib" />
+    <property name="test.build.dir" value="${build.dir}/test" />
+    <property name="test.generated.dir" value="${test.build.dir}/src" />
+    <property name="test.build.data" value="${test.build.dir}/data" />
+    <property name="test.cache.data" value="${test.build.dir}/cache" />
+    <property name="test.debug.data" value="${test.build.dir}/debug" />
+    <property name="test.log.dir" value="${test.build.dir}/logs" />
+    <property name="test.build.classes" value="${test.build.dir}/classes/" />
+    <property name="test.build.testjar" value="${test.build.dir}/testjar" />
+    <property name="test.build.javadoc" value="${test.build.dir}/docs/api" />
+    <property name="test.build.javadoc.dev" value="${test.build.dir}/docs/dev-api" />
+    <property name="test.include" value="Test*" />
+    <property name="test.classpath.id" value="test.classpath" />
+    <property name="test.output" value="no" />
+    <property name="test.timeout" value="900000" />
+    <property name="test.junit.output.format" value="plain" />
+    <property name="test.junit.fork.mode" value="perTest" />
+    <property name="test.junit.printsummary" value="yes" />
+    <property name="test.junit.haltonfailure" value="yes" />
+    <property name="test.junit.maxmemory" value="256m" />
+    <property name="javadoc.link.java" value="http://java.sun.com/javase/6/docs/api/" />
+    <property name="javadoc.packages" value="org.apache.hadoop.*" />
+    <property name="javadoc.maxmemory" value="512m" />
+
+
+	
+	<target name="ivy-init-properties-local" description="Initialize Ivy properties">
+		<property name="ivy.dir" location="ivy" />
+		<property name="ivysettings.xml" location="${ivy.dir}/ivysettings.xml" />
+		<loadproperties srcfile="${ivy.dir}/libraries.properties" />
+		<property name="ivy.jar" location="${ivy.dir}/ivy-${ivy.version}.jar" />
+		<property name="ivy_repo_url" value="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.version}/ivy-${ivy.version}.jar" />
+		<property name="build.dir" location="build" />
+		<property name="build.ivy.dir" location="${build.dir}/ivy" />
+		<property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" />
+		<property name="build.ivy.report.dir" location="${build.ivy.dir}/report" />
+		<property name="common.ivy.lib.dir" location="${build.ivy.lib.dir}/${ant.project.name}/common" />
+		<property name="rat.reporting.classname" value="rat.Report" />
+		<property name="jdiff.build.dir" value="${build.docs}/jdiff" />
+		<property name="jdiff.xml.dir" value="${build.ivy.lib.dir}/${name}/jdiff" />
+		<property name="jdiff.stable" value="0.1.2" />
+		<property name="jdiff.stable.javadoc" value="http://hadoop.apache.org/core/docs/r${jdiff.stable}/api/" />
+
+		<!--this is the naming policy for artifacts we want pulled down-->
+		<property name="ivy.artifact.retrieve.pattern" value="${ant.project.name}/[conf]/[artifact]-[revision].[ext]" />
+		<property name="jdiff.home" value="${build.ivy.lib.dir}/${ant.project.name}/jdiff" />
+		<property name="jdiff.jar" value="${jdiff.home}/jdiff-${jdiff.version}.jar" />
+		<property name="xerces.jar" value="${jdiff.home}/xerces-${xerces.version}.jar" />
+
+		<property name="clover.jar" location="${clover.home}/lib/clover.jar" />
+		<available property="clover.present" file="${clover.jar}" />
+
+	</target>
+
+	<target name="ivy-download-local" description="Download Ivy" unless="offline">
+		<get src="${ivy_repo_url}" dest="${ivy.jar}" usetimestamp="true" />
+	</target>
+
+	<target name="ivy-init-dirs-local" depends="ivy-init-properties-local">
+		<mkdir dir="${build.ivy.dir}" />
+		<mkdir dir="${build.ivy.lib.dir}" />
+		<mkdir dir="${build.ivy.report.dir}" />
+	</target>
+
+	<target name="ivy-probe-antlib-local">
+		<condition property="ivy.found.local">
+			<typefound uri="antlib:org.apache.ivy.ant" name="cleancache" />
+		</condition>
+	</target>
+
+	<target name="ivy-init-antlib-local" depends="ivy-init-dirs-local,ivy-download-local,ivy-probe-antlib-local" unless="ivy.found.local">
+		<typedef uri="antlib:org.apache.ivy.ant" onerror="fail" loaderRef="ivyLoader">
+			<classpath>
+				<pathelement location="${ivy.jar}" />
+			</classpath>
+		</typedef>
+		<fail>
+			<condition>
+				<not>
+					<typefound uri="antlib:org.apache.ivy.ant" name="cleancache" />
+				</not>
+			</condition>
+			      You need Apache Ivy 2.0 or later from http://ant.apache.org/
+			      It could not be loaded from ${ivy_repo_url}
+		 </fail>
+	</target>
+
+	<target name="ivy-init-local" depends="ivy-init-antlib-local">
+		<ivy:configure settingsid="${ant.project.name}.ivy.settings" file="${ivysettings.xml}" override="true" />
+	</target>
+
+	<target name="ivy-retrieve" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for the compile/test configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" sync="true" />
+		<ivy:cachepath pathid="contrib-classpath" conf="common" />
+	</target>
+
+	<target name="ivy-resolve" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" />
+	</target>
+
+	<target name="ivy-resolve-javadoc" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="javadoc" />
+	</target>
+
+	<target name="ivy-resolve-checkstyle" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="checkstyle" />
+	</target>
+
+	<target name="ivy-retrieve-checkstyle" depends="ivy-resolve-checkstyle" description="Retrieve Ivy-managed artifacts for the checkstyle configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="checkstyle-classpath" conf="checkstyle" />
+	</target>
+
+	<target name="ivy-resolve-test" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="test" />
+	</target>
+
+	<target name="ivy-resolve-common" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="common" />
+	</target>
+
+	<target name="ivy-resolve-jdiff" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="jdiff" />
+	</target>
+
+	<target name="ivy-retrieve-jdiff" depends="ivy-resolve-jdiff" description="Retrieve Ivy-managed artifacts for the jdiff configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="jdiff-classpath" conf="jdiff" />
+	</target>
+
+	<target name="ivy-retrieve-javadoc" depends="ivy-resolve-javadoc" description="Retrieve Ivy-managed artifacts for the javadoc configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="javadoc-classpath" conf="javadoc" />
+	</target>
+
+	<target name="ivy-retrieve-test" depends="ivy-resolve-test" description="Retrieve Ivy-managed artifacts for the test configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="test.classpath" conf="test" />
+	</target>
+
+	<target name="ivy-retrieve-common" depends="ivy-resolve-common" description="Retrieve Ivy-managed artifacts for the compile configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="ivy-common.classpath" conf="common" />
+	</target>
+
+	<target name="ivy-report" depends="ivy-resolve-releaseaudit" description="Generate Ivy dependency reports">
+		<ivy:report todir="${build.ivy.report.dir}" settingsRef="${ant.project.name}.ivy.settings" />
+		<echo>
+      Reports generated:${build.ivy.report.dir}
+    </echo>
+	</target>
+
+	
+	<target name="init" depends="">
+		
+		<mkdir dir="${build.dir}" />
+		<mkdir dir="${build.classes}" />
+		<mkdir dir="${build.dir}/test" />
+
+	</target>
+
+	<target name="main" depends="init,compile,jar" description="Main target">
+		<echo>
+            Building the .jar files.
+        </echo>
+	</target>
+
+	<target name="compile" depends="compile-src,jar" description="Compile">
+	</target>
+		
+	<target name="compile-src" depends="init" description="Compile">
+			<javac srcdir="src/java" destdir="${build.classes}" debug="${javac.debug}">
+				<compilerarg value="-Xlint" />
+				<include name="edu/berkeley/chukwa_xtrace/**" />
+				<classpath refid="xtr-classpath" />
+			</javac>
+	</target>
+	
+	
+	<target name="jar" depends="compile-src" description="Create chukwa-xtrace.jar">
+			<jar jarfile="${build.dir}/chukwa-xtrace.jar" basedir="${build.classes}" includes="edu/berkeley/chukwa_xtrace/**/*.class">
+				<fileset dir="${basedir}/src/java">
+					<include name="edu/berkeley/chukwa_xtrace/*.java" />
+				</fileset>
+			</jar>
+
+		<copy file="${build.dir}/chukwa-xtrace.jar" tofile="${basedir}/chukwa-xtrace.jar" />
+		</target>
+	
+	<path id="xtr-classpath">
+		<pathelement location="${build.classes}" />
+		<pathelement location="${test.build.classes}" />
+		<pathelement location="${chukwa.root.build.classes}" />
+		<fileset dir="${basedir}/lib">
+			<include name="**/xtrace*.jar" />
+			<exclude name="**/excluded/" />
+		</fileset>
+		<fileset dir="../../build/">
+			<include name="chukwa*.jar" />
+			<exclude name="**/excluded/" />
+		</fileset>
+		<fileset dir="../../hadoopjars/">
+			<include name="hadoop*.jar" />
+			<exclude name="**/excluded/" />
+		</fileset>
+	</path>
+
+	<target name="compile-test" depends="init,compile,jar" description="Compile the test sources">
+		 <echo>*** Building Test Sources ***</echo>
+		 <echo>*** ${build.classes} ***</echo>
+		<mkdir dir="${test.build.dir}" />
+		<mkdir dir="${test.log.dir}" />
+		<mkdir dir="${test.build.classes}" />
+
+		<javac srcdir="test/src/java" destdir="${test.build.classes}" debug="${javac.debug}">
+			<compilerarg value="-Xlint" />
+			<include name="org/apache/hadoop/chukwa/**" />
+            <classpath>
+                <pathelement location="${build.classes}" />
+            	<pathelement location="${test.build.classes}" />
+                <path refid="xtr-classpath"/>
+            </classpath>
+		</javac>
+
+	</target>
+	
+	<target name="test" depends="compile-test" description="chukwa-xtrace tests">
+        <delete dir="${test.log.dir}"/>
+        <mkdir dir="${test.log.dir}"/>
+		 <echo>*** ${test.build.classes} *** ${chukwa.root.build.dir} ***</echo>
+	     	<junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes"
+	     	maxmemory="256m" 
+	     	dir="${basedir}" 
+	     	timeout="${test.timeout}" 
+	     	errorProperty="tests.failed" 
+	     	failureProperty="tests.failed">
+	     	<sysproperty key="chukwa.root.build.dir" value="${chukwa.root.build.dir}" />
+	     	<sysproperty key="chukwa-xtr.build.dir" value="${build.dir}" />
+            <sysproperty key="hod.server" value="" />
+            <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
+	     	
+	     	<classpath>
+                <pathelement location="${build.classes}" />
+            	<pathelement location="${test.build.classes}" />
+                <path refid="xtr-classpath"/>
+            </classpath>
+            <formatter type="${test.junit.output.format}" />
+
+
+            <batchtest fork="yes" todir="${test.log.dir}" unless="testcase">
+                <fileset dir="${test.src.dir}">
+                	<include name="**/${test.include}.java" />
+                	<exclude name="**/${test.exclude}.java" />
+                </fileset>
+            </batchtest>
+	     	<batchtest todir="${test.log.dir}" if="testcase">
+				<fileset dir="${test.src.dir}" includes="**/${testcase}.java">
+				</fileset>
+			</batchtest>
+	        </junit>
+	        <fail if="tests.failed">Tests failed!</fail>
+		
+		
+	</target>
+	
+	<!-- ================================================================== -->
+	<!-- Clean.  Delete the build files, and their directories              -->
+	<!-- ================================================================== -->
+	<target name="clean" depends="init" description="Clean.  Delete the build files, and their directories">
+		<delete dir="${build.dir}" />
+		<delete dir="${build.ivy.lib.dir}/${ant.project.name}" />
+		<delete dir="build" />
+		<delete file="${ivy.jar}" />
+		<delete dir="${docs.src}/build" />
+		<delete dir="${test.build.dir}" />
+		<delete dir="${test.log.dir}" />
+		<delete dir="${test.build.classes}" />
+		<delete file="${basedir}/chukwa-xtrace.jar" />
+	</target>
+	
+	
+    <!-- ================================================================== -->
+	<!-- Perform audit activities for the release                           -->
+	<!-- ================================================================== -->
+	<target name="releaseaudit" depends="ivy-retrieve-releaseaudit" description="Release Audit activities">
+		<java classname="${rat.reporting.classname}" fork="true">
+			<classpath refid="releaseaudit-classpath" />
+			<arg value="${build.dir}/${final.name}" />
+		</java>
+	</target>
+
+	<target name="ivy-retrieve-releaseaudit" depends="ivy-resolve-releaseaudit" description="Retrieve Ivy-managed artifacts for the releaseaudit configuration">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="releaseaudit-classpath" conf="releaseaudit" />
+	</target>
+
+	<target name="ivy-resolve-releaseaudit" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="releaseaudit" />
+	</target>
+	
+</project>
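
For reference: with this layout, the jar would presumably be built with "ant -f contrib/xtrace/build.xml" (the default target is main) and the tests run via the test target; a single test can be selected through the testcase property, e.g. "ant test -Dtestcase=TestXtrAdaptor", per the batchtest blocks above. Driving this file from the contrib parent build is an assumption based on the optional build-contrib.xml import.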

Added: hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java?rev=794981&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java (added)
+++ hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrAdaptor.java Fri Jul 17 06:28:28 2009
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.berkeley.chukwa_xtrace;
+
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
+import org.apache.log4j.Logger;
+import edu.berkeley.xtrace.server.*;
+import edu.berkeley.xtrace.XTraceException;
+import java.util.concurrent.*;
+
+/**
+ * Adaptor that wraps an xtrace report source, so that xtrace messages
+ * can get picked up by the chukwa agent.
+ * Takes one mandatory param, the class name of the report source,
+ * implicitly inside package edu.berkeley.xtrace.server
+ * 
+ */
+public class XtrAdaptor extends AbstractAdaptor implements Runnable {
+  
+  
+  ReportSource rs;
+  Thread pump = new Thread(this);
+  Thread reportSourceThread;
+  BlockingQueue<String> q = new ArrayBlockingQueue<String>(1000);
+  volatile boolean stopping = false;
+  long offset = 0;
+  static Logger log = Logger.getLogger(XtrAdaptor.class);
+  
+  static final String XTR_RS_PACKAGE = "edu.berkeley.xtrace.server.";
+  /**
+   * Get an xtrace report source with the given class name.
+   * @param name the report source class name, resolved inside edu.berkeley.xtrace.server
+   * @return a report source. Defaults to UdpReportSource on error.
+   */
+  static ReportSource getXtrReportSource(String name) {
+    try {
+      Object obj = Class.forName(XTR_RS_PACKAGE + name).newInstance();
+      if (ReportSource.class.isInstance(obj)) {
+        return (ReportSource) obj;
+      } else
+        return new UdpReportSource();
+    } catch(Exception e) {
+      log.warn(e);
+      return new UdpReportSource();
+    }
+  }
+  
+  /*
+   * This is effectively the main thread associated with the adaptor;
+   * however, each ReportSource may also have a thread of its own.
+   */
+  public void run() {
+    try {
+      log.info("starting Pump Thread");
+      while(!stopping) {
+        String report = q.take();
+        log.info("got a report");
+        byte[] data = report.getBytes();
+        offset += data.length;
+        ChunkImpl i = new ChunkImpl(type, "xtrace", offset, data, this);
+        dest.add(i);
+      }
+    } catch(InterruptedException e) {
+      // interrupted, presumably during shutdown; fall through and exit the pump loop
+    }
+    log.info("XtrAdaptor stopping");
+  }
+
+  @Override
+  public void start(long offset) throws AdaptorException {
+
+    this.offset = offset;
+    try{
+      rs.initialize();
+      rs.setReportQueue(q);
+      reportSourceThread = new Thread(rs);
+      reportSourceThread.start();
+      pump.start();
+      log.info("starting Report Source");
+    } catch(XTraceException e) {
+      throw new AdaptorException(e);
+    }
+  }
+
+  @Override
+  public String getCurrentStatus() throws AdaptorException {
+    return type;
+  }
+
+  @Override
+  public String parseArgs(String params) {
+    rs = getXtrReportSource(params);  
+    return params; //no optional params 
+  }
+
+  @Override
+  public void hardStop() throws AdaptorException {
+    shutdown(AdaptorShutdownPolicy.HARD_STOP);
+  }
+
+  @Override
+  public long shutdown() throws AdaptorException {
+    return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    switch(shutdownPolicy) {
+    case HARD_STOP:
+    case GRACEFULLY:
+    case WAIT_TILL_FINISHED:
+      rs.shutdown();
+      stopping = true;
+    }
+    return offset;
+  }
+
+}
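
For reference, a minimal sketch of registering this adaptor with a running agent, mirroring the add command used in TestXtrAdaptor later in this commit. The agent variable is assumed to be a live ChukwaAgent; "XTrace" is the chunk data type, "TcpReportSource" is resolved against edu.berkeley.xtrace.server, and the trailing 0 is the starting offset:

    // hypothetical usage sketch; assumes a running ChukwaAgent named "agent"
    agent.processAddCommand(
        "add edu.berkeley.chukwa_xtrace.XtrAdaptor XTrace TcpReportSource 0");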

Added: hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java?rev=794981&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java (added)
+++ hadoop/chukwa/trunk/contrib/xtrace/src/java/edu/berkeley/chukwa_xtrace/XtrExtract.java Fri Jul 17 06:28:28 2009
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.berkeley.chukwa_xtrace;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.AbstractProcessor;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import java.io.IOException;
+import java.util.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.berkeley.xtrace.reporting.Report;
+import edu.berkeley.xtrace.*;
+
+/**
+ * MapReduce job to process xtrace reports coming out of chukwa demux.
+ * 
+ * Map phase unwraps the chukwa records, reduce phase does trace reconstruction.
+ * 
+ * We use task ID as the reduce sort key.
+ *
+ */
+public class XtrExtract extends Configured implements Tool {
+  
+  public static final String OUTLINK_FIELD = "__xtr_outlinks";
+  
+  /**
+   * with more than 10,000 reports, switch to on-disk sort, 
+   * instead of in-memory topological sort.
+   */
+  static final int MAX_IN_MEMORY_REPORTS = 10* 1000;
+  
+  public static class MapClass extends Mapper<Object, Object, BytesWritable, Text> {
+    
+    public MapClass() {
+      System.out.println("starting xtrace map");
+    }
+    
+    @Override
+    protected void map(Object k, Object v, 
+        Mapper<Object, Object,BytesWritable, Text>.Context context)
+        throws IOException, InterruptedException 
+    {
+      Text t;
+      BytesWritable bw;
+      
+      if(k instanceof ChukwaArchiveKey && v instanceof ChunkImpl) {
+        ChunkImpl value = (ChunkImpl) v;
+        Report xtrReport = Report.createFromString(new String(value.getData()));
+        bw = new BytesWritable(xtrReport.getMetadata().getTaskId().get());
+        //FIXME: can probably optimize the above lines by doing a search in the raw bytes
+        t= new Text(value.getData());
+      } else if(k instanceof ChukwaRecordKey && v instanceof ChukwaRecord){
+        ChukwaRecord value = (ChukwaRecord) v;
+        Report xtrReport = Report.createFromString(value.getValue(Record.bodyField));
+        bw = new BytesWritable(xtrReport.getMetadata().getTaskId().get());
+        //FIXME: can probably optimize the above lines by doing a search in the raw bytes
+        t= new Text(value.getValue(Record.bodyField));
+      } else {
+        System.out.println("unexpected key/value types: "+ k.getClass().getCanonicalName()
+            + " and " + v.getClass().getCanonicalName() );
+        return;
+      }
+      context.write(bw, t);
+    }
+  }
+
+  public static class Reduce extends Reducer<BytesWritable, Text,BytesWritable,ArrayWritable> {
+    
+    public Reduce() {}
+    
+    /**
+     * 
+     * Note that loading everything into hashtables means
+     * we implicitly suppress duplicate-but-identical reports.
+     * 
+     */
+    protected void reduce(BytesWritable taskID, Iterable<Text> values,
+          Reducer<BytesWritable, Text,BytesWritable,ArrayWritable>.Context context)
+          throws IOException, InterruptedException
+    {
+      String taskIDString = new String(taskID.getBytes(), 0, taskID.getLength());
+      //in both cases, key is OpId string
+      HashMap<String, Report> reports = new LinkedHashMap<String, Report>();
+
+      Counter reportCounter = context.getCounter("app", "reports");
+      Counter edgeCounter = context.getCounter("app", "edges");
+      Counter badEdgeCounter = context.getCounter("app", "reference to missing report");
+      int edgeCount = 0;
+      
+      int numReports = 0;
+      for(Text rep_text: values) {
+        Report r = Report.createFromString(rep_text.toString());
+        numReports++;
+        
+        if(numReports < MAX_IN_MEMORY_REPORTS) {
+          reports.put(r.getMetadata().getOpIdString(), r);
+        } else if(numReports == MAX_IN_MEMORY_REPORTS) {
+          //bail out, prepare to do an external sort.
+          return;
+        }
+        //else: the on-disk external sort would consume the remaining reports here
+      }
+
+      HashMap<String, Integer> counts = new HashMap<String, Integer>();
+      Queue<Report> zeroInlinkReports = new LinkedList<Report>();
+      reportCounter.increment(reports.size());
+      //FIXME: could usefully compare reports.size() with numReports;
+      //that would measure duplicate reports
+      
+      //increment link counts for children
+      for(Report r: reports.values()){ 
+        String myOpID = r.getMetadata().getOpIdString();
+        int parentCount = 0;
+        for(String inLink: r.get("Edge")) {
+          
+            //sanitize data from old, nonconformant C++ implementation
+          if(inLink.contains(","))
+            inLink = inLink.substring(0, inLink.indexOf(','));
+          
+          Report parent = reports.get(inLink);
+          if(parent != null) {
+            parent.put(OUTLINK_FIELD, myOpID);
+            parentCount++;
+            edgeCount++;
+          }
+          else { //no match
+            if(!inLink.equals("0000000000000000"))  {
+              System.out.println("no sign of parent: " + inLink);
+              badEdgeCounter.increment(1);
+            }
+            //else quietly suppress
+          }
+        }
+          
+        //if there weren't any parents, we can dequeue
+        if(parentCount == 0)
+          zeroInlinkReports.add(r);
+        else
+          counts.put(myOpID, parentCount);
+      }
+      
+      System.out.println(taskIDString+": " + edgeCount + " total edges");
+      edgeCounter.increment(edgeCount);
+      //at this point, we have a map from metadata to report, and also
+      //from report op ID to inlink count.
+      //next step is to do a topological sort.
+
+      
+      Text[] finalOutput = new Text[reports.size()];
+      System.out.println(taskIDString+": expecting to sort " + finalOutput.length + " reports");
+      int i=0;
+      while(!zeroInlinkReports.isEmpty()) {
+        Report r = zeroInlinkReports.poll();
+        if(r == null) {
+          System.err.println("poll returned null but list not empty");
+          break;
+        }
+        finalOutput[i++] = new Text(r.toString());
+        List<String> outLinks =  r.get(OUTLINK_FIELD);
+        if(outLinks != null) {
+          for(String outLink: outLinks) {
+            Integer oldCount = counts.get(outLink);
+            if(oldCount == null) {
+              oldCount = 0;  //FIXME: can this happen?
+              System.out.println("warning: found an in-edge where none was expected");
+            }
+            if(oldCount == 1) {
+              zeroInlinkReports.add(reports.get(outLink));
+            }
+            counts.put(outLink, oldCount -1);
+          }
+        }
+      }
+      if(i != finalOutput.length) {
+        if(i > 0)
+           System.out.println("error: I only sorted " + i + " items, but expected " 
+            + finalOutput.length+", is your list cyclic?");
+        else
+          System.out.println("every event in graph has a predecessor; perhaps "
+              + "the start event isn't in the input set?");
+      }
+
+      context.write(taskID, new ArrayWritable(Text.class, finalOutput));
+      //output is the topologically sorted report list for this task
+      
+    } //end reduce
+    
+  }//end reduce class
+
+
+  @Override
+  public int run(String[] arg) throws Exception {
+    Job extractor = new Job(getConf());
+    
+
+    extractor.setMapperClass(MapClass.class);
+    
+    extractor.setReducerClass(Reduce.class);
+    extractor.setJobName("x-trace reconstructor");
+    extractor.setJarByClass(this.getClass());
+    
+    extractor.setMapOutputKeyClass(BytesWritable.class);
+    extractor.setMapOutputValueClass(Text.class);
+    
+    extractor.setOutputKeyClass(BytesWritable.class);
+    extractor.setOutputValueClass(ArrayWritable.class);
+    
+    extractor.setInputFormatClass(SequenceFileInputFormat.class);
+    extractor.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(extractor, new Path(arg[0]));
+    FileOutputFormat.setOutputPath(extractor, new Path(arg[1]));
+    System.out.println("looks OK.  Submitting.");
+    extractor.submit();
+//    extractor.waitForCompletion(false);
+    return 0;
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(),
+        new XtrExtract(), args);
+    System.exit(res);
+  }
+
+}
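
The reduce phase above is Kahn's topological sort: count in-edges for each report, seed a queue with the zero-in-edge reports, then repeatedly dequeue a report and decrement the counts of its outlinks, enqueueing any that drop to zero. A self-contained sketch of the same idea over plain strings (the graph literals and class name are illustrative, not part of the contrib code):

    import java.util.*;

    // Minimal sketch of the reducer's trace reconstruction (Kahn's algorithm),
    // with opId -> successor-opIds standing in for xtrace Reports and outlinks.
    public class TopoSortSketch {
      public static void main(String[] args) {
        Map<String, List<String>> outLinks = new HashMap<String, List<String>>();
        outLinks.put("start", Arrays.asList("a", "b"));
        outLinks.put("a", Arrays.asList("end"));
        outLinks.put("b", Arrays.asList("end"));
        outLinks.put("end", Collections.<String>emptyList());

        // count in-edges, like the reducer's "counts" map
        Map<String, Integer> inCount = new HashMap<String, Integer>();
        for (String node : outLinks.keySet())
          inCount.put(node, 0);
        for (List<String> succs : outLinks.values())
          for (String s : succs)
            inCount.put(s, inCount.get(s) + 1);

        // seed the queue with zero-in-edge nodes, like "zeroInlinkReports"
        Queue<String> ready = new LinkedList<String>();
        for (Map.Entry<String, Integer> e : inCount.entrySet())
          if (e.getValue() == 0)
            ready.add(e.getKey());

        List<String> order = new ArrayList<String>();
        while (!ready.isEmpty()) {
          String node = ready.poll();
          order.add(node);
          for (String s : outLinks.get(node)) {
            int c = inCount.get(s) - 1;  // one parent has been emitted
            inCount.put(s, c);
            if (c == 0)
              ready.add(s);
          }
        }
        // fewer emitted nodes than expected means a cycle, matching the
        // "is your list cyclic?" warning in the reducer
        System.out.println(order);  // e.g. [start, a, b, end]
      }
    }

Launching the job itself goes through ToolRunner with input and output paths as arguments, so something like "hadoop jar chukwa-xtrace.jar edu.berkeley.chukwa_xtrace.XtrExtract <indir> <outdir>" would be expected; the jar name comes from the build file, but the exact invocation is an assumption.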

Added: hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrAdaptor.java?rev=794981&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrAdaptor.java (added)
+++ hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrAdaptor.java Fri Jul 17 06:28:28 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.berkeley.chukwa_xtrace;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.conf.Configuration;
+import junit.framework.TestCase;
+import edu.berkeley.xtrace.XTraceContext;
+import edu.berkeley.xtrace.reporting.*;
+
+public class TestXtrAdaptor extends TestCase  {
+  public void testXtrAdaptor() throws IOException,
+  ChukwaAgent.AlreadyRunningException, InterruptedException{
+    Configuration conf = new Configuration();
+    File baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+    conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+    conf.set("chukwaAgent.control.port", "0");
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    ChunkCatcherConnector chunks = new ChunkCatcherConnector();
+    chunks.start();
+
+    System.setProperty("xtrace.reporter", "edu.berkeley.xtrace.reporting.TcpReporter");
+    System.setProperty("xtrace.tcpdest", "localhost:7831");
+
+    assertEquals(0, agent.adaptorCount());
+    agent.processAddCommand("add edu.berkeley.chukwa_xtrace.XtrAdaptor XTrace TcpReportSource 0");
+    assertEquals(1, agent.adaptorCount());
+    
+    XTraceContext.startTrace("test", "testtrace", "atag");
+    XTraceContext.logEvent("test", "label");
+    Chunk c = chunks.waitForAChunk();
+    String report = new String(c.getData());
+    assertTrue(report.contains("Agent: test"));
+    assertTrue(report.contains("Tag: atag"));
+    System.out.println(report);
+    System.out.println("-- next chunk --- ");
+
+    c = chunks.waitForAChunk();
+    report = new String(c.getData());
+    assertTrue(report.contains("Agent: test"));
+    assertTrue(report.contains("Label: label"));
+    System.out.println(report);
+
+    System.out.println("OK");
+    agent.shutdown();
+  }
+
+}

Added: hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java?rev=794981&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java (added)
+++ hadoop/chukwa/trunk/contrib/xtrace/test/src/java/edu/berkeley/chukwa_xtrace/TestXtrExtract.java Fri Jul 17 06:28:28 2009
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.berkeley.chukwa_xtrace;
+
+import java.io.IOException;
+import java.util.Calendar;
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ToolRunner;
+
+public class TestXtrExtract extends TestCase {
+  
+  public void writeASinkFile(Configuration conf, FileSystem fileSys, Path dest,
+      int chunks) throws IOException {
+    FSDataOutputStream out = fileSys.create(dest);
+
+    Calendar calendar = Calendar.getInstance();
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
+        ChukwaArchiveKey.class, ChunkImpl.class,
+        SequenceFile.CompressionType.NONE, null);
+
+      //FIXME: do write here
+
+    seqFileWriter.close();
+    out.close();
+  }
+  
+  static final int NUM_HADOOP_SLAVES = 1;
+  static final Path OUTPUT_DIR = new Path("/test/out/");
+  static final Path INPUT_DIR = new Path("/test/in/");
+  
+ public void testArchiving() throws Exception {
+    
+    System.out.println("starting archive test");
+    Configuration conf = new Configuration();
+    System.setProperty("hadoop.log.dir", System.getProperty(
+        "test.build.data", "/tmp"));
+    MiniDFSCluster dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
+        null);
+    FileSystem fileSys = dfs.getFileSystem();
+    fileSys.delete(OUTPUT_DIR, true);//nuke output dir
+
+    writeASinkFile(conf, fileSys, INPUT_DIR, 1000);
+    
+    FileStatus fstat = fileSys.getFileStatus(INPUT_DIR);
+    assertTrue(fstat.getLen() > 10);
+    
+    System.out.println("filesystem is " + fileSys.getUri());
+    conf.set("fs.default.name", fileSys.getUri().toString());
+    conf.setInt("io.sort.mb", 1);
+    conf.setInt("io.sort.factor", 5);
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
+    
+    MiniMRCluster mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
+        .toString(), 1);
+    String[] archiveArgs = {INPUT_DIR.toString(),
+        fileSys.getUri().toString() +OUTPUT_DIR.toString() };
+    
+    
+    JobConf jc = mr.createJobConf(new JobConf(conf));
+    assertEquals("true", jc.get("archive.groupByClusterName"));
+    assertEquals(1, jc.getInt("io.sort.mb", 5));
+    
+    int returnVal = ToolRunner.run(jc,  new XtrExtract(), archiveArgs);
+    assertEquals(0, returnVal);
+    fstat = fileSys.getFileStatus(new Path("/chukwa/archives/foocluster/HadoopLogProcessor_2008_05_29.arc"));
+    assertTrue(fstat.getLen() > 10);    
+    
+    Thread.sleep(1000);
+
+    System.out.println("done!");
+ }  
+}
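
Note that writeASinkFile is still a stub (the FIXME above), so testArchiving currently runs the job against an empty sequence file, and the trailing assertions (the archive.groupByClusterName check and the /chukwa/archives/... path) appear to be scaffolding carried over from an archive test. A hedged sketch of what the missing write loop might look like, reusing the method's own chunks, calendar, and seqFileWriter locals; the ChunkImpl constructor matches the one XtrAdaptor uses, while the ChukwaArchiveKey setters are assumptions about the Chukwa API:

    // hypothetical body for the FIXME in writeASinkFile
    for (int i = 0; i < chunks; ++i) {
      byte[] data = ("test xtrace report " + i + "\n").getBytes();
      // (dataType, streamName, seqId, data, adaptor) as in XtrAdaptor.run()
      ChunkImpl chunk = new ChunkImpl("XTrace", "xtrace", i, data, null);
      ChukwaArchiveKey key = new ChukwaArchiveKey();
      key.setTimePartition(calendar.getTimeInMillis());  // assumed setter
      key.setDataType("XTrace");                         // assumed setter
      key.setStreamName("xtrace");                       // assumed setter
      key.setSeqId(i);                                   // assumed setter
      seqFileWriter.append(key, chunk);
    }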


