chukwa-commits mailing list archives

From: ey...@apache.org
Subject: svn commit: r783208 - in /hadoop/chukwa/trunk: ./ contrib/ contrib/chukwa-pig/ contrib/chukwa-pig/lib/ contrib/chukwa-pig/src/ contrib/chukwa-pig/src/java/ contrib/chukwa-pig/src/java/org/ contrib/chukwa-pig/src/java/org/apache/ contrib/chukwa-pig/src/...
Date: Wed, 10 Jun 2009 05:44:52 GMT
Author: eyang
Date: Wed Jun 10 05:44:51 2009
New Revision: 783208

URL: http://svn.apache.org/viewvc?rev=783208&view=rev
Log:
CHUKWA-20. Added pig support for ChukwaRecords. (Jerome Boulon via Eric Yang)

Added:
    hadoop/chukwa/trunk/contrib/
    hadoop/chukwa/trunk/contrib/build-contrib.xml
    hadoop/chukwa/trunk/contrib/build.xml
    hadoop/chukwa/trunk/contrib/chukwa-pig/
    hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml
    hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar   (with props)
    hadoop/chukwa/trunk/contrib/chukwa-pig/lib/
    hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar   (with props)
    hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar   (with props)
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/PigTest.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestLocalChukwaStorage.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestParseDouble.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java
Modified:
    hadoop/chukwa/trunk/build.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java

Modified: hadoop/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/build.xml?rev=783208&r1=783207&r2=783208&view=diff
==============================================================================
--- hadoop/chukwa/trunk/build.xml (original)
+++ hadoop/chukwa/trunk/build.xml Wed Jun 10 05:44:51 2009
@@ -23,7 +23,8 @@
 	<property name="chukwaVersion" value="0.1.2" />
 	<property name="chukwaRelease" value="alpha" />
 	<property name="final.name" value="${name}-${chukwaVersion}" />
-
+	<property name="contrib.dir" value="${basedir}/contrib"/>
+	
 	<import file="../build-contrib.xml" optional="true" />
 	<available file="../build-contrib.xml" property="present" />
 
@@ -354,13 +355,13 @@
 
 	</target>
 
-	<target name="main" depends="init, compile, compress" description="Main target">
+	<target name="main" depends="init, compile, compile-contrib,compress" description="Main target">
 		<echo>
             Building the .jar files.
         </echo>
 	</target>
 
-	<target name="compile" depends="compile-common,compile-inputtools,compile-datacollection,compile-extraction,compile-hicc" description="Compilation target">
+	<target name="compile" depends="compile-common,compile-inputtools,compile-datacollection,compile-extraction,compile-hicc,compile-contrib" description="Compilation target">
 	</target>
 
 	<target name="compile-common" depends="init" description="Compilation target">
@@ -449,6 +450,14 @@
 		</copy>
 	</target>
 
+	  <target name="compile-contrib" depends="compile-common,compile-datacollection,compile-extraction">
+	     <subant target="compile">
+	        <property name="version" value="${version}"/>
+	        <fileset file="${contrib.dir}/build.xml"/>
+	     </subant>  	
+	  </target>
+	
+	
 	<!--printsummary="${test.junit.printsummary}" -->
 
 	<target name="test-chukwa" depends="compile,compile-test" description="Run Chukwa unit tests">
@@ -695,9 +704,15 @@
 		</copy>
 	</target>
 
-	<target name="test" depends="compile,compile-test,test-chukwa" description="Automated Test Framework">
+	<target name="test" depends="compile,compile-test,test-chukwa,test-contrib" description="Automated Test Framework">
 	</target>
 
+	  <target name="test-contrib" depends="chukwa_jar" description="Run contrib unit tests">
+	    <subant target="test">
+	       <property name="version" value="${version}"/>
+	       <fileset file="${contrib.dir}/build.xml"/>
+	    </subant> 
+	  </target>
 	<!-- ================================================================== -->
 	<!-- Documentation                                                      -->
 	<!-- ================================================================== -->
@@ -817,13 +832,24 @@
 	<!-- ================================================================== -->
 	<!-- Clean.  Delete the build files, and their directories              -->
 	<!-- ================================================================== -->
-	<target name="clean" depends="init" description="Clean.  Delete the build files, and their directories">
+	<target name="clean" depends="init,clean-contrib" description="Clean.  Delete the build files, and their directories">
 		<delete dir="${build.dir}" />
 		<delete dir="${build.ivy.lib.dir}/${ant.project.home}" />
 		<delete dir="build" />
 		<delete file="${ivy.jar}" />
 		<delete dir="${docs.src}/build" />
 	</target>
+	
+	<!-- ================================================================== -->
+	  <!-- Clean contrib target. For now, must be called explicitly           -->
+	  <!-- Using subant instead of ant as a workaround for 30569              -->
+	  <!-- ================================================================== -->
+	  <target name="clean-contrib">
+	     <subant target="clean">        
+	        <fileset file="${contrib.dir}/build.xml"/>
+	     </subant>  	
+	  </target>
+	
 	<!-- ====================================================== -->
 	<!-- Macro definitions                                      -->
 	<!-- ====================================================== -->
@@ -1024,6 +1050,10 @@
 			<fileset dir="${opt.dir}" />
 		</copy>
 
+		<copy todir="${build.dir}/${final.name}/contrib/chukwa-pig" failonerror="false">
+			<fileset dir="${basedir}/contrib/chukwa-pig" />
+		</copy>
+		
 		<copy todir="${build.dir}/${final.name}/tools">
 			<fileset dir="${tools.dir}" />
 		</copy>

Added: hadoop/chukwa/trunk/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/build-contrib.xml?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/build-contrib.xml (added)
+++ hadoop/chukwa/trunk/contrib/build-contrib.xml Wed Jun 10 05:44:51 2009
@@ -0,0 +1,50 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!-- Imported by contrib/*/build.xml files to share generic targets. -->
+
+<project name="chukwacontrib" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+  <property name="name" value="${ant.project.name}"/>
+  <property name="root" value="../../"/>
+
+  <!-- Load all the default properties, and any the user wants    -->
+  <!-- to contribute (without having to type -D or edit this file -->
+  <property file="${user.home}/${name}.build.properties" />
+  <property file="${root}/build.properties" />
+
+  <property name="test.output" value="no"/>
+  <property name="test.timeout" value="900000"/>
+  <property name="build.dir" location="${root}/build/contrib/${name}"/>
+  <property name="chukwa.root.build.dir" location="${root}/build/"/>
+	<property name="chukwa.root.build.classes" location="${chukwa.root.build.dir}/classes/"/>
+  <property name="build.classes" location="${build.dir}/classes"/>
+  <property name="build.test" location="${build.dir}/test"/>
+  <!-- all jars together -->
+  <property name="javac.deprecation" value="off"/>
+  <property name="javac.debug" value="on"/>
+
+  <property name="javadoc.link"
+            value="http://java.sun.com/j2se/1.4/docs/api/"/>
+
+  <property name="build.encoding" value="ISO-8859-1"/>
+
+  <fileset id="lib.jars" dir="${root}" includes="lib/*.jar"/>
+
+</project>

Added: hadoop/chukwa/trunk/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/build.xml?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/build.xml (added)
+++ hadoop/chukwa/trunk/contrib/build.xml Wed Jun 10 05:44:51 2009
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="chukwa-contrib" default="compile" basedir=".">
+  
+  <!-- In case one of the contrib subdirectories -->
+  <!-- fails the build or test targets and you cannot fix it: -->
+  <!-- Then add to fileset: excludes="badcontrib/build.xml" -->
+
+  <!-- ====================================================== -->
+  <!-- Compile contribs.                                      -->
+  <!-- ====================================================== -->
+  <target name="compile">
+    <subant target="compile">
+    	 <fileset dir="." includes="*/build.xml"/>
+    </subant>
+  </target>
+  
+  <!-- ====================================================== -->
+  <!-- Package contrib jars.                                  -->
+  <!-- ====================================================== -->
+  <target name="package">
+    <subant target="package">
+      <fileset dir="." includes="*/build.xml"/>
+    </subant>
+  </target>
+  
+  <!-- ====================================================== -->
+  <!-- Test all the contribs.                               -->
+  <!-- ====================================================== -->
+  <target name="test">
+    <subant target="test">
+      <fileset dir="." includes="*/build.xml"/>
+    </subant>
+  </target>
+  
+  
+  <!-- ====================================================== -->
+  <!-- Clean all the contribs.                              -->
+  <!-- ====================================================== -->
+  <target name="clean">
+    <subant target="clean">
+      <fileset dir="." includes="*/build.xml"/>
+    </subant>
+  </target>
+
+</project>
+

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml Wed Jun 10 05:44:51 2009
@@ -0,0 +1,337 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="chukwa-pig" default="main" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+	<property name="name" value="chukwa-pig" />
+
+	<import file="../build-contrib.xml" optional="true" />
+	<available file="../build-contrib.xml" property="present" />
+
+    <property name="src.dir" value="${basedir}/src" />
+    <property name="build.dir" value="${basedir}/build" />
+    <property name="build.classes" value="${build.dir}/classes/" />
+    <property name="conf.dir" value="${basedir}/conf" />
+    <property name="docs.src" value="${basedir}/src/docs" />
+    <property name="docs.dir" value="${basedir}/docs" />
+    <property name="changes.src" value="${docs.src}/changes" />
+    <property name="tools.dir" value="${basedir}/tools" />
+    <property name="opt.dir" value="${basedir}/opt" />
+    <property name="javac.debug" value="on" />
+    <property name="javac.version" value="1.6" />
+    <property name="test.src.dir" value="${basedir}/test/src/java/" />
+    <property name="test.lib.dir" value="${basedir}/src/test/lib" />
+    <property name="test.build.dir" value="${build.dir}/test" />
+    <property name="test.generated.dir" value="${test.build.dir}/src" />
+    <property name="test.build.data" value="${test.build.dir}/data" />
+    <property name="test.cache.data" value="${test.build.dir}/cache" />
+    <property name="test.debug.data" value="${test.build.dir}/debug" />
+    <property name="test.log.dir" value="${test.build.dir}/logs" />
+    <property name="test.build.classes" value="${test.build.dir}/classes/" />
+    <property name="test.build.testjar" value="${test.build.dir}/testjar" />
+    <property name="test.build.javadoc" value="${test.build.dir}/docs/api" />
+    <property name="test.build.javadoc.dev" value="${test.build.dir}/docs/dev-api" />
+    <property name="test.include" value="Test*" />
+    <property name="test.classpath.id" value="test.classpath" />
+    <property name="test.output" value="no" />
+    <property name="test.timeout" value="900000" />
+    <property name="test.junit.output.format" value="plain" />
+    <property name="test.junit.fork.mode" value="perTest" />
+    <property name="test.junit.printsummary" value="yes" />
+    <property name="test.junit.haltonfailure" value="yes" />
+    <property name="test.junit.maxmemory" value="256m" />
+    <property name="javadoc.link.java" value="http://java.sun.com/javase/6/docs/api/" />
+    <property name="javadoc.packages" value="org.apache.hadoop.*" />
+    <property name="javadoc.maxmemory" value="512m" />
+
+
+	
+	<target name="ivy-init-properties-local" description="to initiate ivy properties">
+		<property name="ivy.dir" location="ivy" />
+		<property name="ivysettings.xml" location="${ivy.dir}/ivysettings.xml" />
+		<loadproperties srcfile="${ivy.dir}/libraries.properties" />
+		<loadproperties srcfile="${ivy.dir}/libraries.properties" />
+		<property name="ivy.jar" location="${ivy.dir}/ivy-${ivy.version}.jar" />
+		<property name="ivy_repo_url" value="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.version}/ivy-${ivy.version}.jar" />
+		<property name="build.dir" location="build" />
+		<property name="build.ivy.dir" location="${build.dir}/ivy" />
+		<property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" />
+		<property name="build.ivy.report.dir" location="${build.ivy.dir}/report" />
+		<property name="common.ivy.lib.dir" location="${build.ivy.lib.dir}/${ant.project.name}/common" />
+		<property name="rat.reporting.classname" value="rat.Report" />
+		<property name="jdiff.build.dir" value="${build.docs}/jdiff" />
+		<property name="jdiff.xml.dir" value="${build.ivy.lib.dir}/${name}/jdiff" />
+		<property name="jdiff.stable" value="0.1.2" />
+		<property name="jdiff.stable.javadoc" value="http://hadoop.apache.org/core/docs/r${jdiff.stable}/api/" />
+
+		<!--this is the naming policy for artifacts we want pulled down-->
+		<property name="ivy.artifact.retrieve.pattern" value="${ant.project.name}/[conf]/[artifact]-[revision].[ext]" />
+		<property name="jdiff.home" value="${build.ivy.lib.dir}/${ant.project.name}/jdiff" />
+		<property name="jdiff.jar" value="${jdiff.home}/jdiff-${jdiff.version}.jar" />
+		<property name="xerces.jar" value="${jdiff.home}/xerces-${xerces.version}.jar" />
+
+		<property name="clover.jar" location="${clover.home}/lib/clover.jar" />
+		<available property="clover.present" file="${clover.jar}" />
+
+	</target>
+
+	<target name="ivy-download-local" description="To download ivy" unless="offline">
+		<get src="${ivy_repo_url}" dest="${ivy.jar}" usetimestamp="true" />
+	</target>
+
+	<target name="ivy-init-dirs-local" depends="ivy-init-properties-local">
+		<mkdir dir="${build.ivy.dir}" />
+		<mkdir dir="${build.ivy.lib.dir}" />
+		<mkdir dir="${build.ivy.report.dir}" />
+	</target>
+
+	<target name="ivy-probe-antlib-local">
+		<condition property="ivy.found.local">
+			<typefound uri="antlib:org.apache.ivy.ant" name="cleancache" />
+		</condition>
+	</target>
+
+	<target name="ivy-init-antlib-local" depends="ivy-init-dirs-local,ivy-download-local,ivy-probe-antlib-local" unless="ivy.found.local">
+		<typedef uri="antlib:org.apache.ivy.ant" onerror="fail" loaderRef="ivyLoader">
+			<classpath>
+				<pathelement location="${ivy.jar}" />
+			</classpath>
+		</typedef>
+		<fail>
+			<condition>
+				<not>
+					<typefound uri="antlib:org.apache.ivy.ant" name="cleancache" />
+				</not>
+			</condition>
+			      You need Apache Ivy 2.0 or later from http://ant.apache.org/
+			      It could not be loaded from ${ivy_repo_url}
+		 </fail>
+	</target>
+
+	<target name="ivy-init-local" depends="ivy-init-antlib-local">
+		<ivy:configure settingsid="${ant.project.name}.ivy.settings" file="${ivysettings.xml}" override="true" />
+	</target>
+
+	<target name="ivy-retrieve" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for the compile/test configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" sync="true" />
+		<ivy:cachepath pathid="contrib-classpath" conf="common" />
+	</target>
+
+	<target name="ivy-resolve" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" />
+	</target>
+
+	<target name="ivy-resolve-javadoc" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="javadoc" />
+	</target>
+
+	<target name="ivy-resolve-checkstyle" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="checkstyle" />
+	</target>
+
+	<target name="ivy-retrieve-checkstyle" depends="ivy-resolve-checkstyle" description="Retrieve Ivy-managed artifacts for the checkstyle configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="checkstyle-classpath" conf="checkstyle" />
+	</target>
+
+	<target name="ivy-resolve-test" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="test" />
+	</target>
+
+	<target name="ivy-resolve-common" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="common" />
+	</target>
+
+	<target name="ivy-resolve-jdiff" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="jdiff" />
+	</target>
+
+	<target name="ivy-retrieve-jdiff" depends="ivy-resolve-jdiff" description="Retrieve Ivy-managed artifacts for the javadoc configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="jdiff-classpath" conf="jdiff" />
+	</target>
+
+	<target name="ivy-retrieve-javadoc" depends="ivy-resolve-javadoc" description="Retrieve Ivy-managed artifacts for the javadoc configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="javadoc-classpath" conf="javadoc" />
+	</target>
+
+	<target name="ivy-retrieve-test" depends="ivy-resolve-test" description="Retrieve Ivy-managed artifacts for the test configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="test.classpath" conf="test" />
+	</target>
+
+	<target name="ivy-retrieve-common" depends="ivy-resolve-common" description="Retrieve Ivy-managed artifacts for the compile configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="ivy-common.classpath" conf="common" />
+	</target>
+
+	<target name="ivy-report" depends="ivy-resolve-releaseaudit" description="Generate">
+		<ivy:report todir="${build.ivy.report.dir}" settingsRef="${ant.project.name}.ivy.settings" />
+		<echo>
+      Reports generated:${build.ivy.report.dir}
+    </echo>
+	</target>
+
+	
+	<target name="init" depends="">
+		
+		<mkdir dir="${build.dir}" />
+		<mkdir dir="${build.classes}" />
+		<mkdir dir="${build.dir}/test" />
+
+	</target>
+
+	<target name="main" depends="init,compile,jar" description="Main target">
+		<echo>
+            Building the .jar files.
+        </echo>
+	</target>
+
+	<target name="compile" depends="compile-src,jar" description="Compile">
+	</target>
+		
+	<target name="compile-src" depends="init" description="Compile">
+			<javac srcdir="src/java" destdir="${build.classes}" debug="${javac.debug}">
+				<compilerarg value="-Xlint" />
+				<include name="org/apache/hadoop/chukwa/**" />
+				<classpath refid="pig-classpath" />
+			</javac>
+	</target>
+	
+	
+	<target name="jar" depends="compile-src" description="Create chukwa-pig.jar">
+			<jar jarfile="${build.dir}/chukwa-pig.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/**/*.class">
+				<fileset dir="${basedir}/src/java">
+					<include name="org/apache/hadoop/chukwa/**/*.java" />
+				</fileset>
+			</jar>
+
+		<copy file="${build.dir}/chukwa-pig.jar" tofile="${basedir}/chukwa-pig.jar" />
+		</target>
+	
+	<path id="pig-classpath">
+		<pathelement location="${build.classes}" />
+		<pathelement location="${test.build.classes}" />
+		<pathelement location="${chukwa.root.build.classes}" />
+		<fileset dir="${basedir}/lib">
+			<include name="**/pig*.jar" />
+			<exclude name="**/excluded/" />
+		</fileset>
+		<fileset dir="../../build/">
+			<include name="chukwa*.jar" />
+			<exclude name="**/excluded/" />
+		</fileset>
+	</path>
+
+	<target name="compile-test" depends="init,compile,jar" description="Test target">
+		 <echo>*** Building Test Sources ***</echo>
+		 <echo>*** ${build.classes} ***</echo>
+        <mkdir dir="${test.log.dir}"/>
+		<mkdir dir="${test.build.dir}" />
+		<mkdir dir="${test.log.dir}" />
+		<mkdir dir="${test.build.classes}" />
+
+		<javac srcdir="test/src/java" destdir="${test.build.classes}" debug="${javac.debug}">
+			<compilerarg value="-Xlint" />
+			<include name="org/apache/hadoop/chukwa/**" />
+            <classpath>
+                <pathelement location="${build.classes}" />
+            	<pathelement location="${test.build.classes}" />
+                <path refid="pig-classpath"/>
+            </classpath>
+		</javac>
+
+	</target>
+	
+	<target name="test" depends="compile-test" description="chukwa-pig tests">
+        <delete dir="${test.log.dir}"/>
+        <mkdir dir="${test.log.dir}"/>
+		 <echo>*** ${test.build.classes} *** ${chukwa.root.build.dir} ***</echo>
+	     <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" 
+	     	maxmemory="256m" 
+	     	dir="${basedir}" 
+	     	timeout="${test.timeout}" 
+	     	errorProperty="tests.failed" 
+	     	failureProperty="tests.failed">
+	     	<sysproperty key="chukwa.root.build.dir" value="${chukwa.root.build.dir}" />
+	     	<sysproperty key="chukwa-pig.build.dir" value="${build.dir}" />
+            <sysproperty key="hod.server" value="" />
+            <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
+	     	
+	     	<classpath>
+                <pathelement location="${build.classes}" />
+            	<pathelement location="${test.build.classes}" />
+                <path refid="pig-classpath"/>
+            </classpath>
+            <formatter type="${test.junit.output.format}" />
+
+
+            <batchtest fork="yes" todir="${test.log.dir}" unless="testcase">
+                <fileset dir="${test.src.dir}">
+                	<include name="**/${test.include}.java" />
+                	<exclude name="**/${test.exclude}.java" />
+                </fileset>
+            </batchtest>
+	     	<batchtest todir="${test.log.dir}" if="testcase">
+				<fileset dir="${test.src.dir}" includes="**/${testcase}.java">
+				</fileset>
+			</batchtest>
+	        </junit>
+	        <fail if="tests.failed">Tests failed!</fail>
+		
+		
+	</target>
+	
+	<!-- ================================================================== -->
+	<!-- Clean.  Delete the build files, and their directories              -->
+	<!-- ================================================================== -->
+	<target name="clean" depends="init" description="Clean.  Delete the build files, and their directories">
+		<delete dir="${build.dir}" />
+		<delete dir="${build.ivy.lib.dir}/${ant.project.home}" />
+		<delete dir="build" />
+		<delete file="${ivy.jar}" />
+		<delete dir="${docs.src}/build" />
+		<delete dir="${test.build.dir}" />
+		<delete dir="${test.log.dir}" />
+		<delete dir="${test.build.classes}" />
+		<delete file="${basedir}/chukwa-pig.jar" />
+	</target>
+	
+	
+    <!-- ================================================================== -->
+	<!-- Perform audit activities for the release                           -->
+	<!-- ================================================================== -->
+	<target name="releaseaudit" depends="ivy-retrieve-releaseaudit" description="Release Audit activities">
+		<java classname="${rat.reporting.classname}" fork="true">
+			<classpath refid="releaseaudit-classpath" />
+			<arg value="${build.dir}/${final.name}" />
+		</java>
+	</target>
+
+	<target name="ivy-retrieve-releaseaudit" depends="ivy-resolve-releaseaudit" description="Retrieve Ivy-managed artifacts for the compile configurations">
+		<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" />
+		<ivy:cachepath pathid="releaseaudit-classpath" conf="releaseaudit" />
+	</target>
+
+	<target name="ivy-resolve-releaseaudit" depends="ivy-init-local">
+		<ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="releaseaudit" />
+	</target>
+	
+</project>

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar?rev=783208&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar?rev=783208&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar?rev=783208&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.record.Buffer;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+public class ChukwaStorage extends Utf8StorageConverter implements LoadFunc,
+    StoreFunc
+{
+  Schema schema = null;
+  SequenceFileRecordReader<ChukwaRecordKey, ChukwaRecord> reader;
+  SequenceFile.Reader r;
+  SequenceFile.Writer writer;
+  DataOutputStream dos;
+  FSDataOutputStream fsd = null;
+  Calendar calendar = Calendar.getInstance();
+
+  int timestampFieldIndex = -1;
+  int pkFieldIndex = -1;
+  int sourceFieldIndex = -1;
+  int clusterNameFieldIndex = -1;
+  int recordTypeFieldIndex = -1;
+  int applicationFieldIndex = -1;
+  
+  String[] fields = null;
+  private TupleFactory tf = DefaultTupleFactory.getInstance();
+
+  public ChukwaStorage() {
+  }
+  
+  public ChukwaStorage(String... scfields ) {
+
+    this.fields = scfields;
+    for (int i=0;i< scfields.length;i++) {
+      if (scfields[i].equalsIgnoreCase("c_timestamp")) {
+        timestampFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_pk")) {
+        pkFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_source")) {
+        sourceFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_recordtype")) {
+        recordTypeFieldIndex =i;
+      } else if (scfields[i].equalsIgnoreCase("c_application")) {
+        applicationFieldIndex =i;
+      } else if (scfields[i].equalsIgnoreCase("c_cluster")) {
+        clusterNameFieldIndex =i;
+      } 
+  }
+
+  }
+  public void bindTo(String fileName, BufferedPositionedInputStream is,
+      long offset, long end) throws IOException
+  {
+    JobConf conf = PigInputFormat.sJob;
+    if (conf == null) {
+      conf = new JobConf();
+    }
+    
+    FileSplit split = new FileSplit(new Path(fileName), offset, end - offset,
+        (String[]) null);
+    reader = new SequenceFileRecordReader<ChukwaRecordKey, ChukwaRecord>(conf,
+        split);
+    if (reader.getValueClass() != ChukwaRecord.class)
+      throw new IOException(
+          "The value class in the sequence file does not match that for Chukwa data");
+
+  }
+
+  public Tuple getNext() throws IOException
+  {
+    ChukwaRecord record = new ChukwaRecord();
+    if (!reader.next(reader.createKey(), record))
+      return null;
+
+    Tuple ret = tf.newTuple(2);
+    try
+    {
+      ret.set(0, new Long(record.getTime()));
+
+      HashMap<Object, Object> pigMapFields = new HashMap<Object, Object>();
+      TreeMap<String, Buffer> mapFields = record.getMapFields();
+
+      if (mapFields != null)
+      {
+        for (String key : mapFields.keySet())
+        {
+          pigMapFields.put(key, new DataByteArray(record.getValue(key).getBytes()));
+        }
+      }
+      ret.set(1, pigMapFields);
+
+    } catch (ExecException e)
+    {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    return ret;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
+   */
+  public void fieldsToRead(Schema schema)
+  {
+  }
+
+  public Schema determineSchema(String fileName, ExecType execType,
+      DataStorage storage) throws IOException
+  {
+    Schema newSchema =  new Schema();
+    newSchema.add(new Schema.FieldSchema("timestamp", DataType.LONG));
+    newSchema.add(new Schema.FieldSchema("map", DataType.MAP));
+   
+    return schema;
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void bindTo(OutputStream os) throws IOException
+  {
+    JobConf conf = new JobConf();
+    dos = new DataOutputStream(os);
+    fsd = new FSDataOutputStream(dos);
+    writer = SequenceFile.createWriter(conf, fsd,
+        ChukwaRecordKey.class, ChukwaRecord.class,
+        SequenceFile.CompressionType.BLOCK, new DefaultCodec());
+  }
+
+  @Override
+  public void finish() throws IOException
+  {
+    if (reader != null) {
+      try {
+        reader.close();
+      }catch(Throwable e) {
+      }
+    }
+    
+    if (writer != null) {
+      try {
+        writer.close();
+      }catch(Throwable e) {
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void putNext(Tuple f) throws IOException
+  {
+    long timePartition = 0l;
+    long timestamp = 0L;
+    String source = "N/A";
+    String application = "N/A";
+    String recordType = "N/A";
+    String clusterName = "N/A";
+    String pk = "";
+    
+    try
+    {
+
+      ChukwaRecordKey key = new ChukwaRecordKey();
+     
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(System.currentTimeMillis());
+      int inputSize = f.size();      
+      for(int i=0;i<inputSize;i++) {
+        Object field = f.get(i);
+        
+        if (field == null) {
+          continue;
+        }
+        
+        if (i == this.pkFieldIndex) {
+          pk = field.toString();
+          continue;
+        } else if ( i == this.sourceFieldIndex) {
+          source = field.toString();
+          continue;
+          
+        }else if ( i== this.recordTypeFieldIndex) {
+          recordType = field.toString();
+          continue;
+          
+        }else if ( i== this.applicationFieldIndex) {
+          application = field.toString();
+          continue;
+          
+        } else if ( i== this.clusterNameFieldIndex) {
+          clusterName = field.toString();
+          continue;
+          
+        }else if (i == this.timestampFieldIndex) {
+          
+          timestamp = Long.parseLong(field.toString());
+          record.setTime(timestamp);
+
+          synchronized (calendar)
+          {
+            calendar.setTimeInMillis(timestamp);
+            calendar.set(Calendar.MINUTE, 0);
+            calendar.set(Calendar.SECOND, 0);
+            calendar.set(Calendar.MILLISECOND, 0);
+            timePartition = calendar.getTimeInMillis();
+          }
+          record.setTime(Long.parseLong(field.toString()));
+          continue;
+          
+        }  else if (field instanceof Map) {
+          Map<Object, Object> m = (Map<Object, Object>)field;
+          for(Object o: m.keySet()) {
+            record.add(o.toString(),m.get(o).toString());
+          }
+          continue;
+        } else {
+          if (i <fields.length ) {
+            record.add(fields[i],field.toString());
+          } else {
+            record.add("field-"+i,field.toString());
+          }
+          
+          continue;
+        }
+      }
+
+      record.add(Record.tagsField, " cluster=\"" + clusterName.trim() + "\" ");
+      record.add(Record.sourceField, source);
+      record.add(Record.applicationField, application);
+      key.setKey("" + timePartition + "/" + pk + "/" + timestamp);
+      key.setReduceType(recordType);
+      
+      writer.append(key, record);
+    } catch (ExecException e)
+    {
+      IOException ioe = new IOException();
+      ioe.initCause(e);
+      throw ioe;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public Class getStorePreparationClass() throws IOException {
+    return null;
+  }
+
+}
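
For reference, a minimal sketch of driving ChukwaStorage from Java, modelled on the new TestLocalChukwaStorage further down in this commit. ChukwaStorage loads each ChukwaRecord as a (timestamp, map) tuple and, on store, routes the c_*-named columns into the ChukwaRecordKey. The jar locations and input/output paths below are placeholders, not part of this commit.

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class ChukwaPigExample {
      public static void main(String[] args) throws Exception {
        // Local-mode Pig; the unit test in this commit does the same for its fixture data.
        PigServer pig = new PigServer(ExecType.LOCAL);

        // Placeholder jar paths: chukwa-core supplies ChukwaRecord/ChukwaRecordKey,
        // chukwa-pig supplies the storage and UDF classes added here.
        pig.registerJar("build/chukwa-core.jar");
        pig.registerJar("build/chukwa-pig.jar");

        // Load: each ChukwaRecord becomes (ts: long, fields: map).
        pig.registerQuery("A = load '/tmp/chukwaTestFile.evt' using "
            + "org.apache.hadoop.chukwa.ChukwaStorage() as (ts: long, fields);");

        // Store: constructor arguments name the output columns; the c_* names are
        // recognized and used to build the key (time partition, pk, record type).
        pig.registerQuery("define seqWriter org.apache.hadoop.chukwa.ChukwaStorage("
            + "'c_timestamp','c_cluster','fields','c_pk','c_recordtype','c_source','c_application');");
        pig.registerQuery("B = FOREACH A GENERATE ts, 'myCluster', fields, "
            + "fields#'csource', 'myRecord', fields#'csource', 'myApplication';");
        pig.registerQuery("STORE B into '/tmp/chukwa-pig-out.evt' using seqWriter;");
      }
    }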

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * string.PARSEDOUBLE implements a binding to the Java method
+ * {@link java.lang.Double#parseDouble(String)}
+ * 
+ * <dl>
+ * <dt><b>Parameters:</b></dt>
+ * <dd><code>strtoconvert</code> - <code>chararray</code>
+ * 
+ * <dt><b>Return Value:</b></dt>
+ * <dd><code>double</code> parsed value</dd>
+ * 
+ * <dt><b>Return Schema:</b></dt>
+ * <dd>parselong: double</dd>
+ * 
+ * <dt><b>Example:</b></dt>
+ * <dd><code>
+ * register string.jar;<br/>
+ * A = load 'mydata' using PigStorage() as ( stringnumber: chararray );<br/>
+ * B = foreach A generate stringnumber, org.apache.hadoop.chukwa.PARSEDOUBLE(stringnumber));
+ * </code></dd>
+ * </dl>
+ * 
+ */
+public class PARSEDOUBLE extends EvalFunc<Double> {
+  
+  public Double exec(Tuple input) throws IOException {
+    if (input == null || input.size() < 1)
+      return null;
+    try {
+      String strtoconvert = input.get(0).toString();
+      Double number = Double.parseDouble(strtoconvert);
+
+      return number;
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    Schema schema = null;
+    try {
+      schema = new Schema(new Schema.FieldSchema(input.getField(0).alias, DataType.DOUBLE));
+    } catch (FrontendException e) {
+      schema = new Schema(new Schema.FieldSchema(getSchemaName("parseDouble", input),
+          DataType.DOUBLE));
+    }
+    return schema;
+  }
+}
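
The UDF can also be called directly outside a Pig script: build a one-field tuple, call exec(), and note that parse failures return null rather than raising. A minimal sketch, assuming the Pig jars in contrib/chukwa-pig/lib and chukwa-pig.jar are on the classpath:

    import org.apache.hadoop.chukwa.PARSEDOUBLE;
    import org.apache.pig.data.DefaultTupleFactory;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class ParseDoubleExample {
      public static void main(String[] args) throws Exception {
        TupleFactory tf = DefaultTupleFactory.getInstance();

        Tuple ok = tf.newTuple(1);
        ok.set(0, "98.6");
        System.out.println(new PARSEDOUBLE().exec(ok));   // 98.6

        Tuple bad = tf.newTuple(1);
        bad.set(0, "n/a");
        System.out.println(new PARSEDOUBLE().exec(bad));  // null: the UDF swallows parse errors
      }
    }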

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * return time,Map
+ * 
+ */
+public class RecordMerger extends EvalFunc<Map<String,Object>> {
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String,Object> exec(Tuple input) throws IOException {
+    
+    Map<String, Object> newPigMapFields = new HashMap<String, Object>();
+    DataBag bg = (DataBag) input.get(0);
+    Iterator<Tuple> bagIterator = bg.iterator();
+    Object s = null;
+    while (bagIterator.hasNext()) {
+
+      Map<Object, Object> map = (Map<Object,Object>) bagIterator.next().get(0);
+      Iterator<Object> mapIterator = map.keySet().iterator();
+      while (mapIterator.hasNext()) {
+        s = mapIterator.next();
+        newPigMapFields.put(s.toString(), map.get(s));
+      }
+    }
+
+    return newPigMapFields;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    Schema schema = null;
+    try {
+      schema = new Schema(new Schema.FieldSchema(input.getField(0).alias, DataType.MAP));
+    } catch (FrontendException e) {
+      e.printStackTrace();
+    }
+    return schema;
+  }
+}
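
RecordMerger expects its single input field to be a bag of tuples whose first field is a map, and flattens them into one map (later keys overwrite earlier ones). A minimal illustration, again assuming the bundled Pig jar is on the classpath:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.chukwa.RecordMerger;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.DefaultTupleFactory;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class RecordMergerExample {
      public static void main(String[] args) throws Exception {
        TupleFactory tf = DefaultTupleFactory.getInstance();

        Map<Object, Object> m1 = new HashMap<Object, Object>();
        m1.put("A", "7");
        Map<Object, Object> m2 = new HashMap<Object, Object>();
        m2.put("B", "3");

        // One bag, two tuples, each carrying a map in field 0.
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        bag.add(tf.newTuple(m1));
        bag.add(tf.newTuple(m2));

        Map<String, Object> merged = new RecordMerger().exec(tf.newTuple(bag));
        System.out.println(merged);   // {A=7, B=3}
      }
    }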

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class TimePartition extends EvalFunc<Long>{
+
+  protected long period = 0;
+  
+  public TimePartition(String strPeriod) {
+    period = Long.parseLong(strPeriod);
+  }
+ 
+  @Override
+  public Long exec(Tuple input) throws IOException {
+    if (input == null || input.size() < 1)
+      return null;
+    try {
+      long timestamp = Long.parseLong(input.get(0).toString());
+      timestamp =  timestamp - (timestamp % (period));
+      
+      return timestamp;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    Schema schema = null;
+    try {
+      schema = new Schema(new Schema.FieldSchema(input.getField(0).alias, DataType.LONG));
+    } catch (FrontendException e) {
+      schema = new Schema(new Schema.FieldSchema(getSchemaName("timePartition", input),
+          DataType.LONG));
+    }
+    return schema;
+  }
+
+}
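
TimePartition is constructed with a period (in the same units as the timestamps, epoch milliseconds in Chukwa) and floors each timestamp to the start of its partition via timestamp - (timestamp % period). A small sketch with a hypothetical 5-minute period:

    import org.apache.hadoop.chukwa.TimePartition;
    import org.apache.pig.data.DefaultTupleFactory;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class TimePartitionExample {
      public static void main(String[] args) throws Exception {
        // Bucket epoch-millisecond timestamps into 5-minute (300000 ms) partitions.
        TimePartition fiveMinutes = new TimePartition("300000");

        TupleFactory tf = DefaultTupleFactory.getInstance();
        Tuple t = tf.newTuple(1);
        t.set(0, 1242205837123L);

        System.out.println(fiveMinutes.exec(t));   // 1242205800000, the 5-minute boundary
      }
    }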

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * 
+ * Utility class to move pig output closer to the Demux output <BR>
+ * pigDir should looks like:<BR>
+ * <UL>
+ * <LI> workingDay + ".D" </LI>
+ * <LI> workingDay + "_" + workingHour + ".H" </LI>
+ * <LI> workingDay + "_" + workingHour + "_" + [0-5] + [0,5] + ".R" </LI>
+ * </UL>
+ * 
+ */
+public class PigMover {
+
+  private static Logger log = Logger.getLogger(PigMover.class);
+
+  public static void usage() {
+    System.out
+        .println("PigMover <cluster> <recordType> <pigDir> <finalOutPutDir>");
+    System.exit(-1);
+  }
+
+  /**
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+
+    if (args.length != 5) {
+      log.warn("Wrong number of arguments");
+      usage();
+    }
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    log.info("fs URI:" + fs.getUri());
+
+    String cluster = args[0];
+    String recordType = args[1];
+
+    log.info("Cluster:" + cluster);
+    log.info("recordType:" + recordType);
+
+    Path rootpigDir = new Path(args[2]);
+    log.info("rootpigDir:" + rootpigDir);
+
+    Path dataDir = new Path(args[3]);
+    log.info("dataDir:" + dataDir);
+
+    if (!fs.exists(dataDir)) {
+      throw new RuntimeException("input directory does not exist.");
+    }
+    
+    String fileName = dataDir.getName();
+    log.info("fileName:" + fileName);
+
+    String rootPigPostProcessDir = args[4];
+    log.info("chukwaPostProcessDir: [" + rootPigPostProcessDir + "]");
+
+    String finalPigOutputDir = rootPigPostProcessDir + "/pigOutputDir_" + System.currentTimeMillis()
+        + "/" + cluster + "/" + recordType;
+    log.info("finalPigOutputDir:" + finalPigOutputDir);
+    
+    Path postProcessDir = new Path(finalPigOutputDir);
+    fs.mkdirs(postProcessDir);
+
+    boolean movingDone = true;
+    FileStatus[] files = fs.listStatus(dataDir);
+    
+    for (int i=0;i<files.length;i++) {
+      log.info("fileIn:" + files[i].getPath());
+      
+      Path p = new Path(finalPigOutputDir + "/"+ recordType + "_" + i + "_" + fileName + ".evt");
+      log.info("fileOut:" + p);
+      if ( fs.rename(files[i].getPath(), p) == false) {
+        log.warn("Cannot rename " + files[i].getPath() + " to " + p);
+        movingDone = false;
+      }
+    }
+    if (movingDone) {
+      log.info("Deleting:" + rootpigDir);
+      fs.delete(rootpigDir,true);
+    }
+  }
+
+}
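
PigMover is a command-line utility: main() takes five arguments (cluster, recordType, rootpigDir, dataDir, rootPigPostProcessDir; note the usage() string lists only four of them), renames every file in dataDir into a timestamped directory under the post-process root, and deletes rootpigDir once everything has moved. A hypothetical invocation from Java, with placeholder paths that must already exist on the configured FileSystem:

    public class PigMoverExample {
      public static void main(String[] args) throws Exception {
        // All five values below are illustrative only.
        org.apache.hadoop.chukwa.tools.PigMover.main(new String[] {
            "myCluster",               // cluster
            "SysLog",                  // recordType
            "/chukwa/pig/work",        // rootpigDir, removed after a successful move
            "/chukwa/pig/work/output", // dataDir containing the pig output files
            "/chukwa/postProcess"      // rootPigPostProcessDir
        });
      }
    }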

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/PigTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/PigTest.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/PigTest.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/PigTest.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import static org.apache.pig.ExecType.LOCAL;
+import static org.apache.pig.ExecType.MAPREDUCE;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class PigTest extends TestCase {
+
+  protected final Log log = LogFactory.getLog(getClass());
+
+  protected ExecType execType = LOCAL;
+
+  protected MiniCluster cluster;
+  protected PigServer pigServer;
+
+  protected abstract ExecType getExecType();
+  
+  @Before
+  @Override
+  protected void setUp() throws Exception {
+
+    if (getExecType() == MAPREDUCE) {
+      log.info("MapReduce cluster");
+      cluster = MiniCluster.buildCluster();
+      pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+    } else {
+      log.info("Local cluster");
+      pigServer = new PigServer(LOCAL);
+    }
+  }
+
+  @After
+  @Override
+  protected void tearDown() throws Exception {
+    pigServer.shutdown();
+  }
+
+}

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestLocalChukwaStorage.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestLocalChukwaStorage.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestLocalChukwaStorage.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestLocalChukwaStorage.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.GenerateTestFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.pig.ExecType;
+
+public class TestLocalChukwaStorage extends PigTest {
+
+  protected ExecType getExecType() {
+    return ExecType.LOCAL;
+  }
+
+  public void testLocal() {
+
+    File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+    if (!tempDir.exists()) {
+      tempDir.mkdirs();
+    }
+    String directory = tempDir.getAbsolutePath() + "/TestLocalChukwaStorage_"
+        + System.currentTimeMillis() + "/";
+    System.out.println(directory);
+    FileSystem fs = null;
+    Configuration conf = null;
+    
+    try {
+      conf = new Configuration();
+      fs = FileSystem.getLocal(conf);
+      GenerateTestFile.fs = fs;
+      GenerateTestFile.conf = conf;
+      GenerateTestFile.createFile(directory);
+
+      File buildDir = new File(System.getProperty("chukwa.root.build.dir", "../../build/"));
+      String[] files = buildDir.list();
+      for (String f : files) {
+        if (f.startsWith("chukwa-core") && f.endsWith(".jar")) {
+          log.info("Found" + buildDir.getAbsolutePath() + "/" + f);
+          pigServer.registerJar(buildDir.getAbsolutePath() + "/" + f);
+          break;
+        }
+      }
+
+      pigServer.registerJar(System.getProperty("chukwa-pig.build.dir", "../../build/") + "/chukwa-pig.jar");
+      
+      pigServer.registerQuery("A = load '"
+              + directory
+              + "/chukwaTestFile.evt' using  org.apache.hadoop.chukwa.ChukwaStorage() as (ts: long,fields);");
+      pigServer.registerQuery("B = FOREACH A GENERATE ts,'myCluster',fields,fields#'csource','myRecord',fields#'csource','myApplication', fields#'A';");
+      pigServer.registerQuery("define seqWriter org.apache.hadoop.chukwa.ChukwaStorage('c_timestamp', 'c_cluster' ,'fields','c_pk','c_recordtype','c_source','c_application','myFieldA');");
+      pigServer.registerQuery("STORE B into '" + directory
+          + "/chukwa-pig.evt' using seqWriter;");
+
+      try {
+        String res = dumpArchive(fs, conf, directory + "chukwa-pig.evt");
+        String expected = "===== KEY   =====DataType: myRecordKey: 1242000000/M0/1242205800===== Value =====Timestamp : 1242205800[A] :7[B] :3[C] :9[capp] :myApplication[csource] :M0[ctags] : cluster=\"myCluster\" [myFieldA] :7===== KEY   =====DataType: myRecordKey: 1242000000/M0/1242205800===== Value =====Timestamp : 1242205800[D] :1[capp] :myApplication[csource] :M0[ctags] : cluster=\"myCluster\" ===== KEY   =====DataType: myRecordKey: 1242000000/M1/1242205800===== Value =====Timestamp : 1242205800[A] :17[capp] :myApplication[csource] :M1[ctags] : cluster=\"myCluster\" [myFieldA] :17===== KEY   =====DataType: myRecordKey: 1242000000/M1/1242205800===== Value =====Timestamp : 1242205800[B] :37[C] :51[capp] :myApplication[csource] :M1[ctags] : cluster=\"myCluster\" ===== KEY   =====DataType: myRecordKey: 1242000000/M0/1242205860===== Value =====Timestamp : 1242205860[A] :8[C] :3[D] :12[capp] :myApplication[csource] :M0[ctags] : cluster=\"myCluster\" [myFieldA] :8===== KEY   =
 ====DataType: myRecordKey: 1242000000/M0/1242205860===== Value =====Timestamp : 1242205860[A] :8[B] :6[capp] :myApplication[csource] :M0[ctags] : cluster=\"myCluster\" [myFieldA] :8===== KEY   =====DataType: myRecordKey: 1242000000/M1/1242205860===== Value =====Timestamp : 1242205860[A] :13.2[B] :23[C] :8.5[D] :6[capp] :myApplication[csource] :M1[ctags] : cluster=\"myCluster\" [myFieldA] :13.2===== KEY   =====DataType: myRecordKey: 1242000000/M1/1242205860===== Value =====Timestamp : 1242205860[A] :13.2[B] :23[C] :8.5[D] :6[capp] :myApplication[csource] :M1[ctags] : cluster=\"myCluster\" [myFieldA] :13.2===== KEY   =====DataType: myRecordKey: 1242000000/M0/1242205920===== Value =====Timestamp : 1242205920[A] :8[B] :6[C] :8[D] :6[E] :48.5[capp] :myApplication[csource] :M0[ctags] : cluster=\"myCluster\" [myFieldA] :8===== KEY   =====DataType: myRecordKey: 1242000000/M1/1242205920===== Value =====Timestamp : 1242205920[A] :8.3[B] :5.2[C] :37.7[D] :61.9[E] :40.3[capp] :myApplica
 tion[csource] :M1[ctags] : cluster=\"myCluster\" [myFieldA] :8.3===== KEY   =====DataType: myRecordKey: 1242000000/M1/1242205980===== Value =====Timestamp : 1242205980[A] :18.3[B] :1.2[C] :7.7[capp] :myApplication[csource] :M1[ctags] : cluster=\"myCluster\" [myFieldA] :18.3===== KEY   =====DataType: myRecordKey: 1242000000/M2/1242205980===== Value =====Timestamp : 1242205980[A] :8.9[B] :8.3[C] :7.2[D] :6.1[capp] :myApplication[csource] :M2[ctags] : cluster=\"myCluster\" [myFieldA] :8.9===== KEY   =====DataType: myRecordKey: 1242000000/M3/1242205920===== Value =====Timestamp : 1242205920[A] :12.5[B] :26.82[C] :89.51[capp] :myApplication[csource] :M3[ctags] : cluster=\"myCluster\" [myFieldA] :12.5===== KEY   =====DataType: myRecordKey: 1242000000/M4/1242205920===== Value =====Timestamp : 1242205920[A] :13.91[B] :21.02[C] :18.05[capp] :myApplication[csource] :M4[ctags] : cluster=\"myCluster\" [myFieldA] :13.91";
+        log.info("res[" + res + "]");
+        Assert.assertTrue("expected result differ from current result",res.equals(expected));
+
+        log.info(res);
+      } catch (Throwable e) {
+        e.printStackTrace();
+        Assert.fail();
+      }
+      
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      if (fs != null) {
+        try {
+          fs.delete(new Path(directory), true);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+      pigServer.shutdown();
+    }
+  }
+
+  protected String dumpArchive(FileSystem fs, Configuration conf, String file)
+      throws Throwable {
+    SequenceFile.Reader reader = null;
+    log.info("File: [" +  file + "]" + fs.exists(new Path(file)));
+    try {
+      reader = new SequenceFile.Reader(fs, new Path(file), conf);
+
+      ChukwaRecordKey key = new ChukwaRecordKey();
+      ChukwaRecord record = new ChukwaRecord();
+
+      StringBuilder sb = new StringBuilder();
+      while (reader.next(key, record)) {
+       
+        sb.append("===== KEY   =====");
+
+        sb.append("DataType: " + key.getReduceType());
+        sb.append("Key: " + key.getKey());
+        sb.append("===== Value =====");
+
+        String[] fields = record.getFields();
+        Arrays.sort(fields);
+        sb.append("Timestamp : " + record.getTime());
+        for (String field : fields) {
+          sb.append("[" + field + "] :" + record.getValue(field));
+        }
+      }
+      
+      return sb.toString();
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail("Exception while reading SeqFile" + e.getMessage());
+      throw e;
+    }
+    finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+}

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestParseDouble.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestParseDouble.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestParseDouble.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestParseDouble.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+
+
+public class TestParseDouble extends TestCase {
+
+  public void testPARSEDOUBLE() {
+    PARSEDOUBLE func = new PARSEDOUBLE();
+    String in = "10";
+    Double inDouble = Double.parseDouble(in);
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(in);
+
+    Double output = null;
+    try {
+      output = func.exec(input);
+      Assert.assertTrue(output.doubleValue() == inDouble.doubleValue());
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+
+  public void testPARSEDOUBLE2() {
+    PARSEDOUBLE func = new PARSEDOUBLE();
+    String in = "10.86";
+    Double inDouble = Double.parseDouble(in);
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(in);
+
+    Double output = null;
+    try {
+      output = func.exec(input);
+      Assert.assertTrue(output.doubleValue() == inDouble.doubleValue());
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+  
+  
+  public void testPARSEDOUBLE3() {
+    PARSEDOUBLE func = new PARSEDOUBLE();
+    String in = "10aaa";
+   
+    Double output = null;
+    try {
+      Tuple input = DefaultTupleFactory.getInstance().newTuple(in);
+      output = func.exec(input);
+      Assert.assertNull(output);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+  
+  public void testPARSEDOUBLE4() {
+    PARSEDOUBLE func = new PARSEDOUBLE();
+    String in = "10.86";
+    Double inDouble = Double.parseDouble(in);
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(new DataByteArray(in));
+
+    Double output = null;
+    try {
+      output = func.exec(input);
+      Assert.assertTrue(output.doubleValue() == inDouble.doubleValue());
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+  
+}
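
The tests above pin down the UDF's observable contract: it accepts both chararray and bytearray tuple fields and returns null, rather than throwing, when the value cannot be parsed. A guess at the UDF's shape inferred from those tests (the real PARSEDOUBLE.java is elsewhere in this commit and may differ):

    import java.io.IOException;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    // Hypothetical sketch only, inferred from TestParseDouble.
    public class ParseDoubleSketch extends EvalFunc<Double> {
      public Double exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
          return null;
        }
        try {
          // toString() covers both chararray and DataByteArray fields, as exercised above.
          return Double.parseDouble(input.get(0).toString());
        } catch (NumberFormatException e) {
          return null;   // unparseable input yields null, matching testPARSEDOUBLE3
        }
      }
    }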

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.TupleFactory;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+
+public class TestRecordMerger extends TestCase{
+
+  @SuppressWarnings("unchecked")
+  public void testRecordMerger() {
+    RecordMerger func = new RecordMerger();
+
+    try {
+      Map<String, DataByteArray> in = new HashMap<String, DataByteArray>();
+      TupleFactory tf = TupleFactory.getInstance();
+
+      in.put("A", new DataByteArray("100"));
+      in.put("B", new DataByteArray("200"));
+      in.put("C", new DataByteArray("300"));
+
+      Map<String, DataByteArray> in2 = new HashMap<String, DataByteArray>();
+
+      in2.put("D", new DataByteArray("400"));
+      in2.put("E", new DataByteArray("500"));
+
+      DataBag bg = DefaultBagFactory.getInstance().newDefaultBag();
+      bg.add(tf.newTuple(in));
+      bg.add(tf.newTuple(in2));
+
+      Map output = func.exec(tf.newTuple(bg));
+      
+      Assert.assertTrue(output.containsKey("A") );
+      Assert.assertTrue(output.containsKey("B") );
+      Assert.assertTrue(output.containsKey("C") );
+      Assert.assertTrue(output.containsKey("D") );
+      Assert.assertTrue(output.containsKey("E") );
+      
+      Assert.assertTrue(output.get("A").toString().equals("100") );
+      Assert.assertTrue(output.get("B").toString().equals("200") );
+      Assert.assertTrue(output.get("C").toString().equals("300") );
+      Assert.assertTrue(output.get("D").toString().equals("400") );
+      Assert.assertTrue(output.get("E").toString().equals("500") );
+      
+      
+      
+    } catch (IOException e) {
+      Assert.fail();
+    }
+    
+  }
+}
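
The test shows RecordMerger collapsing a bag of map tuples into a single map, which is how it is used later in the Pig session documented in GenerateTestFile. A guess at the UDF's shape inferred from this test (the real RecordMerger.java is elsewhere in this commit and may differ):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    // Hypothetical sketch only, inferred from TestRecordMerger.
    public class RecordMergerSketch extends EvalFunc<Map> {
      @SuppressWarnings("unchecked")
      public Map exec(Tuple input) throws IOException {
        Map merged = new HashMap();
        DataBag bag = (DataBag) input.get(0);
        for (Tuple t : bag) {
          merged.putAll((Map) t.get(0));   // flatten every map in the bag into one map
        }
        return merged;
      }
    }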

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestTimePartition extends TestCase {
+
+  public void test_5sec_TimePartition() {
+    TimePartition func = new TimePartition("" + (5*1000L));
+    
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243377165000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+  
+  
+  public void test_5Min_TimePartition() {
+    TimePartition func = new TimePartition("" + (5*60*1000L));
+    
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243377000000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+  
+  public void test_60Min_TimePartition() {
+    TimePartition func = new TimePartition("" + (60*60*1000L));
+    
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243375200000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+  
+  public void test_1Day_TimePartition() {
+    TimePartition func = new TimePartition("" + (24*60*60*1000L));
+    
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243296000000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+ 
+  public void test_largeTimePartition() {
+    try {
+      TimePartition func = new TimePartition("7776000000" );
+      Long timestamp = 1243377169372L;
+      Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+      func.exec(input);
+    } catch (Throwable e) {
+      Assert.fail();
+    }
+  }
+  
+}
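
For reference, every expected value asserted above is consistent with TimePartition flooring a timestamp to the start of its partition, i.e. long division by the partition size followed by multiplication. A minimal sketch of that arithmetic (an assumption about the UDF's internals, which are not shown here):

    long interval = 5 * 60 * 1000L;                      // 5-minute partitions
    long timestamp = 1243377169372L;                     // timestamp used by the tests above
    long partition = (timestamp / interval) * interval;  // 1243377000000L, as asserted in test_5Min_TimePartition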

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+
+
+public class GenerateTestFile {
+
+/* Pig Test:
+  A = load './chukwaTestFile.evt' using  org.apache.hadoop.chukwa.ChukwaStorage() as (ts: long,fields);
+  Dump A;
+ 
+  (1242205800L,[A#7,B#3,csource#M0,C#9])
+  (1242205800L,[D#1,csource#M0])
+  (1242205800L,[A#17,csource#M1])
+  (1242205800L,[B#37,C#51,csource#M1])
+  (1242205860L,[D#12,A#8,csource#M0,C#3])
+  (1242205860L,[A#8,B#6,csource#M0])
+  (1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1])
+  (1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1])
+  (1242205920L,[D#6,E#48.5,A#8,B#6,C#8,csource#M0])
+  (1242205920L,[D#61.9,E#40.3,A#8.3,B#5.2,C#37.7,csource#M1])
+  (1242205980L,[A#18.3,B#1.2,csource#M1,C#7.7])
+  (1242205980L,[D#6.1,A#8.9,B#8.3,C#7.2,csource#M2])
+  (1242205920L,[A#12.5,B#26.82,csource#M3,C#89.51])
+  (1242205920L,[A#13.91,B#21.02,csource#M4,C#18.05])
+  
+ B = group A by (ts,fields#'csource');
+ Dump B;
+ 
+ ((1242205800L,M0),{(1242205800L,[A#7,B#3,csource#M0,C#9]),(1242205800L,[D#1,csource#M0])})
+ ((1242205800L,M1),{(1242205800L,[A#17,csource#M1]),(1242205800L,[B#37,C#51,csource#M1])})
+ ((1242205860L,M0),{(1242205860L,[D#12,A#8,csource#M0,C#3]),(1242205860L,[A#8,B#6,csource#M0])})
+ ((1242205860L,M1),{(1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1]),(1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1])})
+ ((1242205920L,M0),{(1242205920L,[D#6,E#48.5,A#8,B#6,C#8,csource#M0])})
+ ((1242205920L,M1),{(1242205920L,[D#61.9,E#40.3,A#8.3,B#5.2,C#37.7,csource#M1])})
+ ((1242205920L,M3),{(1242205920L,[A#12.5,B#26.82,csource#M3,C#89.51])})
+ ((1242205920L,M4),{(1242205920L,[A#13.91,B#21.02,csource#M4,C#18.05])})
+ ((1242205980L,M1),{(1242205980L,[A#18.3,B#1.2,csource#M1,C#7.7])})
+ ((1242205980L,M2),{(1242205980L,[D#6.1,A#8.9,B#8.3,C#7.2,csource#M2])})
+
+ C = FOREACH B GENERATE group.$0,group.$1,org.apache.hadoop.chukwa.RecordMerger(A.fields);
+ Dump C;
+ (1242205800L,M0,[D#1,A#7,B#3,csource#M0,C#9])
+ (1242205800L,M1,[A#17,B#37,C#51,csource#M1])
+ (1242205860L,M0,[D#12,A#8,B#6,csource#M0,C#3])
+ (1242205860L,M1,[D#6,A#13.2,B#23,csource#M1,C#8.5])
+ (1242205920L,M0,[D#6,E#48.5,A#8,B#6,csource#M0,C#8])
+ (1242205920L,M1,[D#61.9,E#40.3,A#8.3,B#5.2,csource#M1,C#37.7])
+ (1242205920L,M3,[A#12.5,B#26.82,C#89.51,csource#M3])
+ (1242205920L,M4,[A#13.91,B#21.02,C#18.05,csource#M4])
+ (1242205980L,M1,[A#18.3,B#1.2,C#7.7,csource#M1])
+ (1242205980L,M2,[D#6.1,A#8.9,B#8.3,csource#M2,C#7.2])
+
+ 
+*/
+
+  public static Configuration conf = null;
+  public static FileSystem fs = null;
+  
+  public static void main(String[] args) throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    createFile(null);
+  }
+  
+  public static void createFile(String path) throws Exception {
+
+    Path outputFile = null;
+    if (path != null) {
+      outputFile = new Path(path + "/chukwaTestFile.evt");
+    } else {
+      outputFile = new Path("chukwaTestFile.evt");
+    }
+
+    outputFile = outputFile.makeQualified(fs);
+    if (fs.exists(outputFile)) {
+      System.out.println("File already there, exit -1," + outputFile );
+      System.exit(-1);
+    }
+    System.out.println("outputFile:" + outputFile);
+    
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(fs, conf, outputFile, ChukwaRecordKey.class, ChukwaRecord.class, CompressionType.NONE);
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    key.setReduceType("TestSeqFile");
+    
+   
+    
+    String chukwaKey = "";
+    String machine = "";
+    String TimePartion = "1242205200"; //Wed, 13 May 2009 09:00:00 GMT
+    
+    {    
+      
+      machine = "M0";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "7");
+      record.add("B", "3");
+      record.add("C", "9");
+  
+      seqFileWriter.append(key, record);
+    }
+
+    {    
+      machine = "M0";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("D", "1");
+  
+      seqFileWriter.append(key, record);
+    }
+
+    {    
+      machine = "M1";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "17");
+  
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M1";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("B", "37");
+      record.add("C", "51");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M0";
+      long time = 1242205860; // Wed, 13 May 2009 09:11:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8");
+      record.add("C", "3");
+      record.add("D", "12");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M0";
+      long time = 1242205860; // Wed, 13 May 2009 09:11:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8");
+      record.add("B", "6");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M1";
+      long time = 1242205860; // Wed, 13 May 2009 09:11:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "13.2");
+      record.add("B", "23");
+      record.add("C", "8.5");
+      record.add("D", "6");
+      
+      // create duplicate
+      seqFileWriter.append(key, record);
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M0";
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8");
+      record.add("B", "6");
+      record.add("C", "8");
+      record.add("D", "6");
+      record.add("E", "48.5");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M1";
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8.3");
+      record.add("B", "5.2");
+      record.add("C", "37.7");
+      record.add("D", "61.9");
+      record.add("E", "40.3");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M1";
+      long time = 1242205980; // Wed, 13 May 2009 09:13:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "18.3");
+      record.add("B", "1.2");
+      record.add("C", "7.7");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M2";
+      long time = 1242205980; // Wed, 13 May 2009 09:13:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8.9");
+      record.add("B", "8.3");
+      record.add("C", "7.2");
+      record.add("D", "6.1");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M3";
+      // late arrival T0
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "12.5");
+      record.add("B", "26.82");
+      record.add("C", "89.51");
+      seqFileWriter.append(key, record);
+    }
+    
+    {    
+      machine = "M4";
+      // late arrival T0
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion +"/" + machine +"/" + time;
+      key.setKey(chukwaKey);
+      
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "13.91");
+      record.add("B", "21.02");
+      record.add("C", "18.05");
+      seqFileWriter.append(key, record);
+    }
+    
+    seqFileWriter.close();
+  }
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=783208&r1=783207&r2=783208&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Wed Jun 10 05:44:51 2009
@@ -49,7 +49,7 @@
   
   final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
     public boolean accept(Path file) {
-      return file.getName().startsWith("demuxOutputDir");
+      return ( file.getName().startsWith("demuxOutputDir") || file.getName().startsWith("pigOutputDir"));
     }     
   };
 

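The net effect of the PostProcessorManager change is that output directories created by the new Pig mover (names starting with "pigOutputDir") are accepted by the same filter that already matched demux output. Roughly, the filter would be applied along these lines (an illustrative sketch only; conf and postProcessDir are placeholders, and the surrounding manager code is not part of this diff):

    // Illustrative only: list candidate directories with the filter above.
    FileSystem fs = FileSystem.get(conf);
    FileStatus[] candidates = fs.listStatus(new Path(postProcessDir), POST_PROCESS_DEMUX_DIR_FILTER);
    for (FileStatus dir : candidates) {
      // Each entry is either a demuxOutputDir* or a pigOutputDir* ready for post-processing.
    }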

