hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r685353 [9/13] - in /hadoop/core/trunk: ./ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/build/ src/contrib/chukwa/conf/ src/contrib/chukwa/dist/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/ha...
Date Tue, 12 Aug 2008 22:35:23 GMT
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.test;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import java.io.*;
+import java.util.*;
+
+public class FileTailerStressTest {
+
+  static final int DELAY_MIN = 10*1000;
+  static final int DELAY_RANGE = 2* 1000;
+  
+  static class OccasionalWriterThread extends Thread
+  {
+    File file;
+    
+    OccasionalWriterThread(File f)  {
+      file = f;
+    }
+    
+    public void run()  {
+      try {
+        FileOutputStream fos = new FileOutputStream(file);
+        PrintWriter out = new PrintWriter(fos);
+        Random rand = new Random();
+        while(true) {
+          int delay = rand.nextInt( DELAY_RANGE ) + DELAY_MIN;
+          Thread.sleep(delay);
+          Date d = new Date();
+          out.println("some test data written at " + d.toString());
+          out.flush();
+        }
+      } catch(IOException e) {
+        e.printStackTrace();
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+    
+  static int FILES_TO_USE = 100;
+  /**
+   * @param args
+   */
+  public static void main(String[] args)
+  {
+    try{
+      Server server = new Server(9990);
+      Context root = new Context(server,"/",Context.SESSIONS);
+  
+      ServletCollector.setWriter(new ConsoleWriter(true));
+      root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+      server.start();
+      server.setStopAtShutdown(false);
+  
+      Thread.sleep(1000);
+      ChukwaAgent agent = new ChukwaAgent();
+      HttpConnector connector = new HttpConnector(agent, "http://localhost:9990/chukwa");
+      connector.start();
+      
+      ChukwaConfiguration cc = new ChukwaConfiguration();
+      int portno = cc.getInt("chukwaAgent.control.port", 9093);
+      ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+      
+
+      File workdir = new File("/tmp/stresstest/");
+      workdir.mkdir();
+      for(int i = 0; i < FILES_TO_USE; ++i) {
+        File newTestF = new File( "/tmp/stresstest/" + i);
+        
+        newTestF.deleteOnExit();
+        (new OccasionalWriterThread(newTestF)).start();
+        cli.addFile("test-lines", newTestF.getAbsolutePath());
+      }
+
+      Thread.sleep(60 * 1000);
+      System.out.println("cleaning up");
+      workdir.delete();
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.test;
+
+import java.net.URI;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+public class SinkFileValidator {
+  
+  public static void main(String[] args)
+  {
+    String fsURL = "hdfs://localhost:9000";
+    String fname;
+    if(args.length < 1)
+    {
+      System.out.println("usage:  SinkFileValidator <filename> [filesystem URI] ");
+      System.exit(0);
+    }
+    fname = args[0];
+    if(args.length > 1)
+      fsURL = args[1];
+
+    Configuration conf = new Configuration();
+    try
+    {
+    FileSystem fs;
+    if(fsURL.equals("local"))
+      fs = FileSystem.getLocal(conf);
+    else
+       fs= FileSystem.get(new URI(fsURL), conf);
+    SequenceFile.Reader r= new SequenceFile.Reader(fs, new Path(fname), conf);
+    System.out.println("key class name is " + r.getKeyClassName());
+    System.out.println("value class name is " + r.getValueClassName());
+    
+    ChukwaArchiveKey key = new ChukwaArchiveKey();
+    ChunkImpl evt =  ChunkImpl.getBlankChunk();
+    int events = 0;
+    while(r.next(key, evt) &&  (events < 5))
+    {
+      if(!Writable.class.isAssignableFrom(key.getClass()))
+        System.out.println("warning: keys aren't writable");
+      
+      if(!Writable.class.isAssignableFrom(evt.getClass()))
+        System.out.println("warning: values aren't writable");
+      
+      if(evt.getData().length > 1000)
+      {
+        System.out.println("got event; data: " + new String(evt.getData(), 0, 1000));
+        System.out.println("....[truncating]");
+      }
+      else
+        System.out.println("got event; data: " + new String(evt.getData()));
+      events ++;
+    }
+    System.out.println("file looks OK!");
+    }
+    catch(Exception e)
+    {
+      e.printStackTrace();
+    }
+    
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+public interface ChukwaWriter
+{
+	void init() throws IOException;
+	void add(Chunk data) throws IOException;
+	void close();
+
+}
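
A minimal sketch of the writer lifecycle as a collector might drive it; the helper below is illustrative only, not the actual ServletCollector wiring, and the Chunk argument is assumed to be built elsewhere (e.g. a ChunkImpl).

    // Sketch only: exercise a ChukwaWriter through its init/add/close lifecycle.
    static void drive(ChukwaWriter writer, Chunk chunk) throws java.io.IOException {
      writer.init();        // set up the sink (streams, timers, ...)
      writer.add(chunk);    // called once per chunk received from an agent
      writer.close();       // flush and release resources
    }
    // e.g. drive(new ConsoleWriter(true), chunk);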

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.Chunk;
+
+public class ConsoleWriter implements ChukwaWriter {
+
+  boolean printData;
+  volatile long dataSize=0;
+  final Timer statTimer;
+  
+  
+  private class StatReportingTask extends TimerTask {
+    private long lastTs=System.currentTimeMillis();
+    private long lastDataSize=0;
+    public void run() {
+      long time =  System.currentTimeMillis();
+      long interval= time - lastTs;
+      lastTs = time;
+      
+      long ds = dataSize;
+      long dataRate =  1000 * (ds - lastDataSize) / interval;  //bytes/sec
+        //refers only to data field, not including http or chukwa headers
+      lastDataSize = ds;
+      
+      System.out.println("stat=datacollection.writer.ConsoleWriter|dataRate=" + dataRate );
+    }
+  };
+  
+  
+  public ConsoleWriter(boolean printData) {
+    this.printData = printData;
+    statTimer = new Timer();
+  }
+  
+  public void close()
+  {
+    statTimer.cancel();
+  }
+
+  public void init() throws IOException
+  {
+     System.out.println("----  DUMMY HDFS WRITER IN USE ---");
+
+     statTimer.schedule(new StatReportingTask(), 1000,10*1000);
+  }
+
+  public void add(Chunk data) throws IOException
+  {
+    int startOffset = 0;
+
+    dataSize += data.getData().length;
+    if(printData) {
+      System.out.println(data.getData().length + " bytes of data in chunk");
+      
+      for(int offset: data.getRecordOffsets()) {
+        System.out.print(data.getStreamName());
+        System.out.print(" ");
+        System.out.print(data.getSource());
+        System.out.print(" ");
+        System.out.print(data.getDataType());
+        System.out.print(") ");
+        System.out.print(new String(data.getData(), startOffset, offset - startOffset + 1));
+        startOffset= offset + 1;
+      }
+    }
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.*;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+
+public class InMemoryWriter implements ChukwaWriter {
+
+  ByteArrayOutputStream buf;
+  
+  public void close() {
+    buf.reset();
+  }
+
+  public void init() throws IOException {
+    buf = new ByteArrayOutputStream();
+  }
+
+  public void add(Chunk data) throws IOException {
+    DataOutputStream dos = new DataOutputStream(buf);
+    data.write(dos);
+    synchronized(this) {
+      notify();
+    }
+  }
+  
+  DataInputStream dis = null;
+  /**
+   * Try to read bytes, waiting up to ms
+   * @param bytes amount to try to read
+   * @param ms  time to wait
+   * @return a newly read-in chunk
+   * @throws IOException
+   */
+  public Chunk readOutChunk(int bytes, int ms) throws IOException {
+
+    long readStartTime = System.currentTimeMillis();
+    try {
+      while(buf.size() < bytes )  {
+        synchronized(this) {
+          long timeLeft = ms - System.currentTimeMillis() + readStartTime;
+          if(timeLeft > 0)
+              wait(timeLeft);
+          else
+              break;  // timed out; read back whatever has been buffered so far
+        }
+      }
+      if(dis == null)
+       dis = new DataInputStream( new ByteArrayInputStream(buf.toByteArray()));
+      
+      return ChunkImpl.read(dis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    }
+    
+  }
+
+}
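
A hedged sketch of how a test might use readOutChunk to check that a chunk round-trips through this writer; only the writer calls come from this class, and the Chunk c is assumed to come from a test fixture (e.g. a ChunkImpl built elsewhere).

    // Sketch only: write one chunk, then wait up to five seconds for its bytes
    // to become readable again.
    static Chunk roundTrip(Chunk c) throws java.io.IOException {
      InMemoryWriter writer = new InMemoryWriter();
      writer.init();
      writer.add(c);
      return writer.readOutChunk(c.getData().length, 5000);
    }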

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+public interface PipelineStageWriter extends ChukwaWriter{
+	public void setNextStage(ChukwaWriter next);
+}
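
The interface only adds setNextStage() on top of ChukwaWriter. A hypothetical pass-through stage, not part of this patch, might look like the sketch below: it does its own bookkeeping and then forwards each chunk to the next writer in the pipeline.

    // Hypothetical example: a stage that counts chunks before handing them on.
    class CountingStage implements PipelineStageWriter {
      private ChukwaWriter next;
      private long chunksSeen = 0;

      public void setNextStage(ChukwaWriter next) { this.next = next; }
      public void init() throws java.io.IOException {
        if (next != null) next.init();
      }
      public void add(org.apache.hadoop.chukwa.Chunk data) throws java.io.IOException {
        chunksSeen++;
        if (next != null) next.add(data);
      }
      public void close() {
        if (next != null) next.close();
      }
    }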

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Calendar;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+
+/**
+ * This class <b>is</b> thread-safe -- rotate() and save() both synchronize on
+ * this object.
+ * 
+ */
+public class SeqFileWriter implements ChukwaWriter
+{
+	static Logger log = Logger.getLogger(SeqFileWriter.class);
+	public static final boolean ENABLE_ROTATION = true;
+
+	private FileSystem fs = null;
+	private ChukwaConfiguration conf = null;
+
+	private String outputDir = null;
+	private Calendar calendar = Calendar.getInstance();
+
+	private Path currentPath = null;
+	private String currentFileName = null;
+	private FSDataOutputStream currentOutputStr = null;
+	private static SequenceFile.Writer seqFileWriter = null;
+
+	private Timer timer = null;
+
+	private Timer statTimer = null;
+	private volatile long dataSize = 0;
+
+	public SeqFileWriter() throws IOException
+	{
+		conf = new ChukwaConfiguration(true);
+		init();
+	}
+
+	public void init() throws IOException
+	{
+		outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
+
+		int rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+				1000 * 60 * 5);//defaults to 5 minutes
+		//check if they've told us the file system to use
+    String fsname = conf.get("writer.hdfs.filesystem");
+    if (fsname == null || fsname.equals("")){
+      //otherwise try to get the filesystem from hadoop
+      fsname = conf.get("fs.default.name");
+    }
+		
+
+		log.info("rotateInterval is " + rotateInterval);
+		log.info("ENABLE_ROTATION is " + ENABLE_ROTATION);
+		log.info("outputDir is " + outputDir);
+		log.info("fsname is " + fsname);
+		log.info("filesystem type from hadoop-default.xml is "
+				+ conf.get("fs.hdfs.impl"));
+
+		if (fsname == null)
+		{
+			log.error("no filesystem name");
+			throw new IOException("no filesystem");
+		}
+		try
+		{
+			fs = FileSystem.get(new URI(fsname), conf);
+			if (fs == null)
+			{
+				log.error("can't connect to HDFS at " + fs.getUri());
+				return;
+			} else
+				log.info("filesystem is " + fs.getUri());
+		} catch (IOException e)
+		{
+			log.error(
+							"can't connect to HDFS, trying default file system instead (likely to be local)",
+							e);
+			try
+			{
+				fs = FileSystem.get(conf);
+			} catch (IOException err)
+			{
+				log.error("can't connect to default file system either", e);
+			}
+		} catch (URISyntaxException e)
+		{
+			log.error("problem generating new URI from config setting");
+			return;
+		}
+
+		calendar.setTimeInMillis(System.currentTimeMillis());
+		int minutes = calendar.get(Calendar.MINUTE);
+		// number of minutes at current time
+
+		int dec = minutes / 10; // 'tens' digit of current time
+
+		int m = minutes - (dec * 10); // 'units' digit
+		if (m < 5)
+		{
+			m = 5 - m;
+		} else
+		{
+			m = 10 - m;
+		}
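+		// e.g. at hh:07, m = 10 - 7 = 3 and the timer's first scheduled rotate() fires at hh:10;
+		// at hh:02, m = 5 - 2 = 3, firing at hh:05 (assuming rotateInterval >= 3 minutes)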
+
+		log.info("Current date [" + calendar.getTime().toString()
+				+ "] next schedule [" + m + "]");
+		rotate();
+
+		timer = new Timer();
+
+		if (ENABLE_ROTATION)
+		{
+			log.info("sink rotation enabled, rotating every " + rotateInterval
+					+ " millis");
+			timer.schedule(new TimerTask()
+			{
+				public void run()
+				{
+					rotate();
+				}
+
+			}, Math.min(rotateInterval, m * 60 * 1000), rotateInterval);
+
+			statTimer = new Timer();
+		} else
+			log.warn("sink rotation is OFF!!");
+
+		statTimer.schedule(new StatReportingTask(), 1000, 60 * 1000);
+	}
+
+	private class StatReportingTask extends TimerTask
+	{
+		private long lastTs = System.currentTimeMillis();
+		private long lastDataSize = 0;
+
+		public void run()
+		{
+			long time = System.currentTimeMillis();
+			long interval = time - lastTs;
+			lastTs = time;
+
+			long ds = dataSize;
+			long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
+			lastDataSize = ds;
+
+			log.info("stat=datacollection.writer.hdfs|dataSize=" + dataSize);
+			log.info("stat=datacollection.writer.hdfs|dataRate=" + dataRate);
+		}
+	};
+
+	void rotate()
+	{
+		calendar.setTimeInMillis(System.currentTimeMillis());
+
+		log.info("start Date [" + calendar.getTime() + "]");
+		//granularity of rollover directory structure is hourly
+		String newDir = new java.text.SimpleDateFormat("yyyy_dd_HH")
+				.format(calendar.getTime());
+
+		log.info("Rotate from " + Thread.currentThread().getName());
+
+		Path newDirPath = new Path(outputDir + "/" + newDir);
+		log.info("Rotate directory[" + newDirPath.toString() + "]");
+		try
+		{
+			if (!fs.exists(newDirPath))
+			{
+				log.info("Create new directory:" + newDirPath.toString());
+				try
+				{
+					fs.mkdirs(newDirPath);
+				} catch (Exception e)
+				{
+					if (!fs.exists(newDirPath))
+					{
+						log.info("Failed to create new directory:"
+								+ newDirPath.toString() + "] ", e);
+					}
+				}
+			} else // use the existing directory, because we haven't hit a new hour yet
+			{
+				log.info("Rotate from [" + Thread.currentThread().getName()
+						+ "] directory (" + newDirPath + ") already exists.");
+
+			}
+	    String newName = new java.text.SimpleDateFormat("yyyy_dd_HH_mm_ss_SSS").format(calendar.getTime());
+	    newName += "_" + new java.rmi.server.UID().toString();
+	    newName = newName.replace("-", "");
+	    newName = newName.replace(":", "");
+	    newName = newName.replace(".", "");
+
+			newName = newDirPath + "/" + newName.trim();
+
+			Path newOutputPath = new Path(newName + ".chukwa");
+
+			FSDataOutputStream newOutputStr = fs.create(newOutputPath);
+			FSDataOutputStream previousOutputStr = null;
+			Path previousPath = null;
+			String previousFileName = null;
+
+			synchronized (this)
+			{
+				previousOutputStr = currentOutputStr;
+				previousPath = currentPath;
+				previousFileName = currentFileName;
+
+				currentOutputStr = newOutputStr;
+				currentPath = newOutputPath;
+				currentFileName = newName;
+				if (previousOutputStr != null)
+				{
+					previousOutputStr.close();
+					fs.rename(previousPath,
+							new Path(previousFileName + ".done"));
+				}
+
+				// Turn compression ON if the 5 mins archives are big
+				seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
+						ChukwaArchiveKey.class, ChunkImpl.class,
+						SequenceFile.CompressionType.NONE, null);
+			}
+		} catch (IOException e)
+		{
+			log.error("failed to do rotate", e);
+		}
+		log.debug("finished rotate()");
+	}
+
+	public synchronized void add(Chunk chunk) throws IOException
+	{
+		if (chunk != null)
+		{
+			try
+			{
+				assert chunk instanceof ChunkImpl : "bad input type";
+				ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+				// FIXME compute this once an hour
+				synchronized (calendar)
+				{
+					calendar.setTimeInMillis(System.currentTimeMillis());
+					calendar.set(Calendar.MINUTE, 0);
+					calendar.set(Calendar.SECOND, 0);
+					calendar.set(Calendar.MILLISECOND, 0);
+
+					archiveKey.setTimePartition(calendar.getTimeInMillis());
+				}
+
+				archiveKey.setDataType(chunk.getDataType());
+				archiveKey.setStreamName(chunk.getStreamName());
+				archiveKey.setSeqId(chunk.getSeqID());
+
+				seqFileWriter.append(archiveKey, chunk);
+
+				dataSize += chunk.getData().length;
+				// currentOutput.sync(); //force file out to stable storage on
+				// the cluster.
+				// note that seqFileWriter.sync() would do something completely
+				// different
+			} catch (IOException e)
+			{
+				log.error(e.getMessage());
+				rotate();
+				throw e;
+			}
+		}
+	}
+
+	public void close()
+	{
+		synchronized (this)
+		{
+			try
+			{
+				this.currentOutputStr.close();
+				fs.rename(currentPath, new Path(currentFileName + ".done"));
+			} catch (IOException e)
+			{
+				log.error("failed to close and rename stream", e);
+			}
+		}
+	}
+
+	/*
+	 * public static class SeqFileKey implements
+	 * org.apache.hadoop.io.WritableComparable<SeqFileKey>{
+	 * 
+	 * public long seqID; public String streamName; public long
+	 * collectorTimestamp;
+	 * 
+	 * public SeqFileKey() {} // for use in deserializing
+	 * 
+	 * SeqFileKey(Chunk event) { seqID = event.getSeqID(); streamName =
+	 * event.getStreamName() + "_" + event.getSource(); collectorTimestamp =
+	 * System.currentTimeMillis(); }
+	 * 
+	 * public void readFields(DataInput in) throws IOException { seqID =
+	 * in.readLong(); streamName = in.readUTF(); collectorTimestamp =
+	 * in.readLong(); }
+	 * 
+	 * public void write(DataOutput out) throws IOException {
+	 * out.writeLong(seqID); out.writeUTF(streamName);
+	 * out.writeLong(collectorTimestamp); }
+	 * 
+	 * public int compareTo(SeqFileKey o) { int cmp =
+	 * streamName.compareTo(o.streamName); if(cmp == 0) { if(seqID < o.seqID)
+	 * return -1; else if (seqID == o.seqID) return 0; else return 1; } else
+	 * return cmp; }
+	 * 
+	 * public boolean equals(Object o) { return (o instanceof SeqFileKey) &&
+	 * (compareTo((SeqFileKey) o) == 0); }
+	 * 
+	 * public int hashCode() { return streamName.hashCode() ^ (int)(seqID >> 32) ^
+	 * (int) seqID; }
+	 *  }
+	 */
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/Consolidator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/Consolidator.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/Consolidator.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/Consolidator.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+
+import java.sql.*;
+import java.util.Calendar;
+import java.sql.ResultSetMetaData;
+import java.text.SimpleDateFormat;
+
+public class Consolidator extends Thread {
+	private Log log = LogFactory.getLog(Consolidator.class);
+	private String table = null;
+	private String jdbc = null;
+	private int[] intervals;
+	public Consolidator(String table, String intervalString) {
+		super(table);
+		try {
+			int i=0;
+			String[] temp = intervalString.split("\\,");
+			intervals = new int[temp.length];
+			for(String s: temp) {
+			    intervals[i]=Integer.parseInt(s);
+			    i++;
+			}
+			this.table = table;
+		} catch (NumberFormatException ex) {
+			log.error("Unable to parse summary interval");
+		}		
+	}
+	public void run() {
+		ResultSet rs = null;
+		String[] columns;
+		int[] columnsType;
+        String groupBy = "";
+        
+		for(int interval : intervals) {
+			// Start reducing from beginning of time;
+			Calendar aYearAgo = Calendar.getInstance();
+			aYearAgo.set(2008, 12, 30, 0, 0, 0);
+
+			long start = aYearAgo.getTimeInMillis();  // default starting point, normally overridden below by the newest timestamp in the table
+			long end = start + (interval*60000);
+		    log.debug("start time: "+start);
+		    log.debug("end time: "+end);
+			Calendar now = Calendar.getInstance();
+			DatabaseWriter db = new DatabaseWriter();
+			String fields = null;
+			String dateclause = null;
+			boolean emptyPrimeKey = false;
+			log.debug("Consolidate for "+interval+" minutes interval.");
+			String table = this.table+"_"+interval;
+			// Find the most recent entry
+			try {
+			    String query = "select * from "+table+" order by timestamp desc limit 1";
+	            log.debug("Query: "+query);
+	            rs = db.query(query);
+	            if(rs==null) {
+	          	    throw new SQLException("Table undefined.");
+	            }
+	            ResultSetMetaData rmeta = rs.getMetaData();
+	            boolean empty=true;
+	            if(rs.next()) {
+	                for(int i=1;i<=rmeta.getColumnCount();i++) {
+		                if(rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
+		            	    start = rs.getTimestamp(i).getTime();
+		                }
+	                }
+	                empty=false;
+	            }
+	            if(empty) {
+	              	throw new SQLException("Table is empty.");
+	            }
+                end = start + (interval*60000);
+			} catch (SQLException ex) {
+			    try {
+				    String query = "select * from "+this.table+" order by timestamp limit 1";
+		            log.debug("Query: "+query);
+	                rs = db.query(query);
+	                if(rs.next()) {
+	    	            ResultSetMetaData rmeta = rs.getMetaData();
+	    	            for(int i=1;i<=rmeta.getColumnCount();i++) {
+	    	                if(rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
+	    	                	start = rs.getTimestamp(i).getTime();
+	    	                }
+	    	            }
+				    }
+                    end = start + (interval*60000);
+				} catch(SQLException ex2) {
+				    log.error("Unable to determine starting point in table: "+this.table);
+					log.error("SQL Error:"+ExceptionUtil.getStackTrace(ex2));
+					return;
+				}
+			}
+			try {
+                ResultSetMetaData rmeta = rs.getMetaData();
+                int col = rmeta.getColumnCount();
+                columns = new String[col];
+                columnsType = new int[col];
+                for(int i=1;i<=col;i++) {
+            	    columns[i-1]=rmeta.getColumnName(i);
+              	    columnsType[i-1]=rmeta.getColumnType(i);
+                }
+
+		        for(int i=0;i<columns.length;i++) {
+		    	    if(i==0) {
+		    		    fields=columns[i];
+	    	            if(columnsType[i]==java.sql.Types.VARCHAR) {
+	    	            	groupBy = " group by "+columns[i];
+	    	            }
+		    	    } else {
+		    		    if(columnsType[i]==java.sql.Types.VARCHAR || columnsType[i]==java.sql.Types.TIMESTAMP) {
+		    	            fields=fields+","+columns[i];
+		    	            if(columnsType[i]==java.sql.Types.VARCHAR) {
+		    	            	groupBy = " group by "+columns[i];
+		    	            }
+		    		    } else {
+		    	            fields=fields+",AVG("+columns[i]+") as "+columns[i];
+		    		    }
+		    	    }
+		        }
+			} catch(SQLException ex) {
+			  	log.error("SQL Error:"+ExceptionUtil.getStackTrace(ex));
+			  	return;
+			}
+            if(groupBy.equals("")) {
+            	emptyPrimeKey = true;
+            }
+			long previousStart = start;
+        	while(end < now.getTimeInMillis()-(interval*2*60000)) {
+			    // Select new data sample for the given intervals
+			    if(interval == 5) {
+				    table=this.table;
+			    } else if(interval == 30) {
+				    table=this.table+"_5";				
+			    } else if(interval == 120) {
+				    table=this.table+"_30";
+			    }
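+			    // each summary level is built from the next-finer one:
+			    // the 5-minute table reads the raw table, _30 reads _5, _120 reads _30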
+	            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+			    String startS = formatter.format(start);
+			    String endS = formatter.format(end);
+			    dateclause = "Timestamp >= '"+startS+"' and Timestamp <= '"+endS+"'";
+			    if(emptyPrimeKey) {
+			    	groupBy = "group by "+dateclause;
+			    }
+				String query = "insert ignore into "+this.table+"_"+interval+" (select "+fields+" from "+table+" where "+dateclause+groupBy+")";
+				log.debug(query);
+                db.execute(query);
+                db.close();
+        		if(previousStart == start) {
+        			start = start + (interval*60000);
+        			end = start + (interval*60000);
+            		previousStart = start;
+        		}
+        	}
+		}
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChuckwaArchiveBuilder.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+public class ChuckwaArchiveBuilder extends Configured implements Tool
+{
+	static Logger log = Logger.getLogger(ChuckwaArchiveBuilder.class);
+
+	static int printUsage()
+	{
+		System.out
+				.println("ChuckwaArchiveBuilder <Daily/Hourly> <input> <output>");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+	
+	public int run(String[] args) throws Exception
+	{
+	
+
+		// Make sure there are exactly 3 parameters left.
+		if (args.length != 3)
+		{
+			System.out.println("ERROR: Wrong number of parameters: "
+					+ args.length + " instead of 3.");
+			return printUsage();
+		}
+	
+		JobConf conf = new JobConf(getConf(), ChuckwaArchiveBuilder.class);
+
+		
+		conf.setInputFormat(SequenceFileInputFormat.class);
+		
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setReducerClass(IdentityReducer.class);
+		
+		if (args[0].equalsIgnoreCase("Daily"))
+		{
+			conf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
+			conf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
+			conf.setJobName("Chukwa-DailyArchiveBuilder");
+		}
+		else if (args[0].equalsIgnoreCase("Hourly"))
+		{
+			conf.setJobName("Chukwa-HourlyArchiveBuilder");
+			conf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
+			conf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);			
+		}
+		else
+		{
+			System.out.println("ERROR: Wrong Time partionning: "
+					+ args[0] + " instead of [Hourly/Daily].");
+			return printUsage();
+		}
+
+		
+		conf.setOutputKeyClass(ChukwaArchiveKey.class);
+		conf.setOutputValueClass(ChunkImpl.class);
+				
+		//FIXME need compression - read config
+		//conf.setCompressMapOutput(true);
+		//conf.setMapOutputCompressorClass(LzoCodec.class);
+		
+		//
+		
+		FileInputFormat.setInputPaths(conf, args[1]);
+		FileOutputFormat.setOutputPath(conf, new Path(args[2]));
+
+		JobClient.runJob(conf);
+		return 0;
+	}
+
+	public static void main(String[] args) throws Exception
+	{
+		int res = ToolRunner.run(new Configuration(),
+				new ChuckwaArchiveBuilder(), args);
+		System.exit(res);
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import org.apache.log4j.Logger;
+
+public class ChukwaArchiveDailyOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
+{
+	static Logger log = Logger.getLogger(ChukwaArchiveDailyOutputFormat.class);
+	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+	
+	@Override
+	protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
+			String name)
+	{
+		
+		if (log.isDebugEnabled())
+			{log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
+		return sdf.format(key.getTimePartition()) + ".arc";
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyPartitioner.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public class ChukwaArchiveDailyPartitioner<K, V> 
+	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
+{
+	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+	
+	public void configure(JobConf arg0)
+	{}
+
+	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunl, int numReduceTasks)
+	{
+		
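+		// masking with Integer.MAX_VALUE clears the sign bit so the modulo is never negative;
+		// chunks with the same day partition always go to the same reducer (and output file)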
+		 return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyOutputFormat.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import org.apache.log4j.Logger;
+
+public class ChukwaArchiveHourlyOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
+{
+	static Logger log = Logger.getLogger(ChukwaArchiveHourlyOutputFormat.class);
+	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
+	
+	@Override
+	protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
+			String name)
+	{
+		
+		if (log.isDebugEnabled())
+			{log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
+		return sdf.format(key.getTimePartition()) + ".arc";
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveHourlyPartitioner.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public class ChukwaArchiveHourlyPartitioner<K, V> 
+	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
+{
+	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd_HH-00");
+	
+	public void configure(JobConf arg0)
+	{}
+
+	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunl, int numReduceTasks)
+	{
+		
+		 return (sdf.format(key.getTimePartition()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveMerger.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,6 @@
+package org.apache.hadoop.chukwa.extraction.archive;
+
+public class ChukwaArchiveMerger
+{
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DataExpiration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DataExpiration.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DataExpiration.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DataExpiration.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,5 @@
+package org.apache.hadoop.chukwa.extraction.database;
+
+public class DataExpiration {
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseHelper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseHelper.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseHelper.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseHelper.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,73 @@
+package org.apache.hadoop.chukwa.extraction.database;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+
+public class DatabaseHelper
+{
+	private static final String databaseRecordType = "Database/Database_";	
+	static final String sqlField = "sql";	
+	private String table = null;
+	private HashMap<String, Object> pairs = new HashMap<String, Object>();
+	private ArrayList<String> array = null;
+	
+	public DatabaseHelper()
+	{
+	    array = new ArrayList<String>();
+	}
+
+	public DatabaseHelper(String table) {
+		array = new ArrayList<String>();
+		this.table = table;
+	}
+
+	public void add(HashMap<String, Object> pairs) {
+		Iterator<String> ki = pairs.keySet().iterator();
+		while(ki.hasNext()) {
+			String keyName = ki.next();
+			Object value = pairs.get(keyName);
+			if(value instanceof String) {
+				array.add(keyName+"=\""+value+"\"");
+			} else {
+				array.add(keyName+"="+value);
+			}
+		}
+	}
+	
+	public void add(long time, String key, String value) {
+        array.add("timestamp="+time+","+key+"=\""+value+"\"");
+	}
+
+	public void add(long time, String key, double value) {
+			array.add("timestamp="+time+","+key+"="+value);
+	}
+
+	
+	public String toString() {
+		StringBuffer batch = new StringBuffer();
+		StringBuffer keyValues = new StringBuffer();
+		for(int i=0;i<array.size();i++) {
+			keyValues.append(array.get(i));
+			batch.append("INSERT INTO ");
+			batch.append(table);
+			batch.append(" SET ");
+			batch.append(keyValues);
+			batch.append(" ON DUPLICATE KEY UPDATE ");
+			batch.append(keyValues);
+		}		
+        return batch.toString();
+	}
+	public ChukwaRecord buildChukwaRecord()	{
+		ChukwaRecord chukwaRecord = new ChukwaRecord();
+		StringBuilder queries = new StringBuilder();
+		
+		chukwaRecord.add(sqlField, this.toString());
+		chukwaRecord.add(Record.destinationField, DatabaseHelper.databaseRecordType + table + ".evt");
+		chukwaRecord.add(Record.dataSourceField, DatabaseHelper.databaseRecordType + table );
+		return chukwaRecord;
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,90 @@
+package org.apache.hadoop.chukwa.extraction.database;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+
+public class DatabaseLoader
+{
+
+	private static Log log = LogFactory.getLog(DatabaseLoader.class);
+
+	/**
+	 * @param args
+	 * @throws URISyntaxException
+	 * @throws IOException
+	 */
+	public static void main(String[] args) throws IOException,
+			URISyntaxException
+	{
+		System.out.println("Input directory:" + args[0]);
+
+		ChukwaConfiguration conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+		Path srcDir = new Path(args[0]);
+		FileStatus fstat = fs.getFileStatus(srcDir);
+
+		if (!fstat.isDir())
+		{
+			throw new IOException(args[0] + " is not a directory!");
+		} else
+		{
+			FileStatus[] datasourceDirectories = fs.listStatus(srcDir,new EventFileFilter());
+			for (FileStatus datasourceDirectory : datasourceDirectories)
+			{
+
+				// rename working file
+				String databaseInputFilename = datasourceDirectory.getPath()
+						.getName();
+				
+				Path inProgressdatabaseInputFilePath = new Path(databaseInputFilename
+						+ "." + System.currentTimeMillis() + ".pgs");
+				
+				// Maybe the file has already been processed by another loader
+				if (fs.exists(datasourceDirectory.getPath()))
+				{
+					fs.rename(datasourceDirectory.getPath(), inProgressdatabaseInputFilePath);
+
+					SequenceFile.Reader r = new SequenceFile.Reader(fs, inProgressdatabaseInputFilePath, conf);
+					Text key = new Text();
+					ChukwaRecord record = new ChukwaRecord();
+					try
+					{
+						while (r.next(key, record))
+						{
+							System.out.println(record.getValue(DatabaseHelper.sqlField));
+						} // End while(r.next(key, databaseRecord) )
+					} // end Try
+					catch (Exception e)
+					{
+						log.error("Unable to insert data into database"
+								+ e.getMessage());
+						e.printStackTrace();
+					} 
+					
+				}
+			} // End for(FileStatus datasourceDirectory :datasourceDirectories)
+		} // End Else
+	}
+}
+
+class EventFileFilter implements PathFilter
+{
+	  public boolean accept(Path path) 
+	  {
+	    return (path.toString().endsWith(".evt"));
+	  }
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,26 @@
+package org.apache.hadoop.chukwa.extraction.database;
+
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+
+public class MetricDataLoader {
+	
+	private DatabaseWriter db = null;
+	private DataConfig dc = new DataConfig();
+	
+	public MetricDataLoader() {
+		db = new DatabaseWriter();
+	}
+	public void load(String src) {
+		// not yet implemented
+	}
+	public void save() {
+		// not yet implemented
+	}
+	public void process() {
+		// open hdfs files and process them.
+		String dfs = dc.get("chukwa.engine.dsDirectory.rootFolder");
+		load(dfs);
+		save();
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import org.apache.log4j.Logger;
+
+public class ChukwaRecordOutputFormat extends MultipleSequenceFileOutputFormat<Text, ChukwaRecord>
+{
+	static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
+
+	@Override
+	protected String generateFileNameForKeyValue(Text key, ChukwaRecord record,
+			String name)
+	{
+		if (log.isDebugEnabled())
+			{log.debug("ChukwaRecordOutputFormat.fileName: " + record.getValue(Record.destinationField));}
+		return record.getValue(Record.destinationField);
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.log4j.Logger;
+
+public class ChukwaRecordPartitioner implements Partitioner<org.apache.hadoop.io.Text, org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord>
+{
+	static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
+	public void configure(JobConf arg0)
+	{}
+
+	public int getPartition(org.apache.hadoop.io.Text key,
+			org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record, int numReduceTasks)
+	{
+		if (!record.containsField(Record.dataSourceField))
+		{
+			throw new RuntimeException("PartitionerField not set!");
+		}
+		//FIXME remove before production or set to debug level
+		log.info("Partitioner key: [" + record.getValue(Record.dataSourceField) + "] - Reducer:" + ( (record.getValue(Record.dataSourceField).hashCode() & Integer.MAX_VALUE) % numReduceTasks));
+		
+		return (record.getValue(Record.dataSourceField).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+public class Demux extends Configured implements Tool
+{
+	static Logger log = Logger.getLogger(Demux.class);
+	
+	public static class MapClass extends MapReduceBase implements
+			Mapper<ChukwaArchiveKey, ChunkImpl , Text, ChukwaRecord>
+	{
+		
+		public void map(ChukwaArchiveKey key, ChunkImpl chunk,
+				OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
+				throws IOException
+		{
+			try {
+				log.info("Entry: [" + new String(chunk.getData()) + "] EventType: [" + chunk.getDataType() + "]");
+				
+				ProcessorFactory.getProcessor(chunk.getDataType()).process(chunk, output, reporter);
+			} catch(Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+	static int printUsage() {
+		System.out
+				.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	public int run(String[] args) throws Exception
+	{
+		JobConf conf = new JobConf(getConf(), Demux.class);
+
+		conf.setJobName("Chukwa-Demux");
+		conf.setInputFormat(SequenceFileInputFormat.class);
+		conf.setMapperClass(Demux.MapClass.class);
+		conf.setPartitionerClass(ChukwaRecordPartitioner.class);
+		conf.setReducerClass(IdentityReducer.class);
+
+		conf.setOutputKeyClass(Text.class);
+		conf.setOutputValueClass(ChukwaRecord.class);
+		conf.setOutputFormat(ChukwaRecordOutputFormat.class);
+		//
+		
+		List<String> other_args = new ArrayList<String>();
+		for (int i = 0; i < args.length; ++i) {
+			try {
+				if ("-m".equals(args[i])) {
+					conf.setNumMapTasks(Integer.parseInt(args[++i]));
+				} else if ("-r".equals(args[i])) {
+					conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+				} else 	{
+					other_args.add(args[i]);
+				}
+			} catch (NumberFormatException except) {
+				System.out.println("ERROR: Integer expected instead of "
+						+ args[i]);
+				return printUsage();
+			} catch (ArrayIndexOutOfBoundsException except) {
+				System.out.println("ERROR: Required parameter missing from "
+						+ args[i - 1]);
+				return printUsage();
+			}
+		}
+		// Make sure there are exactly 2 parameters left.
+		if (other_args.size() != 2) {
+			System.out.println("ERROR: Wrong number of parameters: "
+					+ other_args.size() + " instead of 2.");
+			return printUsage();
+		}
+		
+		FileInputFormat.setInputPaths(conf, other_args.get(0));
+		FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
+
+		JobClient.runJob(conf);
+		return 0;
+	}
+
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(),
+				new Demux(), args);
+		System.exit(res);
+	}
+
+}
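Demux implements Tool, so once the Chukwa classes are on the Hadoop classpath it can be launched directly with the generic driver; a hypothetical invocation (task counts and paths are placeholders) would be: bin/hadoop org.apache.hadoop.chukwa.extraction.demux.Demux -m 20 -r 4 <demux input dir> <demux output dir>.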

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class MoveOrMergeRecordFile extends Configured implements Tool
+{
+	public int run(String[] args) throws Exception
+	{
+		JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
+
+		conf.setJobName("Chukwa-MoveOrMergeLogFile");
+		conf.setInputFormat(SequenceFileInputFormat.class);
+		
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setReducerClass(IdentityReducer.class);
+		
+		//conf.setPartitionerClass(ChukwaPartitioner.class);
+		//conf.setOutputFormat(ChukwaOutputFormat.class);
+		
+		conf.setOutputKeyClass(Text.class);
+		conf.setOutputValueClass(ChukwaRecord.class);
+		
+		
+		FileInputFormat.setInputPaths(conf, args[0]);
+		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+		JobClient.runJob(conf);
+		return 0;
+	}
+
+	/**
+	 * @param args
+	 * @throws Exception 
+	 */
+	public static void main(String[] args) throws Exception
+	{
+		ChukwaConfiguration conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+		Path srcDir = new Path(args[0]);
+		String destDir = args[1];
+		
+		FileStatus fstat = fs.getFileStatus(srcDir);
+		
+		if (!fstat.isDir())
+		{
+			throw new IOException(args[0] + " is not a directory!");
+		}
+		else
+		{
+			FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
+			for(FileStatus datasourceDirectory : datasourceDirectories)
+			{
+				System.out.println(datasourceDirectory.getPath() + " isDir?" +datasourceDirectory.isDir());
+				if (!datasourceDirectory.isDir())
+				{
+					throw new IOException("Top level should only contain directories: " + datasourceDirectory.getPath());
+				}
+				
+				String dirName = datasourceDirectory.getPath().getName();
+				
+				if (dirName.equals("_logs"))
+				{
+					continue;
+				}
+				
+				Path destPath = new Path(destDir + "/" + dirName);
+				System.out.println("dest directory path: " + destPath);
+				
+				if (!fs.exists(destPath))
+				 {
+					System.out.println("create datasource directory [" + destDir + "/" + dirName + "]");
+					fs.mkdirs(destPath);
+				 }
+				
+				FileStatus[] evts = fs.listStatus(datasourceDirectory.getPath(),new EventFileFilter());
+				for(FileStatus eventFile : evts)	
+				{
+
+					Path eventFilePath = eventFile.getPath();
+					String filename = eventFilePath.getName();
+					System.out.println("src dir File: ["+  filename+"]");					
+					Path destFilePath = new Path(destDir + "/" + dirName + "/" + filename);
+					if (!fs.exists(destFilePath))
+					{
+						System.out.println("Moving File: [" + destFilePath +"]");
+						fs.rename(eventFilePath, destFilePath);
+					}
+					else
+					{
+						System.out.println("Need to merge! : [" + destFilePath +"]");
+						String strMrPath = datasourceDirectory.getPath().toString()+ "/" + "MR_" + System.currentTimeMillis();
+						Path mrPath = new Path(strMrPath);
+						System.out.println("\t New MR directory : [" + mrPath +"]");
+						// Create MR input Dir
+						fs.mkdirs(mrPath);
+						// Move Input files 
+						fs.rename(eventFilePath, new Path(strMrPath+"/1.done"));
+						fs.rename(destFilePath, new Path(strMrPath+"/2.done"));
+						
+						// Merge
+						String[] mergeArgs = new String[2];
+						mergeArgs[0] = strMrPath;
+						mergeArgs[1] = strMrPath + "/mrOutput";
+						System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
+						int res = ToolRunner.run(new ChukwaConfiguration(),new MoveOrMergeRecordFile(), mergeArgs);
+						System.out.println("MR exit status: " + res);
+						if (res == 0)
+						{
+							System.out.println("\t Moving output file : to [" + destFilePath +"]");
+							fs.rename(new Path(mergeArgs[1]+"/part-00000"), destFilePath);
+						}
+						else
+						{
+							throw new RuntimeException("Error in M/R merge operation!");
+						}
+					}
+				}
+			}
+		}
+		System.out.println("Done with mapred main()");
+	}
+}
+
+class EventFileFilter implements PathFilter {
+	  public boolean accept(Path path) {
+	    return (path.toString().endsWith(".evt"));
+	  }
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+
+public interface TaggerPlugin
+{
+	public void tag(String line, Record record);
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.util.RecordConstants;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public abstract class  AbstractProcessor implements ChunkProcessor
+{
+	Calendar calendar = Calendar.getInstance();
+	Chunk chunk = null;
+	byte[] bytes;
+	int[] recordOffsets ;
+	int currentPos = 0;
+	int startOffset = 0;
+	Text key = new Text();
+	
+	public AbstractProcessor()
+	{}
+	
+	protected abstract void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output, Reporter reporter);
+	
+	
+	public void process(Chunk chunk,OutputCollector<Text, ChukwaRecord> output, Reporter reporter)	{
+		reset(chunk);
+		while (hasNext()) {
+			parse(nextLine(), output, reporter);
+		}
+	}
+	
+	
+	protected void buildGenericRecord(ChukwaRecord record, String body,long timestamp,String dataSource)	{
+		calendar.setTimeInMillis( timestamp);
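+		// The destination file buckets records into 5-minute windows:
+		// e.g. a "df" record stamped 2008-08-12 15:37 maps to df/df_2008_08_12_15_35.evt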
+		String fileName = dataSource + "/" + dataSource + new java.text.SimpleDateFormat("_yyyy_MM_dd_HH").format(calendar.getTime());
+		int minutes = calendar.get(Calendar.MINUTE);
+		int dec = minutes/10;
+		fileName += "_" + dec ;
+		
+		int m = minutes - (dec*10);
+		if (m < 5) { 
+		  fileName += "0.evt";
+		} else {
+		  fileName += "5.evt";
+		}
+
+		record.setTime(timestamp);
+		record.add(Record.rawField, body);
+		record.add(Record.dataSourceField, dataSource);
+		record.add(Record.destinationField, fileName);
+		record.add(Record.sourceField, chunk.getSource());
+		record.add(Record.streamNameField, chunk.getStreamName());
+		record.add(Record.typeField, chunk.getDataType());
+	}
+
+	
+	protected void reset(Chunk chunk)	{
+		this.chunk = chunk;
+		this.bytes = chunk.getData();
+		this.recordOffsets = chunk.getRecordOffsets();
+		currentPos = 0;
+		startOffset = 0;
+	}
+	
+	protected boolean hasNext() {
+		return (currentPos < recordOffsets.length);
+	}
+	
+	protected String nextLine()	{
+		String log = new String(bytes,startOffset,(recordOffsets[currentPos]-startOffset));
+		startOffset = recordOffsets[currentPos] + 1;
+		currentPos ++;
+		return RecordConstants.recoverRecordSeparators("\n", log);
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public interface ChunkProcessor
+{
+	public String getDataType();
+	public void process(Chunk chunk,OutputCollector<Text, ChukwaRecord> output, Reporter reporter);
+}
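For illustration, a minimal processor built on the AbstractProcessor base above could look like the sketch below; the class name, data type string and timestamp handling are hypothetical and not part of this commit:

    package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class RawLineProcessor extends AbstractProcessor
    {
        // emits every raw log line as a generic ChukwaRecord
        @Override
        protected void parse(String recordEntry,
                OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
        {
            try
            {
                ChukwaRecord record = new ChukwaRecord();
                // fills the raw, dataSource, destination, source, streamName and type fields
                buildGenericRecord(record, recordEntry, System.currentTimeMillis(), "RawLine");
                output.collect(key, record);
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }

        public String getDataType()
        {
            return "RawLine";
        }
    }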

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,30 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+public class DFInvalidRecord extends Exception
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1254238125122522523L;
+
+	public DFInvalidRecord()
+	{
+	}
+
+	public DFInvalidRecord(String arg0)
+	{
+		super(arg0);
+	}
+
+	public DFInvalidRecord(Throwable arg0)
+	{
+		super(arg0);
+	}
+
+	public DFInvalidRecord(String arg0, Throwable arg1)
+	{
+		super(arg0, arg1);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,113 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class DFProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(DFProcessor.class);
+	private static final String[] columns = {"Filesystem:","1K-blocks:","Used:","Available:","Use%:","MountedOn:"};
+	private static final String[] headerCols = {"Filesystem","1K-blocks","Used","Available","Use%","Mounted on"};
+	
+	private static String regex= null;
+	private static Pattern p = null;
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+	
+	public DFProcessor()
+	{
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+		p = Pattern.compile(regex);
+		matcher = p.matcher("-");
+	}
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
+	{
+		matcher.reset(recordEntry);
+		if (matcher.matches())
+		{
+			log.info("DFProcessor matches");
+			
+			try
+			{
+				Date d = sdf.parse( matcher.group(1).trim());
+				
+//				String logLevel = matcher.group(2);
+//				String className = matcher.group(3);
+				String body = matcher.group(4);
+			
+				String[] lines = body.split("\n");
+				
+				
+				String[] outputCols = lines[0].split("[\\s]++");
+				
+				// Validate the df header line; "Mounted on" splits into two tokens,
+				// so a well-formed header has at least six columns.
+				if (outputCols.length < 6 ||
+					!outputCols[0].equals(headerCols[0]) ||
+					!outputCols[1].equals(headerCols[1]) ||
+					!outputCols[2].equals(headerCols[2]) ||
+					!outputCols[3].equals(headerCols[3]) ||
+					!outputCols[4].equals(headerCols[4]) ||
+					!outputCols[5].equals("Mounted"))
+				{
+					throw new DFInvalidRecord("Wrong output format (header) [" + recordEntry + "]");
+				}
+				
+				String[] values = null;
+				// database (long timestamp,String table,String tag,int tableType,int sqlType)
+				DatabaseHelper databaseRecord = new DatabaseHelper("system");
+				
+				// Data
+				//databaseRecord.addKey("Used",""+totalUsedNode);
+				
+				for (int i=1;i<lines.length;i++)
+				{
+					values = lines[i].split("[\\s]++");
+					databaseRecord.add(d.getTime(),values[0]+"."+columns[1], values[1]);
+					databaseRecord.add(d.getTime(),values[0]+"."+columns[2], values[2]);
+					databaseRecord.add(d.getTime(),values[0]+"."+columns[3], values[3]);
+					databaseRecord.add(d.getTime(),values[0]+"."+columns[4], values[4]);
+					databaseRecord.add(d.getTime(),values[0]+"."+columns[5], values[5]);
+				}
+				//Output DF info to database
+				output.collect(key, databaseRecord.buildChukwaRecord());
+				log.info("DFProcessor output 1 DF to database");
+			}
+			catch (ParseException e)
+			{
+				e.printStackTrace();
+				log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+			}
+			catch (IOException e)
+			{
+				e.printStackTrace();
+				log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]", e);
+			}
+			catch (DFInvalidRecord e)
+			{
+				e.printStackTrace();
+				log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+			}
+		}
+	}
+
+	public String getDataType()
+	{
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+}


