chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r904244 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java src/test/org/apache/hadoop/chukwa/util/TestCreateRecordFile.java
Date Thu, 28 Jan 2010 20:01:06 GMT
Author: asrabkin
Date: Thu Jan 28 20:01:06 2010
New Revision: 904244

URL: http://svn.apache.org/viewvc?rev=904244&view=rev
Log:
CHUKWA-449. Utility to generate sequence file from log file. Contributed by Bill Graham.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCreateRecordFile.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=904244&r1=904243&r2=904244&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Jan 28 20:01:06 2010
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-449. Utility to generate sequence file from log file. (Bill Graham via asrabkin)
+
     CHUKWA-440. Enable addon jar file for Demux from Distributed Cache. (Eric Yang)
 
     CHUKWA-448. Write-ahead buffering for arbitrary adaptors. (asrabkin)

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java?rev=904244&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java Thu Jan 28 20:01:06 2010
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.FileReader;
+
+/**
+ * Helper class used to create sequence files of Chukwa records
+ */
+public class CreateRecordFile {
+
+   public static void makeTestSequenceFile(File inputFile,
+                                           Path outputFile,
+                                           String clusterName,
+                                           String dataType,
+                                           String streamName,
+                                           MapProcessor processor) throws IOException {
+
+     //initialize the output collector and the default processor
+     MockOutputCollector collector = new MockOutputCollector();
+     if (processor == null) processor = new TsProcessor();
+
+     //initialize the sequence file writer
+     Configuration conf = new Configuration();
+     FileSystem fs = outputFile.getFileSystem(conf);
+     FSDataOutputStream out = fs.create(outputFile);
+
+     SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
+                                    ChukwaRecordKey.class, ChukwaRecord.class,
+                                    SequenceFile.CompressionType.NONE, null);
+     long lastSeqID = 0;
+     String line;
+     BufferedReader reader = new BufferedReader(new FileReader(inputFile));
+
+     // for each line, create a chunk and an arckive key, pass it to the
+     // processor, then write it to the sequence file.  
+     while ((line = reader.readLine()) != null) {
+
+       ChunkImpl chunk = new ChunkImpl(dataType, streamName,
+         line.length()  + lastSeqID, line.getBytes(), null);
+       lastSeqID += line.length();
+       chunk.addTag("cluster=\"" + clusterName + "\"");
+
+       ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+       archiveKey.setTimePartition(System.currentTimeMillis());
+       archiveKey.setDataType(chunk.getDataType());
+       archiveKey.setStreamName(chunk.getStreamName());
+       archiveKey.setSeqId(chunk.getSeqID());
+
+       processor.process(archiveKey, chunk, collector, null);
+       seqFileWriter.append(collector.getChukwaRecordKey(),
+                            collector.getChukwaRecord());
+     }
+
+     out.flush();
+     out.close();
+     seqFileWriter.close();
+     reader.close();
+   }
+
+   private static class MockOutputCollector
+           implements OutputCollector<ChukwaRecordKey, ChukwaRecord> {
+     ChukwaRecordKey chukwaRecordKey;
+     ChukwaRecord chukwaRecord;
+
+     public void collect(ChukwaRecordKey chukwaRecordKey,
+                         ChukwaRecord chukwaRecord) throws IOException {
+       this.chukwaRecordKey = chukwaRecordKey;
+       this.chukwaRecord = chukwaRecord;
+     }
+
+     public ChukwaRecordKey getChukwaRecordKey() { return chukwaRecordKey; }
+     public ChukwaRecord getChukwaRecord() { return chukwaRecord; }
+   }
+
+   public static void main(String[] args) throws IOException,
+                                                 ClassNotFoundException,
+                                                 IllegalAccessException,
+                                                 InstantiationException {
+     if((args.length < 0 && args[0].contains("-h")) || args.length < 2) {
+       usage();
+     }
+
+     File inputFile = new File(args[0]);
+     Path outputFile = new Path(args[1]);
+     String clusterName = "testClusterName";
+     String dataType = "testDataType";
+     String streamName = "testStreamName";               
+     MapProcessor processor = new TsProcessor();
+
+     if (args.length > 2) clusterName = args[2];
+     if (args.length > 3) dataType = args[3];
+     if (args.length > 4) streamName = args[4];
+
+     if (args.length > 5) {
+       Class clazz = Class.forName(args[5]);
+       processor = (MapProcessor)clazz.newInstance();
+     }
+
+     System.out.println("Creating sequence file using the following input:");
+     System.out.println("inputFile  : " + inputFile);
+     System.out.println("outputFile : " + outputFile);
+     System.out.println("clusterName: " + clusterName);
+     System.out.println("dataType   : " + dataType);
+     System.out.println("streamName : " + streamName);
+     System.out.println("processor  : " + processor.getClass().getName());
+
+     makeTestSequenceFile(inputFile, outputFile, clusterName, dataType, streamName, processor);
+
+     System.out.println("Done");
+   }
+
+   public static void usage() {
+     System.out.println("Usage: java " + TempFileUtil.class.toString().split(" ")[1] + "
<inputFile> <outputFile> [clusterName] [dataType] [streamName] [processorClass]");
+     System.out.println("Description: Takes a plain text input file and generates a Hadoop
sequence file contaning ChukwaRecordKey,ChukwaRecord entries");
+     System.out.println("Parameters: inputFile      - Text input file to read");
+     System.out.println("            outputFile     - Sequence file to create");
+     System.out.println("            clusterName    - Cluster name to use in the records");
+     System.out.println("            dataType       - Data type to use in the records");
+     System.out.println("            streamName     - Stream name to use in the records");
+     System.out.println("            processorClass - Processor class to use. Defaults to
TsProcessor");
+     System.exit(0);
+   }
+}

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCreateRecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCreateRecordFile.java?rev=904244&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCreateRecordFile.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestCreateRecordFile.java Thu Jan 28 20:01:06 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import junit.framework.TestCase;
+
+import java.text.SimpleDateFormat;
+import java.text.ParseException;
+import java.net.InetAddress;
+import java.io.File;
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.util.Calendar;
+
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class TestCreateRecordFile extends TestCase {
+  private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+  private Calendar calendar = Calendar.getInstance();
+
+  public void testWriteSequenceFile() throws IOException, ParseException {
+    String outputDir = System.getProperty("test.build.data", "/tmp");
+
+    //input configs
+    File inputFile = new File("test/samples/ClientTrace.log");
+    Path outputFile = new Path(outputDir + "/" + this.getClass().getName() + "/ClientTrace.evt");
+    String clusterName = "testClusterName";
+    String dataType = "testDataType";
+    String streamName = "testStreamName";
+    MapProcessor processor = new TsProcessor();
+
+    //create the sequence file
+    CreateRecordFile.makeTestSequenceFile(inputFile, outputFile, clusterName,
+                                          dataType, streamName, processor);
+    //read the output file
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    ChukwaRecord record = new ChukwaRecord();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = outputFile.getFileSystem(conf);
+    SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, outputFile, conf);
+
+    //read the input file to assert
+    BufferedReader inputReader = new BufferedReader(new FileReader(inputFile));
+
+    String expectedHostname = InetAddress.getLocalHost().getHostName();
+
+    //Read input and output back comparing each
+    int i = 0;
+    while (sequenceReader.next(key, record)) {
+      String line = inputReader.readLine();
+      assertNotNull("Sequence file contains more records than input file", line);
+
+      long expectedTime = sdf.parse(line.substring(0,23)).getTime();
+      calendar.setTimeInMillis(expectedTime);
+      calendar.set(Calendar.MINUTE, 0);
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+
+      String expectedKey = calendar.getTimeInMillis() + "/" +
+                           expectedHostname + "/" + expectedTime;
+      String expectedTags = "cluster=\"" + clusterName + "\"";
+
+      //assert key
+      assertEquals("Invalid key found for record " + i,   expectedKey, key.getKey());
+      assertEquals("Invalid dataType found for record " + i, dataType, key.getReduceType());
+
+      //assert record
+      assertEquals("Invalid record time for record " + i, expectedTime, record.getTime());
+      assertEquals("Invalid body for record " + i, line, record.getValue("body"));
+      assertEquals("Invalid capp for record " + i, streamName, record.getValue("capp"));
+      assertEquals("Invalid csource for record " + i, expectedHostname, record.getValue("csource"));
+      assertEquals("Invalid ctags for record " + i, expectedTags , record.getValue("ctags").trim());
+
+      i++;
+    }
+
+    sequenceReader.close();
+    inputReader.close();
+  }
+
+}
\ No newline at end of file



Mime
View raw message