To: chukwa-commits@hadoop.apache.org
From: eyang@apache.org
Subject: svn commit: r783208 - in /hadoop/chukwa/trunk: ./ contrib/ contrib/chukwa-pig/ contrib/chukwa-pig/lib/ contrib/chukwa-pig/src/ contrib/chukwa-pig/src/java/ contrib/chukwa-pig/src/java/org/ contrib/chukwa-pig/src/java/org/apache/ contrib/chukwa-pig/src/...
Date: Wed, 10 Jun 2009 05:44:52 -0000

Author: eyang
Date: Wed Jun 10 05:44:51 2009
New Revision: 783208

URL: http://svn.apache.org/viewvc?rev=783208&view=rev
Log:
CHUKWA-20. Added Pig support for ChukwaRecords.
(Jerome Boulon via Eric Yang)
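For orientation, a minimal sketch of how the new storage function is driven. The ChukwaStorage class, the chukwa-pig.jar artifact, and the sample file name come from this commit; the driver class itself, the local-mode setup, and the Pig 0.x PigServer calls are assumptions:

import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

// Hypothetical driver: load a ChukwaRecord sequence file through the new
// ChukwaStorage load function and print each (timestamp, fields) tuple.
public class ChukwaPigExample {
  public static void main(String[] args) throws Exception {
    PigServer pig = new PigServer(ExecType.LOCAL);        // local-mode Pig
    pig.registerJar("contrib/chukwa-pig/chukwa-pig.jar"); // jar shipped by this commit
    pig.registerQuery("A = load 'chukwaTestFile.evt' using "
        + "org.apache.hadoop.chukwa.ChukwaStorage() as (ts: long, fields);");
    Iterator<Tuple> it = pig.openIterator("A");
    while (it.hasNext()) {
      // e.g. (1242205800L,[A#7,B#3,csource#M0,C#9]) for the generated test file
      System.out.println(it.next());
    }
  }
}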

Added:
    hadoop/chukwa/trunk/contrib/
    hadoop/chukwa/trunk/contrib/build-contrib.xml
    hadoop/chukwa/trunk/contrib/build.xml
    hadoop/chukwa/trunk/contrib/chukwa-pig/
    hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml
    hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar   (with props)
    hadoop/chukwa/trunk/contrib/chukwa-pig/lib/
    hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar   (with props)
    hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar   (with props)
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/
    hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/PigTest.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestLocalChukwaStorage.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestParseDouble.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/
    hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java
Modified:
    hadoop/chukwa/trunk/build.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java

Modified: hadoop/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/build.xml?rev=783208&r1=783207&r2=783208&view=diff
==============================================================================
--- hadoop/chukwa/trunk/build.xml (original)
+++ hadoop/chukwa/trunk/build.xml Wed Jun 10 05:44:51 2009
    [XML hunk bodies lost in archiving; only the echo text "Building the .jar files." survives]

Added: hadoop/chukwa/trunk/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/build-contrib.xml?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/build-contrib.xml (added)
+++ hadoop/chukwa/trunk/contrib/build-contrib.xml Wed Jun 10 05:44:51 2009
@@ -0,0 +1,50 @@
    [50-line XML build file; body lost in archiving]

Added: hadoop/chukwa/trunk/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/build.xml?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/build.xml (added)
+++ hadoop/chukwa/trunk/contrib/build.xml Wed Jun 10 05:44:51 2009
@@ -0,0 +1,64 @@
    [64-line XML build file; body lost in archiving]

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/build.xml Wed Jun 10 05:44:51 2009
@@ -0,0 +1,337 @@
    [337-line XML build file; body lost in archiving. Surviving message strings:
    "You need Apache Ivy 2.0 or later from http://ant.apache.org/ It could not be loaded from ${ivy_repo_url}",
    "Reports generated: ${build.ivy.report.dir}", "Building the .jar files.",
    "*** Building Test Sources ***", "*** ${build.classes} ***",
    "*** ${test.build.classes} *** ${chukwa.root.build.dir} ***", "Tests failed!"]

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar?rev=783208&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar?rev=783208&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig-test.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar?rev=783208&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/lib/pig.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/ChukwaStorage.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.record.Buffer;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+public class ChukwaStorage extends Utf8StorageConverter implements LoadFunc,
+    StoreFunc
+{
+  Schema schema = null;
+  SequenceFileRecordReader<ChukwaRecordKey, ChukwaRecord> reader;
+  SequenceFile.Reader r;
+  SequenceFile.Writer writer;
+  DataOutputStream dos;
+  FSDataOutputStream fsd = null;
+  Calendar calendar = Calendar.getInstance();
+
+  int timestampFieldIndex = -1;
+  int pkFieldIndex = -1;
+  int sourceFieldIndex = -1;
+  int clusterNameFieldIndex = -1;
+  int recordTypeFieldIndex = -1;
+  int applicationFieldIndex = -1;
+
+  String[] fields = null;
+  private TupleFactory tf = DefaultTupleFactory.getInstance();
+
+  public ChukwaStorage() {
+  }
+
+  public ChukwaStorage(String... scfields) {
+    this.fields = scfields;
+    for (int i = 0; i < scfields.length; i++) {
+      if (scfields[i].equalsIgnoreCase("c_timestamp")) {
+        timestampFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_pk")) {
+        pkFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_source")) {
+        sourceFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_recordtype")) {
+        recordTypeFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_application")) {
+        applicationFieldIndex = i;
+      } else if (scfields[i].equalsIgnoreCase("c_cluster")) {
+        clusterNameFieldIndex = i;
+      }
+    }
+  }
+
+  public void bindTo(String fileName, BufferedPositionedInputStream is,
+      long offset, long end) throws IOException
+  {
+    JobConf conf = PigInputFormat.sJob;
+    if (conf == null) {
+      conf = new JobConf();
+    }
+
+    FileSplit split = new FileSplit(new Path(fileName), offset, end - offset,
+        (String[]) null);
+    reader = new SequenceFileRecordReader<ChukwaRecordKey, ChukwaRecord>(conf,
+        split);
+    if (reader.getValueClass() != ChukwaRecord.class)
+      throw new IOException(
+          "The value class in the sequence file does not match that for Chukwa data");
+  }
+
+  public Tuple getNext() throws IOException
+  {
+    ChukwaRecord record = new ChukwaRecord();
+    if (!reader.next(reader.createKey(), record))
+      return null;
+
+    Tuple ret = tf.newTuple(2);
+    try
+    {
+      ret.set(0, new Long(record.getTime()));
+
+      HashMap<String, DataByteArray> pigMapFields = new HashMap<String, DataByteArray>();
+      TreeMap<String, Buffer> mapFields = record.getMapFields();
+
+      if (mapFields != null)
+      {
+        for (String key : mapFields.keySet())
+        {
+          pigMapFields.put(key, new DataByteArray(record.getValue(key).getBytes()));
+        }
+      }
+      ret.set(1, pigMapFields);
+
+    } catch (ExecException e)
+    {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    return ret;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
+   */
+  public void fieldsToRead(Schema schema)
+  {
+  }
+
+  public Schema determineSchema(String fileName, ExecType execType,
+      DataStorage storage) throws IOException
+  {
+    Schema newSchema = new Schema();
+    newSchema.add(new Schema.FieldSchema("timestamp", DataType.LONG));
+    newSchema.add(new Schema.FieldSchema("map", DataType.MAP));
+
+    return newSchema;
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void bindTo(OutputStream os) throws IOException
+  {
+    JobConf conf = new JobConf();
+    dos = new DataOutputStream(os);
+    fsd = new FSDataOutputStream(dos);
+    writer = SequenceFile.createWriter(conf, fsd,
+        ChukwaRecordKey.class, ChukwaRecord.class,
+        SequenceFile.CompressionType.BLOCK, new DefaultCodec());
+  }
+
+  @Override
+  public void finish() throws IOException
+  {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (Throwable e) {
+      }
+    }
+
+    if (writer != null) {
+      try {
+        writer.close();
+      } catch (Throwable e) {
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void putNext(Tuple f) throws IOException
+  {
+    long timePartition = 0L;
+    long timestamp = 0L;
+    String source = "N/A";
+    String application = "N/A";
+    String recordType = "N/A";
+    String clusterName = "N/A";
+    String pk = "";
+
+    try
+    {
+      ChukwaRecordKey key = new ChukwaRecordKey();
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(System.currentTimeMillis());
+      int inputSize = f.size();
+      for (int i = 0; i <
    [... middle of the loop lost in archiving; the surviving fragment resumes here ...]
+          Map<String, Object> m = (Map<String, Object>) field;
+          for (Object o : m.keySet()) {
+            record.add(o.toString(), m.get(o).toString());
+          }
+          continue;
+        } else {
+          if (i
    [... remainder of putNext() and of ChukwaStorage.java lost in archiving ...]

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/PARSEDOUBLE.java Wed Jun 10 05:44:51 2009
    [license header lost in archiving (same standard ASF header as the other files in this commit);
    the imports below are reconstructed from the surviving class body]
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Parameters:
+ * strtoconvert - chararray
+ *
+ * Return Value:
+ * double parsed value
+ *
+ * Return Schema:
+ * parseDouble: double
+ *
+ * Example:
+ * register chukwa-pig.jar;
+ * A = load 'mydata' using PigStorage() as ( stringnumber: chararray );
+ * B = foreach A generate stringnumber, org.apache.hadoop.chukwa.PARSEDOUBLE(stringnumber);
+ */
+public class PARSEDOUBLE extends EvalFunc<Double> {
+
+  public Double exec(Tuple input) throws IOException {
+    if (input == null || input.size() < 1)
+      return null;
+    try {
+      String strtoconvert = input.get(0).toString();
+      Double number = Double.parseDouble(strtoconvert);
+
+      return number;
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    Schema schema = null;
+    try {
+      schema = new Schema(new Schema.FieldSchema(input.getField(0).alias, DataType.DOUBLE));
+    } catch (FrontendException e) {
+      schema = new Schema(new Schema.FieldSchema(getSchemaName("parseDouble", input),
+          DataType.DOUBLE));
+    }
+    return schema;
+  }
+}
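
A quick sanity check of the UDF above. This demo class is hypothetical (it assumes it sits in the UDF's own package); the direct-invocation style mirrors the unit tests added later in this commit:

package org.apache.hadoop.chukwa;

import java.io.IOException;

import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;

public class ParseDoubleDemo {
  public static void main(String[] args) throws IOException {
    PARSEDOUBLE func = new PARSEDOUBLE();
    Tuple good = DefaultTupleFactory.getInstance().newTuple("12.5");
    System.out.println(func.exec(good)); // prints 12.5
    Tuple bad = DefaultTupleFactory.getInstance().newTuple("not-a-number");
    System.out.println(func.exec(bad));  // prints null -- parse failures return null instead of throwing
  }
}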

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/RecordMerger.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * return time, Map<String, Object>
+ */
+public class RecordMerger extends EvalFunc<Map<String, Object>> {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, Object> exec(Tuple input) throws IOException {
+
+    Map<String, Object> newPigMapFields = new HashMap<String, Object>();
+    DataBag bg = (DataBag) input.get(0);
+    Iterator<Tuple> bagIterator = bg.iterator();
+    Object s = null;
+    while (bagIterator.hasNext()) {
+
+      Map<String, Object> map = (Map<String, Object>) bagIterator.next().get(0);
+      Iterator<String> mapIterator = map.keySet().iterator();
+      while (mapIterator.hasNext()) {
+        s = mapIterator.next();
+        newPigMapFields.put(s.toString(), map.get(s));
+      }
+    }
+
+    return newPigMapFields;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    Schema schema = null;
+    try {
+      schema = new Schema(new Schema.FieldSchema(input.getField(0).alias, DataType.MAP));
+    } catch (FrontendException e) {
+      e.printStackTrace();
+    }
+    return schema;
+  }
+}

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/TimePartition.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class TimePartition extends EvalFunc<Long> {
+
+  protected long period = 0;
+
+  public TimePartition(String strPeriod) {
+    period = Long.parseLong(strPeriod);
+  }
+
+  @Override
+  public Long exec(Tuple input) throws IOException {
+    if (input == null || input.size() < 1)
+      return null;
+    try {
+      long timestamp = Long.parseLong(input.get(0).toString());
+      timestamp = timestamp - (timestamp % (period));
+
+      return timestamp;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    Schema schema = null;
+    try {
+      schema = new Schema(new Schema.FieldSchema(input.getField(0).alias, DataType.LONG));
+    } catch (FrontendException e) {
+      schema = new Schema(new Schema.FieldSchema(getSchemaName("timePartition", input),
+          DataType.LONG));
+    }
+    return schema;
+  }
+}
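
The UDF above floors a timestamp to the start of its period via t - (t % period). A small sketch of that arithmetic (the demo class is hypothetical and assumes the UDF's package; the timestamp and the expected bucket value match the TestTimePartition cases further down):

package org.apache.hadoop.chukwa;

import org.apache.pig.data.DefaultTupleFactory;

public class TimePartitionDemo {
  public static void main(String[] args) throws Exception {
    long t = 1243377169372L;               // timestamp used by the tests below
    long fiveMin = 5 * 60 * 1000L;         // 300000 ms
    System.out.println(t - (t % fiveMin)); // 1243377000000 -- start of the 5-minute bucket
    // Same result through the UDF:
    TimePartition func = new TimePartition(String.valueOf(fiveMin));
    System.out.println(func.exec(DefaultTupleFactory.getInstance().newTuple(t))); // 1243377000000
  }
}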

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/src/java/org/apache/hadoop/chukwa/tools/PigMover.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility class to move Pig output closer to the Demux output.
+ * pigDir should look like:
+ *   - workingDay + ".D"
+ *   - workingDay + "_" + workingHour + ".H"
+ *   - workingDay + "_" + workingHour + "_" + [0-5] + [0,5] + ".R"
+ */
+public class PigMover {
+
+  private static Logger log = Logger.getLogger(PigMover.class);
+
+  public static void usage() {
+    System.out
+        .println("PigMover <cluster> <recordType> <rootPigDir> <dataDir> <rootPigPostProcessDir>");
+    System.exit(-1);
+  }
+
+  /**
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+
+    if (args.length != 5) {
+      log.warn("Wrong number of arguments");
+      usage();
+    }
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    log.info("fs URI:" + fs.getUri());
+
+    String cluster = args[0];
+    String recordType = args[1];
+
+    log.info("Cluster:" + cluster);
+    log.info("recordType:" + recordType);
+
+    Path rootpigDir = new Path(args[2]);
+    log.info("rootpigDir:" + rootpigDir);
+
+    Path dataDir = new Path(args[3]);
+    log.info("dataDir:" + dataDir);
+
+    if (!fs.exists(dataDir)) {
+      throw new RuntimeException("input directory does not exist.");
+    }
+
+    String fileName = dataDir.getName();
+    log.info("fileName:" + fileName);
+
+    String rootPigPostProcessDir = args[4];
+    log.info("chukwaPostProcessDir: [" + rootPigPostProcessDir + "]");
+
+    String finalPigOutputDir = rootPigPostProcessDir + "/pigOutputDir_" + System.currentTimeMillis()
+        + "/" + cluster + "/" + recordType;
+    log.info("finalPigOutputDir:" + finalPigOutputDir);
+
+    Path postProcessDir = new Path(finalPigOutputDir);
+    fs.mkdirs(postProcessDir);
+
+    boolean movingDone = true;
+    FileStatus[] files = fs.listStatus(dataDir);
+
+    for (int i = 0; i <
    [... rest of PigMover.java, and the entire added PigTest.java, TestLocalChukwaStorage.java and
    TestParseDouble.java, lost in archiving ...]

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestRecordMerger.java Wed Jun 10 05:44:51 2009
    [license header, imports and the opening of the test method (including the RecordMerger "func"
    instance) lost in archiving; the surviving body resumes here]
+      Map<String, DataByteArray> in = new HashMap<String, DataByteArray>();
+      TupleFactory tf = TupleFactory.getInstance();
+
+      in.put("A", new DataByteArray("100"));
+      in.put("B", new DataByteArray("200"));
+      in.put("C", new DataByteArray("300"));
+
+      Map<String, DataByteArray> in2 = new HashMap<String, DataByteArray>();
+
+      in2.put("D", new DataByteArray("400"));
+      in2.put("E", new DataByteArray("500"));
+
+      DataBag bg = DefaultBagFactory.getInstance().newDefaultBag();
+      bg.add(tf.newTuple(in));
+      bg.add(tf.newTuple(in2));
+
+      Map<String, Object> output = func.exec(tf.newTuple(bg));
+
+      Assert.assertTrue(output.containsKey("A"));
+      Assert.assertTrue(output.containsKey("B"));
+      Assert.assertTrue(output.containsKey("C"));
+      Assert.assertTrue(output.containsKey("D"));
+      Assert.assertTrue(output.containsKey("E"));
+
+      Assert.assertTrue(output.get("A").toString().equals("100"));
+      Assert.assertTrue(output.get("B").toString().equals("200"));
+      Assert.assertTrue(output.get("C").toString().equals("300"));
+      Assert.assertTrue(output.get("D").toString().equals("400"));
+      Assert.assertTrue(output.get("E").toString().equals("500"));
+
+    } catch (IOException e) {
+      Assert.fail();
+    }
+
+  }
+}

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/TestTimePartition.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestTimePartition extends TestCase {
+
+  public void test_5sec_TimePartition() {
+    TimePartition func = new TimePartition("" + (5 * 1000L));
+
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243377165000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+
+  public void test_5Min_TimePartition() {
+    TimePartition func = new TimePartition("" + (5 * 60 * 1000L));
+
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243377000000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+
+  public void test_60Min_TimePartition() {
+    TimePartition func = new TimePartition("" + (60 * 60 * 1000L));
+
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243375200000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+
+  public void test_1Day_TimePartition() {
+    TimePartition func = new TimePartition("" + (24 * 60 * 60 * 1000L));
+
+    Long timestamp = 1243377169372L;
+    Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+
+    try {
+      Long timePartition = func.exec(input);
+      long expectedTimePartition = 1243296000000L;
+      Assert.assertTrue(timePartition.longValue() == expectedTimePartition);
+    } catch (IOException e) {
+      Assert.fail();
+    }
+  }
+
+  public void test_largeTimePartition() {
+    try {
+      TimePartition func = new TimePartition("7776000000");
+      Long timestamp = 1243377169372L;
+      Tuple input = DefaultTupleFactory.getInstance().newTuple(timestamp);
+      Long timePartition = func.exec(input);
+    } catch (Throwable e) {
+      Assert.fail();
+    }
+  }
+
+}

Added: hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java?rev=783208&view=auto
==============================================================================
--- hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java (added)
+++ hadoop/chukwa/trunk/contrib/chukwa-pig/test/src/java/org/apache/hadoop/chukwa/util/GenerateTestFile.java Wed Jun 10 05:44:51 2009
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+
+public class GenerateTestFile {
+
+/* Pig Test:
+  A = load './chukwaTestFile.evt' using org.apache.hadoop.chukwa.ChukwaStorage() as (ts: long,fields);
+  Dump A;
+
+  (1242205800L,[A#7,B#3,csource#M0,C#9])
+  (1242205800L,[D#1,csource#M0])
+  (1242205800L,[A#17,csource#M1])
+  (1242205800L,[B#37,C#51,csource#M1])
+  (1242205860L,[D#12,A#8,csource#M0,C#3])
+  (1242205860L,[A#8,B#6,csource#M0])
+  (1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1])
+  (1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1])
+  (1242205920L,[D#6,E#48.5,A#8,B#6,C#8,csource#M0])
+  (1242205920L,[D#61.9,E#40.3,A#8.3,B#5.2,C#37.7,csource#M1])
+  (1242205980L,[A#18.3,B#1.2,csource#M1,C#7.7])
+  (1242205980L,[D#6.1,A#8.9,B#8.3,C#7.2,csource#M2])
+  (1242205920L,[A#12.5,B#26.82,csource#M3,C#89.51])
+  (1242205920L,[A#13.91,B#21.02,csource#M4,C#18.05])
+
+  B = group A by (ts,fields#'csource');
+  Dump B;
+
+  ((1242205800L,M0),{(1242205800L,[A#7,B#3,csource#M0,C#9]),(1242205800L,[D#1,csource#M0])})
+  ((1242205800L,M1),{(1242205800L,[A#17,csource#M1]),(1242205800L,[B#37,C#51,csource#M1])})
+  ((1242205860L,M0),{(1242205860L,[D#12,A#8,csource#M0,C#3]),(1242205860L,[A#8,B#6,csource#M0])})
+  ((1242205860L,M1),{(1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1]),(1242205860L,[D#6,A#13.2,B#23,C#8.5,csource#M1])})
+  ((1242205920L,M0),{(1242205920L,[D#6,E#48.5,A#8,B#6,C#8,csource#M0])})
+  ((1242205920L,M1),{(1242205920L,[D#61.9,E#40.3,A#8.3,B#5.2,C#37.7,csource#M1])})
+  ((1242205920L,M3),{(1242205920L,[A#12.5,B#26.82,csource#M3,C#89.51])})
+  ((1242205920L,M4),{(1242205920L,[A#13.91,B#21.02,csource#M4,C#18.05])})
+  ((1242205980L,M1),{(1242205980L,[A#18.3,B#1.2,csource#M1,C#7.7])})
+  ((1242205980L,M2),{(1242205980L,[D#6.1,A#8.9,B#8.3,C#7.2,csource#M2])})
+
+  C = FOREACH B GENERATE group.$0,group.$1,org.apache.hadoop.chukwa.RecordMerger(A.fields);
+  Dump C;
+
+  (1242205800L,M0,[D#1,A#7,B#3,csource#M0,C#9])
+  (1242205800L,M1,[A#17,B#37,C#51,csource#M1])
+  (1242205860L,M0,[D#12,A#8,B#6,csource#M0,C#3])
+  (1242205860L,M1,[D#6,A#13.2,B#23,csource#M1,C#8.5])
+  (1242205920L,M0,[D#6,E#48.5,A#8,B#6,csource#M0,C#8])
+  (1242205920L,M1,[D#61.9,E#40.3,A#8.3,B#5.2,csource#M1,C#37.7])
+  (1242205920L,M3,[A#12.5,B#26.82,C#89.51,csource#M3])
+  (1242205920L,M4,[A#13.91,B#21.02,C#18.05,csource#M4])
+  (1242205980L,M1,[A#18.3,B#1.2,C#7.7,csource#M1])
+  (1242205980L,M2,[D#6.1,A#8.9,B#8.3,csource#M2,C#7.2])
+*/
+
+  public static Configuration conf = null;
+  public static FileSystem fs = null;
+
+  public static void main(String[] args) throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    createFile(null);
+  }
+
+  public static void createFile(String path) throws Exception {
+
+    Path outputFile = null;
+    if (path != null) {
+      outputFile = new Path(path + "/chukwaTestFile.evt");
+    } else {
+      outputFile = new Path("chukwaTestFile.evt");
+    }
+
+    outputFile = outputFile.makeQualified(fs);
+    if (fs.exists(outputFile)) {
+      System.out.println("File already there, exit -1," + outputFile);
+      System.exit(-1);
+    }
+    System.out.println("outputFile:" + outputFile);
+
+    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(fs, conf, outputFile,
+        ChukwaRecordKey.class, ChukwaRecord.class, CompressionType.NONE);
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    key.setReduceType("TestSeqFile");
+
+    String chukwaKey = "";
+    String machine = "";
+    String TimePartion = "1242205200"; // Wed, 13 May 2009 09:00:00 GMT
+
+    {
+      machine = "M0";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "7");
+      record.add("B", "3");
+      record.add("C", "9");
+
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M0";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("D", "1");
+
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M1";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "17");
+
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M1";
+      long time = 1242205800; // Wed, 13 May 2009 09:10:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("B", "37");
+      record.add("C", "51");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M0";
+      long time = 1242205860; // Wed, 13 May 2009 09:11:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8");
+      record.add("C", "3");
+      record.add("D", "12");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M0";
+      long time = 1242205860; // Wed, 13 May 2009 09:11:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8");
+      record.add("B", "6");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M1";
+      long time = 1242205860; // Wed, 13 May 2009 09:11:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "13.2");
+      record.add("B", "23");
+      record.add("C", "8.5");
+      record.add("D", "6");
+
+      // create duplicate
+      seqFileWriter.append(key, record);
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M0";
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8");
+      record.add("B", "6");
+      record.add("C", "8");
+      record.add("D", "6");
+      record.add("E", "48.5");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M1";
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8.3");
+      record.add("B", "5.2");
+      record.add("C", "37.7");
+      record.add("D", "61.9");
+      record.add("E", "40.3");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M1";
+      long time = 1242205980; // Wed, 13 May 2009 09:13:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "18.3");
+      record.add("B", "1.2");
+      record.add("C", "7.7");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M2";
+      long time = 1242205980; // Wed, 13 May 2009 09:13:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "8.9");
+      record.add("B", "8.3");
+      record.add("C", "7.2");
+      record.add("D", "6.1");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M3";
+      // late arrival T0
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "12.5");
+      record.add("B", "26.82");
+      record.add("C", "89.51");
+      seqFileWriter.append(key, record);
+    }
+
+    {
+      machine = "M4";
+      // late arrival T0
+      long time = 1242205920; // Wed, 13 May 2009 09:12:00 GMT
+      chukwaKey = TimePartion + "/" + machine + "/" + time;
+      key.setKey(chukwaKey);
+
+      ChukwaRecord record = new ChukwaRecord();
+      record.setTime(time);
+      record.add("csource", machine);
+      record.add("A", "13.91");
+      record.add("B", "21.02");
+      record.add("C", "18.05");
+      seqFileWriter.append(key, record);
+    }
+
+    seqFileWriter.close();
+  }
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=783208&r1=783207&r2=783208&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Wed Jun 10 05:44:51 2009
@@ -49,7 +49,7 @@
   final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
     public boolean accept(Path file) {
-      return file.getName().startsWith("demuxOutputDir");
+      return ( file.getName().startsWith("demuxOutputDir") || file.getName().startsWith("pigOutputDir"));
    }
  };
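
The one-line change above widens PostProcessorManager's directory filter so it also picks up directories produced by PigMover ("pigOutputDir_" + timestamp). An illustration of the new filter semantics; the filter body is copied from the diff, while the harness class and sample paths are hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FilterDemo {
  // Same logic as the patched POST_PROCESS_DEMUX_DIR_FILTER above.
  static final PathFilter FILTER = new PathFilter() {
    public boolean accept(Path file) {
      return (file.getName().startsWith("demuxOutputDir")
          || file.getName().startsWith("pigOutputDir"));
    }
  };

  public static void main(String[] args) {
    System.out.println(FILTER.accept(new Path("/postProcess/demuxOutputDir_123"))); // true
    System.out.println(FILTER.accept(new Path("/postProcess/pigOutputDir_456")));   // true, new in this commit
    System.out.println(FILTER.accept(new Path("/postProcess/somethingElse")));      // false
  }
}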