hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r723855 [6/23] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/lib...
Date: Fri, 05 Dec 2008 20:30:21 GMT
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Fri Dec  5 12:30:14 2008
@@ -22,6 +22,8 @@
 import java.net.URL;
 import java.util.*;
 
+import org.apache.hadoop.conf.Configuration;
+
 /***
  * An iterator returning a list of Collectors to try.
  * This class is nondeterministic, since it puts collectors back on the list after some period.
@@ -29,6 +31,7 @@
  * No node will be polled more than once per maxRetryRateMs milliseconds. hasNext() will continue to return
  * true if you have not called it recently.
  *
+ *
  */
 public class RetryListOfCollectors implements Iterator<String> {
 
@@ -36,25 +39,39 @@
   List<String> collectors;
   long lastLookAtFirstNode;
   int nextCollector=0;
+  private String portNo; 
+  Configuration conf;
   
-
   public RetryListOfCollectors(File collectorFile, int maxRetryRateMs) throws IOException {
     this.maxRetryRateMs = maxRetryRateMs;
     lastLookAtFirstNode = 0;
     collectors = new ArrayList<String>();
+    conf = new Configuration();
+    portNo = conf.get("chukwaCollector.http.port","8080");
     
     try{
       BufferedReader br  = new BufferedReader(new FileReader(collectorFile));
       String line;
       while((line = br.readLine()) != null) {
-        if(!line.contains("://")) //no protocol, assume http
-          collectors.add("http://"+line);
-        else
-          collectors.add(line);
+        if(!line.contains("://")) {
+          //no protocol, assume http
+          if(line.matches(".*:\\d+.*")) {
+            collectors.add("http://" + line);
+          } else {
+            collectors.add("http://" + line + ":" + portNo + "/");
+          }
+        } else {
+          if(line.matches(".*:\\d+.*")) {
+            collectors.add(line);
+          } else {
+            collectors.add(line + ":" + portNo + "/");
+          }
+        }
       }
       br.close();
     }catch(FileNotFoundException e){
-      System.err.println("Error in RetryListOfCollectors() opening file conf/connectors file from agent, double check that you have set the CHUKWA_HOME environment variable. Also, ensure file exists and is in classpath");
+      System.err.println("Error in RetryListOfCollectors() opening file: collectors, double check that you have set the CHUKWA_CONF_DIR environment variable. Also, ensure file exists and is in classpath");
     }catch(IOException e){
       System.err.println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
       throw e;
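
The constructor above now normalizes each line of the collectors file so that a bare hostname picks up the configured HTTP port (chukwaCollector.http.port, default 8080). A minimal standalone sketch of that normalization, under the assumption that a ":<digits>" occurrence means a port is already present; the class and method names are illustrative, not part of the commit:

    // Illustrative only: normalize a collectors-file line the way
    // RetryListOfCollectors intends to, given a default port string.
    public class CollectorAddressSketch {
      static String normalize(String line, String portNo) {
        if (!line.contains("://"))          // no protocol given, assume http
          line = "http://" + line;
        if (!line.matches(".*:\\d+.*"))     // no explicit port, append the default
          line = line + ":" + portNo + "/";
        return line;
      }
      public static void main(String[] args) {
        System.out.println(normalize("host1", "8080"));       // http://host1:8080/
        System.out.println(normalize("host2:9090", "8080"));  // http://host2:9090
      }
    }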

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java Fri Dec  5 12:30:14 2008
@@ -88,4 +88,10 @@
     this.interrupt();
   }
 
+@Override
+public void reloadConfiguration()
+{
+	System.out.println("reloadConfiguration");
+}
+
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java Fri Dec  5 12:30:14 2008
@@ -18,14 +18,15 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
-import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.chukwa.Chunk;
 
 public interface ChukwaWriter
 {
-	void init() throws IOException;
-	void add(Chunk data) throws IOException;
-	void close();
+	public void init() throws WriterException;
+	public void add(Chunk data) throws WriterException;
+	public void add(List<Chunk> chunks) throws WriterException;
+	public void close() throws WriterException;
 
 }

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ClientAck.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import org.apache.log4j.Logger;
+
+public class ClientAck
+{
+	static Logger log = Logger.getLogger(ClientAck.class);
+	
+	// TODO move all constant to config
+	
+	public static final int OK = 100;
+	public static final int KO = -100;
+	public static final int KO_LOCK = -200;
+	
+	private long ts = 0;
+	
+	private Object lock = new Object();
+	private int status = 0;
+	private Throwable exception = null;
+	private int waitTime = 6*1000;// 6 secs
+	private int timeOut = 15*1000;
+	
+	public ClientAck()
+	{
+		this.ts = System.currentTimeMillis() + timeOut;
+	}
+	
+  public int getTimeOut()
+  {
+    return timeOut;
+  }
+
+  public void wait4Ack()
+	{
+		synchronized(lock)
+		{
+//			log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client synch");
+			while (this.status == 0)
+			{
+//				log.info(">>>>>>>>>>>>>>>>>>>>>>>>> Client Before wait");
+				try { lock.wait(waitTime);}
+				catch(InterruptedException e)
+				{}
+				long now = System.currentTimeMillis();
+				if (now > ts)
+				{
+					this.status = KO_LOCK;
+					this.exception = new RuntimeException("More than maximum time lock [" + this.toString() +"]");
+				}
+			}
+//			log.info("[" + Thread.currentThread().getName() + "] >>>>>>>>>>>>>>>>> Client after wait status [" + status +  "] [" + this.toString() + "]");
+		}
+	}
+
+	public void releaseLock(int status, Throwable exception)
+	{
+		this.exception = exception;
+		this.status = status;
+		
+//		log.info("[" + Thread.currentThread().getName() + "] <<<<<<<<<<<<<<<<< Server synch [" + status + "] ----->>>> [" + this.toString() + "]");
+		synchronized(lock)
+		{		
+//			log.info("<<<<<<<<<<<<<<< Server before notify");
+			lock.notifyAll();
+		}
+//		log.info("<<<<<<<<<<<<<<< Server after notify");
+	}
+	
+	public int getStatus()
+	{
+		return status;
+	}
+
+	public void setStatus(int status)
+	{
+		this.status = status;
+	}
+
+	public Throwable getException()
+	{
+		return exception;
+	}
+
+	public void setException(Throwable exception)
+	{
+		this.exception = exception;
+	}
+}
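
ClientAck is a small wait/notify handshake: a writer thread blocks in wait4Ack() until some other thread calls releaseLock(status, cause), or until the 15-second timeout turns the status into KO_LOCK. A minimal sketch of that interaction, assuming the class above is on the classpath; the surrounding class and thread are illustrative only:

    import org.apache.hadoop.chukwa.datacollection.writer.ClientAck;

    public class ClientAckSketch {
      public static void main(String[] args) throws Exception {
        final ClientAck ack = new ClientAck();

        // Stands in for the collector-side flush timer that acks waiting writers.
        Thread flusher = new Thread(new Runnable() {
          public void run() {
            try { Thread.sleep(1000); } catch (InterruptedException e) { }
            ack.releaseLock(ClientAck.OK, null);   // wakes every thread in wait4Ack()
          }
        });
        flusher.start();

        ack.wait4Ack();                            // blocks until releaseLock() or timeout
        System.out.println("status = " + ack.getStatus());   // 100 == ClientAck.OK
      }
    }

SeqFileWriter below uses exactly this pattern: add() appends chunks and then waits on the current ClientAck, while a 5-second timer flushes the sequence file and releases the previous ack.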

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java Fri Dec  5 12:30:14 2008
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
-import java.io.IOException;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -59,14 +59,14 @@
     statTimer.cancel();
   }
 
-  public void init() throws IOException
+  public void init() throws WriterException
   {
      System.out.println("----  DUMMY HDFS WRITER IN USE ---");
 
      statTimer.schedule(new StatReportingTask(), 1000,10*1000);
   }
 
-  public void add(Chunk data) throws IOException
+  public void add(Chunk data) throws WriterException
   {
     int startOffset = 0;
 
@@ -87,4 +87,14 @@
     }
   }
 
+@Override
+public void add(List<Chunk> chunks) throws WriterException
+{
+	for(Chunk chunk: chunks)
+	{
+		add(chunk);
+	}
+	
+}
+
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java Fri Dec  5 12:30:14 2008
@@ -18,6 +18,7 @@
 package org.apache.hadoop.chukwa.datacollection.writer;
 
 import java.io.*;
+import java.util.List;
 
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
@@ -25,29 +26,44 @@
 public class InMemoryWriter implements ChukwaWriter {
 
   ByteArrayOutputStream buf;
-  
+
   public void close() {
     buf.reset();
   }
 
-  public void init() throws IOException {
+  public void init() throws WriterException {
     buf = new ByteArrayOutputStream();
   }
 
-  public void add(Chunk data) throws IOException {
+  public void add(Chunk data) throws WriterException {
     DataOutputStream dos = new DataOutputStream(buf);
-    data.write(dos);
-    synchronized(this) {
+    try {
+      data.write(dos);
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new WriterException(e);
+    }
+    synchronized (this) {
       notify();
     }
   }
-  
+
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    for (Chunk chunk : chunks) {
+      add(chunk);
+    }
+
+  }
+
   DataInputStream dis = null;
+
   /**
    * Try to read bytes, waiting up to ms
+   * 
    * @param bytes amount to try to read
-   * @param ms  time to wait
-   * @return a newly read-in chunk
+   * @param ms time to wait
+   * @return a newly read-in chunk
    * @throws IOException
    */
   public Chunk readOutChunk(int bytes, int ms) throws IOException {
@@ -63,13 +79,11 @@
       }
       if(dis == null)
        dis = new DataInputStream( new ByteArrayInputStream(buf.toByteArray()));
-      
       return ChunkImpl.read(dis);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       return null;
     }
-    
   }
 
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Fri Dec  5 12:30:14 2008
@@ -22,6 +22,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Calendar;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -42,9 +43,13 @@
  */
 public class SeqFileWriter implements ChukwaWriter
 {
-	static Logger log = Logger.getLogger(SeqFileWriter.class);
 	public static final boolean ENABLE_ROTATION = true;
-
+	
+	static final int STAT_INTERVAL_SECONDS = 30;
+	static final Object lock = new Object();
+	
+    static Logger log = Logger.getLogger(SeqFileWriter.class);
+  
 	private FileSystem fs = null;
 	private ChukwaConfiguration conf = null;
 
@@ -55,217 +60,281 @@
 	private String currentFileName = null;
 	private FSDataOutputStream currentOutputStr = null;
 	private static SequenceFile.Writer seqFileWriter = null;
-
+	
+	private static ClientAck clientAck = new ClientAck();
+	private static long nextRotate = 0;
+	private static int rotateInterval = 1000*60;
+	
+	private static Timer clientAckTimer = null;
+	
 	private Timer timer = null;
 
 	private Timer statTimer = null;
 	private volatile long dataSize = 0;
 
-	public SeqFileWriter() throws IOException
+	
+	private int initWriteChunkRetries = 10;
+	private int writeChunkRetries = initWriteChunkRetries;
+	
+	public SeqFileWriter() throws WriterException
 	{
 		conf = new ChukwaConfiguration(true);
 		init();
 	}
 
-	public void init() throws IOException
+	public void init() throws WriterException
 	{
 		outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
 
-		int rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
+		rotateInterval = conf.getInt("chukwaCollector.rotateInterval",
 				1000 * 60 * 5);//defaults to 5 minutes
+		nextRotate = System.currentTimeMillis() + rotateInterval;
+		
+		initWriteChunkRetries = conf.getInt("chukwaCollector.writeChunkRetries", 10);
+		writeChunkRetries = initWriteChunkRetries;
+		
 		//check if they've told us the file system to use
-    String fsname = conf.get("writer.hdfs.filesystem");
-    if (fsname == null || fsname.equals("")){
-      //otherwise try to get the filesystem from hadoop
-      fsname = conf.get("fs.default.name");
-    }
+	    String fsname = conf.get("writer.hdfs.filesystem");
+	    if (fsname == null || fsname.equals(""))
+	    {
+	      //otherwise try to get the filesystem from hadoop
+	      fsname = conf.get("fs.default.name");
+	    }
 		
 
 		log.info("rotateInterval is " + rotateInterval);
-		log.info("ENABLE_ROTATION is " + ENABLE_ROTATION);
 		log.info("outputDir is " + outputDir);
 		log.info("fsname is " + fsname);
 		log.info("filesystem type from hadoop-default.xml is "
 				+ conf.get("fs.hdfs.impl"));
 
-		if (fsname == null)
-		{
+		if (fsname == null) {
 			log.error("no filesystem name");
-			throw new IOException("no filesystem");
-		}
-		try
-		{
+			throw new WriterException("no filesystem");
+		}
+		try {
 			fs = FileSystem.get(new URI(fsname), conf);
-			if (fs == null)
-			{
+			if (fs == null) {
 				log.error("can't connect to HDFS at " + fs.getUri());
 				return;
 			} else
 				log.info("filesystem is " + fs.getUri());
-		} catch (IOException e)
-		{
+		} catch (IOException e) {
 			log.error(
 							"can't connect to HDFS, trying default file system instead (likely to be local)",
 							e);
-			try
-			{
+			try	{
 				fs = FileSystem.get(conf);
-			} catch (IOException err)
-			{
+			} catch (IOException err) {
 				log.error("can't connect to default file system either", e);
 			}
-		} catch (URISyntaxException e)
-		{
+		} catch (URISyntaxException e) 	{
 			log.error("problem generating new URI from config setting");
 			return;
 		}
 
-		calendar.setTimeInMillis(System.currentTimeMillis());
-		int minutes = calendar.get(Calendar.MINUTE);
-		// number of minutes at current time
-
-		int dec = minutes / 10; // 'tens' digit of current time
-
-		int m = minutes - (dec * 10); // 'units' digit
-		if (m < 5)
-		{
-			m = 5 - m;
-		} else
-		{
-			m = 10 - m;
-		}
-
-		log.info("Current date [" + calendar.getTime().toString()
-				+ "] next schedule [" + m + "]");
+		// Setup everything by rotating
 		rotate();
-
-		timer = new Timer();
-
-		if (ENABLE_ROTATION)
+		 
+		clientAckTimer = new Timer();
+		clientAckTimer.schedule(new TimerTask()
 		{
-			log.info("sink rotation enabled, rotating every " + rotateInterval
-					+ " millis");
-			timer.schedule(new TimerTask()
+			public void run() 
 			{
-				public void run()
+				synchronized (lock) 
 				{
-					rotate();
+					ClientAck previous = clientAck ;
+					SeqFileWriter.clientAck = new ClientAck();
+					
+					try
+					{
+						// SeqFile is uncompressed for now
+						// So we can flush every xx secs
+						// But if we're using block Compression
+						// this is not true anymore
+						// because this will trigger
+						// the compression
+						if (currentOutputStr != null)
+						{
+							currentOutputStr.flush(); 
+						}
+						previous.releaseLock(ClientAck.OK, null);
+						long now = System.currentTimeMillis();
+						if (now >= nextRotate)
+						{
+							nextRotate = System.currentTimeMillis() + rotateInterval;
+							rotate();
+						}
+					}
+					catch(Throwable e)
+					{
+						previous.releaseLock(ClientAck.KO, e);
+						log.warn("Exception when flushing ", e);
+						e.printStackTrace();
+					}	
 				}
+			}
 
-			}, Math.min(rotateInterval, m * 60 * 1000), rotateInterval);
-
-			statTimer = new Timer();
-		} else
-			log.warn("sink rotation is OFF!!");
-
-		statTimer.schedule(new StatReportingTask(), 1000, 60 * 1000);
+		}, (5*1000), (5*1000));
+		
+		statTimer = new Timer();
+		statTimer.schedule(new StatReportingTask(), 1000, STAT_INTERVAL_SECONDS * 1000);
+		
+		
+		
 	}
 
 	private class StatReportingTask extends TimerTask
 	{
 		private long lastTs = System.currentTimeMillis();
-		private long lastDataSize = 0;
 
 		public void run()
 		{
-			long time = System.currentTimeMillis();
-			long interval = time - lastTs;
+			
+		  long time = System.currentTimeMillis();
+			long currentDs = dataSize;
+			dataSize = 0;
+			
+		  long interval = time - lastTs;
 			lastTs = time;
 
-			long ds = dataSize;
-			long dataRate = 1000 * (ds - lastDataSize) / interval; // kb/sec
-			lastDataSize = ds;
-
-			log.info("stat=datacollection.writer.hdfs|dataSize=" + dataSize);
-			log.info("stat=datacollection.writer.hdfs|dataRate=" + dataRate);
+			long dataRate = 1000 * currentDs / interval; // kb/sec
+			log.info("stat:datacollection.writer.hdfs dataSize=" + currentDs + " dataRate=" + dataRate);
 		}
 	};
 
+	
 	void rotate()
 	{
 		calendar.setTimeInMillis(System.currentTimeMillis());
 
 		log.info("start Date [" + calendar.getTime() + "]");
-		//granularity of rollover directory structure is hourly
-		String newDir = new java.text.SimpleDateFormat("yyyy_dd_HH")
-				.format(calendar.getTime());
-
 		log.info("Rotate from " + Thread.currentThread().getName());
 
-		Path newDirPath = new Path(outputDir + "/" + newDir);
-		log.info("Rotate directory[" + newDirPath.toString() + "]");
-		try
-		{
-			if (!fs.exists(newDirPath))
-			{
-				log.info("Create new directory:" + newDirPath.toString());
-				try
-				{
-					fs.mkdirs(newDirPath);
-				} catch (Exception e)
-				{
-					if (!fs.exists(newDirPath))
-					{
-						log.info("Failed to create new directory:"
-								+ newDirPath.toString() + "] ", e);
-					}
-				}
-			} else // use the existing directory, because we haven't hit a new hour yet
-			{
-				log.info("Rotate from [" + Thread.currentThread().getName()
-						+ "] directory (" + newDirPath + ") already exists.");
+		String newName = new java.text.SimpleDateFormat("yyyyddHHmmssSSS").format(calendar.getTime());
+		newName += "_" + new java.rmi.server.UID().toString();
+		newName = newName.replace("-", "");
+		newName = newName.replace(":", "");
+		newName = newName.replace(".", "");
+		newName = outputDir + "/" + newName.trim();
 
-			}
-	    String newName = new java.text.SimpleDateFormat("yyyy_dd_HH_mm_ss_SSS").format(calendar.getTime());
-	    newName += "_" + new java.rmi.server.UID().toString();
-	    newName = newName.replace("-", "");
-	    newName = newName.replace(":", "");
-	    newName = newName.replace(".", "");
-
-			newName = newDirPath + "/" + newName.trim();
-
-			Path newOutputPath = new Path(newName + ".chukwa");
-
-			FSDataOutputStream newOutputStr = fs.create(newOutputPath);
-			FSDataOutputStream previousOutputStr = null;
-			Path previousPath = null;
-			String previousFileName = null;
 
-			synchronized (this)
+		synchronized (lock) 
+		{
+			try
 			{
-				previousOutputStr = currentOutputStr;
-				previousPath = currentPath;
-				previousFileName = currentFileName;
+				FSDataOutputStream previousOutputStr = currentOutputStr;
+				Path previousPath = currentPath;
+				String previousFileName = currentFileName;
 
-				currentOutputStr = newOutputStr;
-				currentPath = newOutputPath;
-				currentFileName = newName;
-				if (previousOutputStr != null)
+				if (previousOutputStr != null) 	
 				{
 					previousOutputStr.close();
 					fs.rename(previousPath,
 							new Path(previousFileName + ".done"));
 				}
-
-				// Turn compression ON if the 5 mins archives are big
+				Path newOutputPath = new Path(newName + ".chukwa");			
+				FSDataOutputStream newOutputStr = fs.create(newOutputPath);
+				currentOutputStr = newOutputStr;
+				currentPath = newOutputPath;
+				currentFileName = newName;
+				// Uncompressed for now
 				seqFileWriter = SequenceFile.createWriter(conf, newOutputStr,
 						ChukwaArchiveKey.class, ChunkImpl.class,
 						SequenceFile.CompressionType.NONE, null);
 			}
-		} catch (IOException e)
-		{
-			log.error("failed to do rotate", e);
+			catch (IOException e)
+			{
+				log.fatal("IO Exception in rotate. Exiting!");
+				e.printStackTrace();
+				// TODO  
+				// As discussed for now:
+				// Every time this happened in the past it was because HDFS was down,
+				// so there's nothing we can do
+				// Shutting down the collector for now
+				// Watchdog will re-start it automatically
+				System.exit(-1);
+			}		
 		}
+
 		log.debug("finished rotate()");
 	}
 
-	public synchronized void add(Chunk chunk) throws IOException
+	// TODO merge the 2 add functions
+	@Override
+	public void add(List<Chunk> chunks) throws WriterException
 	{
-		if (chunk != null)
+		if (chunks != null) 	
 		{
-			try
+			try 
+			{
+				ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+				// FIXME compute this once an hour
+				// 
+				synchronized (calendar)
+				{
+					calendar.setTimeInMillis(System.currentTimeMillis());
+					calendar.set(Calendar.MINUTE, 0);
+					calendar.set(Calendar.SECOND, 0);
+					calendar.set(Calendar.MILLISECOND, 0);
+
+					archiveKey.setTimePartition(calendar.getTimeInMillis());
+				}
+
+				ClientAck localClientAck = null;					
+				synchronized(lock)
+				{
+					localClientAck = SeqFileWriter.clientAck;
+					for (Chunk chunk : chunks)
+					{
+						archiveKey.setDataType(chunk.getDataType());
+						archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName());
+						archiveKey.setSeqId(chunk.getSeqID());
+
+						if (chunk != null) 	
+						{
+							seqFileWriter.append(archiveKey, chunk);
+							// compute size for stats
+							dataSize += chunk.getData().length;
+						}
+
+					}
+				}// End synchro
+
+				localClientAck.wait4Ack();
+				if (localClientAck.getStatus() != ClientAck.OK)
+				{
+					log.warn("Exception after notifyAll on the lock - Thread:" + Thread.currentThread().getName(),localClientAck.getException());
+					throw new WriterException(localClientAck.getException());
+				}
+				else
+				{
+					// success
+					writeChunkRetries = initWriteChunkRetries;
+				}
+
+			}
+			catch (IOException e) 
 			{
-				assert chunk instanceof ChunkImpl : "bad input type";
+				writeChunkRetries --;
+				log.error("Could not save the chunk. ", e);
+
+				if (writeChunkRetries < 0)
+				{
+					log.fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
+					System.exit(-1);
+				}
+				throw new WriterException(e);
+			}
+		}
+
+	}
+	
+	public void add(Chunk chunk) throws WriterException
+	{
+	  
+		if (chunk != null) 	{
+			try {
 				ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
 
 				// FIXME compute this once an hour
@@ -280,71 +349,72 @@
 				}
 
 				archiveKey.setDataType(chunk.getDataType());
-				archiveKey.setStreamName(chunk.getStreamName());
+				archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName());
 				archiveKey.setSeqId(chunk.getSeqID());
 
-				seqFileWriter.append(archiveKey, chunk);
-
-				dataSize += chunk.getData().length;
-				// currentOutput.sync(); //force file out to stable storage on
-				// the cluster.
-				// note that seqFileWriter.sync() would do something completely
-				// different
-			} catch (IOException e)
+				ClientAck localClientAck = null;
+				synchronized(lock)
+				{
+					localClientAck = SeqFileWriter.clientAck;
+					log.info("[" + Thread.currentThread().getName() + "] Client >>>>>>>>>>>> Current Ack object ===>>>>" + localClientAck.toString());
+					seqFileWriter.append(archiveKey, chunk);
+					
+					// compute size for stats
+					dataSize += chunk.getData().length;
+				}
+				localClientAck.wait4Ack();
+				
+				if (localClientAck.getStatus() != ClientAck.OK)
+				{
+					log.warn("Exception after notifyAll on the lock - Thread:" + Thread.currentThread().getName(),localClientAck.getException());
+					throw new WriterException(localClientAck.getException());
+				}
+				else
+				{
+					// success
+					writeChunkRetries = initWriteChunkRetries;
+				}
+			} 
+			catch (IOException e) 
 			{
-				log.error(e.getMessage());
-				rotate();
-				throw e;
+				writeChunkRetries --;
+				log.error("Could not save the chunk. ", e);
+	
+				if (writeChunkRetries < 0)
+				{
+					log.fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
+					System.exit(-1);
+				}
+				throw new WriterException(e);
 			}
 		}
 	}
 
 	public void close()
 	{
-		synchronized (this)
+		synchronized (lock)
 		{
-			try
-			{
-				this.currentOutputStr.close();
+		  if (timer != null)
+			  timer.cancel();
+		  if (statTimer != null)
+			  statTimer.cancel();
+		  if (clientAckTimer != null)
+			  clientAckTimer.cancel();
+			try {
+				
+				if (this.currentOutputStr != null)
+				{
+					this.currentOutputStr.close();
+				}
+					
+				clientAck.releaseLock(ClientAck.OK, null);
 				fs.rename(currentPath, new Path(currentFileName + ".done"));
-			} catch (IOException e)
+			} catch (IOException e) 
 			{
+				clientAck.releaseLock(ClientAck.OK, e);
 				log.error("failed to close and rename stream", e);
 			}
 		}
 	}
 
-	/*
-	 * public static class SeqFileKey implements
-	 * org.apache.hadoop.io.WritableComparable<SeqFileKey>{
-	 * 
-	 * public long seqID; public String streamName; public long
-	 * collectorTimestamp;
-	 * 
-	 * public SeqFileKey() {} // for use in deserializing
-	 * 
-	 * SeqFileKey(Chunk event) { seqID = event.getSeqID(); streamName =
-	 * event.getStreamName() + "_" + event.getSource(); collectorTimestamp =
-	 * System.currentTimeMillis(); }
-	 * 
-	 * public void readFields(DataInput in) throws IOException { seqID =
-	 * in.readLong(); streamName = in.readUTF(); collectorTimestamp =
-	 * in.readLong(); }
-	 * 
-	 * public void write(DataOutput out) throws IOException {
-	 * out.writeLong(seqID); out.writeUTF(streamName);
-	 * out.writeLong(collectorTimestamp); }
-	 * 
-	 * public int compareTo(SeqFileKey o) { int cmp =
-	 * streamName.compareTo(o.streamName); if(cmp == 0) { if(seqID < o.seqID)
-	 * return -1; else if (seqID == o.seqID) return 0; else return 1; } else
-	 * return cmp; }
-	 * 
-	 * public boolean equals(Object o) { return (o instanceof SeqFileKey) &&
-	 * (compareTo((SeqFileKey) o) == 0); }
-	 * 
-	 * public int hashCode() { return streamName.hashCode() ^ (int)(seqID >> 32) ^
-	 * (int) seqID; }
-	 *  }
-	 */
 }

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/WriterException.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,29 @@
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+public class WriterException extends Exception
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -4207275200546397145L;
+
+	public WriterException()
+	{}
+
+	public WriterException(String message)
+	{
+		super(message);
+	}
+
+	public WriterException(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public WriterException(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveBuilder.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+public class ChukwaArchiveBuilder extends Configured implements Tool
+{
+	static Logger log = Logger.getLogger(ChukwaArchiveBuilder.class);
+
+	static int printUsage()
+	{
+		System.out
+				.println("ChukwaArchiveBuilder <Stream/DataType/Daily/Hourly> <input> <output>");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+	
+	public int run(String[] args) throws Exception
+	{
+	
+
+		// Make sure there are exactly 3 parameters left.
+		if (args.length != 3)
+		{
+			System.out.println("ERROR: Wrong number of parameters: "
+					+ args.length + " instead of 3.");
+			return printUsage();
+		}
+	
+		JobConf jobConf = new JobConf(getConf(), ChukwaArchiveBuilder.class);
+		jobConf.addResource(new Path("conf/chukwa-demux-conf.xml"));
+		
+		jobConf.setInputFormat(SequenceFileInputFormat.class);
+		
+		jobConf.setMapperClass(IdentityMapper.class);
+		jobConf.setReducerClass(IdentityReducer.class);
+		
+		if (args[0].equalsIgnoreCase("Daily"))
+		{
+			jobConf.setPartitionerClass(ChukwaArchiveDailyPartitioner.class);
+			jobConf.setOutputFormat(ChukwaArchiveDailyOutputFormat.class);
+			jobConf.setJobName("Chukwa-DailyArchiveBuilder");
+		}
+		else if (args[0].equalsIgnoreCase("Hourly"))
+		{
+			jobConf.setJobName("Chukwa-HourlyArchiveBuilder");
+			jobConf.setPartitionerClass(ChukwaArchiveHourlyPartitioner.class);
+			jobConf.setOutputFormat(ChukwaArchiveHourlyOutputFormat.class);			
+		}
+		else if (args[0].equalsIgnoreCase("DataType"))
+    {
+      jobConf.setJobName("Chukwa-HourlyArchiveBuilder-DataType");
+      int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
+      log.info("Reduce Count:" + reduceCount);
+      jobConf.setNumReduceTasks(reduceCount);
+      
+      jobConf.setPartitionerClass(ChukwaArchiveDataTypePartitioner.class);
+      jobConf.setOutputFormat(ChukwaArchiveDataTypeOutputFormat.class);     
+    }
+		else if (args[0].equalsIgnoreCase("Stream"))
+    {
+      jobConf.setJobName("Chukwa-HourlyArchiveBuilder-Stream");
+      int reduceCount = jobConf.getInt("chukwaArchiveBuilder.reduceCount", 1);
+      log.info("Reduce Count:" + reduceCount);
+      jobConf.setNumReduceTasks(reduceCount);
+      
+      jobConf.setPartitionerClass(ChukwaArchiveStreamNamePartitioner.class);
+      jobConf.setOutputFormat(ChukwaArchiveStreamNameOutputFormat.class);     
+    }
+		else
+		{
+			System.out.println("ERROR: Wrong time partitioning: "
+					+ args[0] + " instead of [Stream/DataType/Hourly/Daily].");
+			return printUsage();
+		}
+
+    
+		jobConf.set("mapred.compress.map.output", "true");
+		jobConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
+		jobConf.set("mapred.output.compress", "true");
+		jobConf.set("mapred.output.compression.type", "BLOCK");
+	
+		
+		
+		jobConf.setOutputKeyClass(ChukwaArchiveKey.class);
+		jobConf.setOutputValueClass(ChunkImpl.class);
+	
+		FileInputFormat.setInputPaths(jobConf, args[1]);
+		FileOutputFormat.setOutputPath(jobConf, new Path(args[2]));
+
+		JobClient.runJob(jobConf);
+		return 0;
+	}
+
+	public static void main(String[] args) throws Exception
+	{
+		int res = ToolRunner.run(new Configuration(),
+				new ChukwaArchiveBuilder(), args);
+		System.exit(res);
+	}
+}
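
Because the builder is a standard Hadoop Tool, it can also be driven programmatically the same way its main() does. A short sketch, with placeholder input/output paths and the "DataType" mode chosen arbitrarily:

    import org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class ArchiveBuilderSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder paths; the first argument picks the partitioner and output format.
        String[] builderArgs = { "DataType", "/chukwa/demuxOutput", "/chukwa/archive" };
        int res = ToolRunner.run(new Configuration(), new ChukwaArchiveBuilder(), builderArgs);
        System.exit(res);
      }
    }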

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDailyOutputFormat.java Fri Dec  5 12:30:14 2008
@@ -31,6 +31,7 @@
 	static Logger log = Logger.getLogger(ChukwaArchiveDailyOutputFormat.class);
 	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
 	
+	
 	@Override
 	protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
 			String name)
@@ -38,6 +39,7 @@
 		
 		if (log.isDebugEnabled())
 			{log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
+		
 		return sdf.format(key.getTimePartition()) + ".arc";
 	}
 }

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypeOutputFormat.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import org.apache.log4j.Logger;
+
+public class ChukwaArchiveDataTypeOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
+{
+	static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
+	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+	
+	
+	@Override
+	protected String generateFileNameForKeyValue(ChukwaArchiveKey key, ChunkImpl chunk,
+			String name)
+	{
+		
+		if (log.isDebugEnabled())
+			{log.debug("ChukwaArchiveOutputFormat.fileName: " + sdf.format(key.getTimePartition()));}
+		
+		return chunk.getDataType() + "_" +sdf.format(key.getTimePartition()) + ".arc";
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveDataTypePartitioner.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public class ChukwaArchiveDataTypePartitioner<K, V> 
+	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
+{
+	SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
+	
+	public void configure(JobConf arg0)
+	{}
+
+	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunk, int numReduceTasks)
+	{
+		
+		 return ( (chunk.getDataType() + "_" +sdf.format(key.getTimePartition() )).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNameOutputFormat.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+
+public class ChukwaArchiveStreamNameOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl>
+{
+
+  @Override
+  protected String generateLeafFileName(String name)
+  {
+    return "chukwaArchive-" + super.generateLeafFileName(name);
+  }
+  
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveStreamNamePartitioner.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.archive;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public class ChukwaArchiveStreamNamePartitioner<K, V> 
+	implements Partitioner<ChukwaArchiveKey,ChunkImpl>
+{
+	public void configure(JobConf arg0)
+	{}
+
+	public int getPartition(ChukwaArchiveKey key,ChunkImpl chunk, int numReduceTasks)
+	{
+		
+		 return ( (chunk.getSource() + "/" + chunk.getStreamName()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBException.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,30 @@
+package org.apache.hadoop.chukwa.extraction.database;
+
+public class DBException extends Exception
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -4509384580029389936L;
+
+	public DBException()
+	{
+	}
+
+	public DBException(String message)
+	{
+		super(message);
+	}
+
+	public DBException(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public DBException(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DBPlugin.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,9 @@
+package org.apache.hadoop.chukwa.extraction.database;
+
+import org.apache.hadoop.io.SequenceFile;
+
+public interface DBPlugin
+{
+	void process(SequenceFile.Reader reader)
+	throws DBException;
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/DatabaseLoader.java Fri Dec  5 12:30:14 2008
@@ -1,23 +1,25 @@
 package org.apache.hadoop.chukwa.extraction.database;
 
 import java.io.IOException;
-import java.net.URI;
 import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
-import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 
 public class DatabaseLoader
 {
 
+	static HashMap<String, String> hashDatasources = new HashMap<String, String>();
+	static ChukwaConfiguration conf = null;
+	static FileSystem fs = null;
+	
 	private static Log log = LogFactory.getLog(DatabaseLoader.class);
 
 	/**
@@ -28,53 +30,98 @@
 	public static void main(String[] args) throws IOException,
 			URISyntaxException
 	{
+		//FIXME quick implementation to be able to load data into database
+		
 		System.out.println("Input directory:" + args[0]);
 
-		ChukwaConfiguration conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+		
+		for(int i=1;i<args.length;i++)
+		{
+			hashDatasources.put(args[i], "");
+		}
+		
+		conf = new ChukwaConfiguration();
+		fs = FileSystem.get(conf);
+		Path demuxDir = new Path(args[0]);
+		FileStatus fstat = fs.getFileStatus(demuxDir);
 
-		Path srcDir = new Path(args[0]);
+		if (!fstat.isDir())
+		{
+			throw new IOException(args[0] + " is not a directory!");
+		} 
+		else
+		{
+			// cluster Directory
+			FileStatus[] clusterDirectories = fs.listStatus(demuxDir);
+			for (FileStatus clusterDirectory : clusterDirectories)
+			{
+				FileStatus[] datasourceDirectories = fs.listStatus(clusterDirectory.getPath());
+				
+				String directoryName = null;
+				for (FileStatus datasourceDirectory : datasourceDirectories)
+				{
+					directoryName = datasourceDirectory.getPath().getName();
+					if (directoryName.equals("_log") || (!hashDatasources.containsKey(directoryName)))
+					{
+						log.info("Skipping this directory:" + directoryName );
+						continue;
+					}
+					try
+					{
+						processDS(clusterDirectory.getPath().getName(),datasourceDirectory.getPath());
+					}
+					catch(Exception e)
+					{
+						e.printStackTrace();
+						log.warn("Exception in DatabaseLoader:" ,e);
+					}
+				}
+			}
+			
+			System.exit(0);
+		}
+	}
+		
+	static void processDS(String cluster, Path datasourcePath) throws IOException
+	{
+		Path srcDir = datasourcePath;
 		FileStatus fstat = fs.getFileStatus(srcDir);
 
 		if (!fstat.isDir())
 		{
-			throw new IOException(args[0] + " is not a directory!");
+			throw new IOException(datasourcePath.getName() + " is not a directory!");
 		} else
 		{
 			FileStatus[] datasourceDirectories = fs.listStatus(srcDir,new EventFileFilter());
 			for (FileStatus datasourceDirectory : datasourceDirectories)
 			{
-
-				// rename working file
-				String databaseInputFilename = datasourceDirectory.getPath()
-						.getName();
+				String dataSource = datasourceDirectory.getPath().getName();
+				dataSource = dataSource.substring(0,dataSource.indexOf('_'));
 				
-				Path inProgressdatabaseInputFilePath = new Path(databaseInputFilename
-						+ "." + System.currentTimeMillis() + ".pgs");
 				
+				// Need to rename if we want todo some processing in para.
+				//
 				// Maybe the file has already been processed by another loader
 				if (fs.exists(datasourceDirectory.getPath()))
 				{
-					fs.rename(datasourceDirectory.getPath(), inProgressdatabaseInputFilePath);
-
-					SequenceFile.Reader r = new SequenceFile.Reader(fs,datasourceDirectory.getPath(), conf);
-					Text key = new Text();
-					ChukwaRecord record = new ChukwaRecord();
+					
+					log.info("Processing: " + datasourceDirectory.getPath().getName());
+					
 					try
 					{
-						while (r.next(key, record))
-						{
-							System.out.println(record.getValue(DatabaseHelper.sqlField));
-						} // End while(r.next(key, databaseRecord) )
-					} // end Try
-					catch (Exception e)
+						MetricDataLoader mdl = new MetricDataLoader(cluster);
+						mdl.process(datasourceDirectory.getPath());
+					} catch (SQLException e)
 					{
-						log.error("Unable to insert data into database"
-								+ e.getMessage());
 						e.printStackTrace();
-					} 
+						log.warn("SQLException in MetricDataLoader:" ,e);
+					} catch (URISyntaxException e)
+					{
+						e.printStackTrace();
+						log.warn("Exception in MetricDataLoader:" ,e);
+					}
 					
+					log.info("Processed: " + datasourceDirectory.getPath().getName());
 				}
 			} // End for(FileStatus datasourceDirectory :datasourceDirectories)
 		} // End Else
@@ -87,4 +134,4 @@
 	  {
 	    return (path.toString().endsWith(".evt"));
 	  }
-}
\ No newline at end of file
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MRJobCounters.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,74 @@
+package org.apache.hadoop.chukwa.extraction.database;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.io.SequenceFile.Reader;
+
+public class MRJobCounters implements DBPlugin
+{
+	private static Log log = LogFactory.getLog(MRJobCounters.class);
+	
+	static final String[] fields = 
+		{"FILE_SYSTEMS_HDFS_BYTES_READ","FILE_SYSTEMS_HDFS_BYTES_WRITTEN",
+		"FILE_SYSTEMS_LOCAL_BYTES_READ","FILE_SYSTEMS_LOCAL_BYTES_WRITTEN","HodId",
+		"JOB_COUNTERS__DATA-LOCAL_MAP_TASKS","JOB_COUNTERS__LAUNCHED_MAP_TASKS",
+		"JOB_COUNTERS__LAUNCHED_REDUCE_TASKS","JOB_COUNTERS__RACK-LOCAL_MAP_TASKS",
+		"JobId","MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS","MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS",
+		"MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS",
+		"MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
+		"MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES","MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS",
+		"MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS","MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS",
+		"MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS"};
+//	[FILE_SYSTEMS_HDFS_BYTES_READ] :801280331655
+//	[FILE_SYSTEMS_HDFS_BYTES_WRITTEN] :44142889
+//	[FILE_SYSTEMS_LOCAL_BYTES_READ] :1735570776310
+//	[FILE_SYSTEMS_LOCAL_BYTES_WRITTEN] :2610893176016
+//	[HodId] :0.0
+//	[JOB_COUNTERS__DATA-LOCAL_MAP_TASKS] :5545
+//	[JOB_COUNTERS__LAUNCHED_MAP_TASKS] :5912
+//	[JOB_COUNTERS__LAUNCHED_REDUCE_TASKS] :739
+//	[JOB_COUNTERS__RACK-LOCAL_MAP_TASKS] :346
+//	[JobId] :2.008042104030008E15
+//	[MAP-REDUCE_FRAMEWORK_COMBINE_INPUT_RECORDS] :0
+//	[MAP-REDUCE_FRAMEWORK_COMBINE_OUTPUT_RECORDS] :0
+//	[MAP-REDUCE_FRAMEWORK_MAP_INPUT_BYTES] :801273929542
+//	[MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS] :9406887059
+//	[MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_BYTES] :784109666437
+//	[MAP-REDUCE_FRAMEWORK_MAP_OUTPUT_RECORDS] :9406887059
+//	[MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_GROUPS] :477623
+//	[MAP-REDUCE_FRAMEWORK_REDUCE_INPUT_RECORDS] :739000
+//	[MAP-REDUCE_FRAMEWORK_REDUCE_OUTPUT_RECORDS] :739000
+
+	
+	@Override
+	public void process(Reader reader) throws DBException
+	{
+		ChukwaRecordKey key = new ChukwaRecordKey();
+		ChukwaRecord record = new ChukwaRecord();
+		try
+		{
+			StringBuilder sb = new StringBuilder();
+			while (reader.next(key, record))
+			{
+				
+				// Build one MySQL "INSERT ... SET col = value, ..." statement per record.
+				sb.append("insert into MRJobCounters set ");
+				for (String field : fields)
+				{
+					sb.append(field).append(" = ").append(record.getValue(field)).append(", ");
+				}
+				sb.append("timestamp = ").append(record.getTime()).append(";\n");
+			} 
+			System.out.println(sb.toString());
+		} 
+		catch (Exception e)
+		{
+			log.error("Unable to insert data into database: "
+					+ e.getMessage());
+			e.printStackTrace();
+		} 
+
+	}
+
+}
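
Note that MRJobCounters.process() above only prints the generated statements to stdout rather than executing them. A minimal sketch of pushing one such statement through plain JDBC is shown here; the JDBC URL is an assumption for illustration and is not part of this patch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch only: execute one generated "insert into MRJobCounters set ..." statement
// over plain JDBC. The connection URL below is a placeholder, not taken from this patch.
public class MRJobCountersSqlSketch {
    public static void executeSql(String sql) throws SQLException {
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/chukwa"); // assumed URL
        try {
            Statement stmt = conn.createStatement();
            try {
                stmt.executeUpdate(sql); // one statement per ChukwaRecord
            } finally {
                stmt.close();
            }
        } finally {
            conn.close();
        }
    }
}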

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java Fri Dec  5 12:30:14 2008
@@ -1,26 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.chukwa.extraction.database;
 
-import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.database.DatabaseConfig;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
+import org.apache.hadoop.chukwa.util.ClusterConfig;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
 
 public class MetricDataLoader {
-	
-	private DatabaseWriter db = null;
-	private DataConfig dc = new DataConfig();
-	
-	public MetricDataLoader() {
-		db = new DatabaseWriter();
-	}
-	public void load(String src) {
-		
-	}
-	public void save() {
+    private static Log log = LogFactory.getLog(MetricDataLoader.class);
+    private static Connection conn = null;
+    private static Statement stmt = null;
+    private ResultSet rs = null;
+    private static DatabaseConfig mdlConfig = null;
+    private static HashMap<String, String> normalize = null;
+    private static HashMap<String, String> transformer = null;
+    private static HashMap<String, Float> conversion = null;
+    private static HashMap<String, String> dbTables = null;
+    private HashMap<String, HashMap<String, Integer>> dbSchema = null;
+    private static String newSpace = "-";
+
+    /** Creates a new instance of MetricDataLoader. */
+    public MetricDataLoader() {        
+        initEnv("");
+    }
+    
+    public MetricDataLoader(String cluster) {
+        initEnv(cluster);    	
+    }
+    
+    private void initEnv(String cluster){
+       mdlConfig = new DatabaseConfig();
+       transformer = mdlConfig.startWith("metric.");
+       conversion = new HashMap<String, Float>();
+       normalize = mdlConfig.startWith("normalize.");
+       dbTables = mdlConfig.startWith("report.db.name.");
+       Iterator<?> entries = mdlConfig.iterator();
+       while(entries.hasNext()) {
+           String entry = entries.next().toString();
+           if(entry.startsWith("conversion.")) {
+               String[] metrics = entry.split("=");
+               try {
+            	   float convertNumber = Float.parseFloat(metrics[1]);
+                   conversion.put(metrics[0],convertNumber);               
+               } catch (NumberFormatException ex) {
+                   log.error(metrics[0]+" is not a number.");
+               }
+           }
+       }
+       String jdbc_url = "";
+       log.debug("cluster name:"+cluster);
+       if(!cluster.equals("")) {
+    	   ClusterConfig cc = new ClusterConfig();
+    	   jdbc_url = cc.getURL(cluster);
+       }
+       try {
+           // The newInstance() call is a work around for some
+           // broken Java implementations
+           String jdbcDriver = System.getenv("JDBC_DRIVER");
+           Class.forName(jdbcDriver).newInstance();
+           log.debug("Initialized JDBC URL: "+jdbc_url);
+       } catch (Exception ex) {
+           // handle the error
+           log.error(ex,ex);
+       }
+       try {
+           conn = DriverManager.getConnection(jdbc_url);
+           HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
+           Iterator<String> ki = dbNames.keySet().iterator();
+           dbSchema = new HashMap<String, HashMap<String,Integer>>();
+    	   DatabaseWriter dbWriter = new DatabaseWriter(cluster);
+           while(ki.hasNext()) {
+        	   String table = dbNames.get(ki.next().toString());
+        	   String query = "select * from "+table+"_template limit 1";
+        	   try {
+	        	   ResultSet rs = dbWriter.query(query);
+	        	   ResultSetMetaData rmeta = rs.getMetaData();
+	        	   HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
+	        	   for(int i=1;i<=rmeta.getColumnCount();i++) {
+	        		   tableSchema.put(rmeta.getColumnName(i),rmeta.getColumnType(i));
+	        	   }
+	               dbSchema.put(table, tableSchema); 
+        	   } catch(SQLException ex) {
+        		   log.debug("table: "+table+" template does not exist, MDL will not load data for this table.");
+        	   }
+           }
+           dbWriter.close();
+       } catch (SQLException ex) {
+           log.error(ex,ex);
+       }       
+    }
+    
+    public void interrupt() {
+    }
+    
+    private String escape(String s, String c) {
+        // Collapse runs of spaces in the trimmed string into the replacement token.
+        String ns = s.trim();
+        Pattern pattern = Pattern.compile(" +");
+        Matcher matcher = pattern.matcher(ns);
+        return matcher.replaceAll(c);
+    }
+    
+    public void process(Path source)  throws IOException, URISyntaxException, SQLException {
+
+		System.out.println("Input file:" + source.getName());
+
+		ChukwaConfiguration conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+		SequenceFile.Reader r = 
+			new SequenceFile.Reader(fs,source, conf);
+
+        stmt = conn.createStatement(); 
+        conn.setAutoCommit(false);
+        
+		ChukwaRecordKey key = new ChukwaRecordKey();
+		ChukwaRecord record = new ChukwaRecord();
+		try
+		{
+			while (r.next(key, record))
+			{
+				boolean isSuccessful=true;
+                String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
+				log.debug("Timestamp: " + record.getTime());
+				log.debug("DataType: " + key.getReduceType());
+				log.debug("StreamName: " + source.getName());
 		
-	}
-	public void process() {
-		// open hdfs files and process them.
-		String dfs = dc.get("chukwa.engine.dsDirectory.rootFolder");
-		load(dfs);
-		save();
-	}
+				String[] fields = record.getFields();
+	            String table = null;
+	            String[] priKeys = null;
+	            HashMap<String, HashMap<String, String>> hashReport = new HashMap<String ,HashMap<String, String>>();
+	            String normKey = new String();
+	            String node = record.getValue("csource");
+	            String recordType = key.getReduceType().toLowerCase();
+	            if(dbTables.containsKey("report.db.name."+recordType)) {
+	            	
+	            	String[] tmp = mdlConfig.findTableName(mdlConfig.get("report.db.name."+recordType), record.getTime(), record.getTime()); 
+	                table = tmp[0];
+	            } else {
+	            	log.debug("report.db.name."+recordType+" does not exist.");
+	            	continue;
+	            }
+	            log.debug("table name:"+table);
+	            try {
+	                priKeys = mdlConfig.get("report.db.primary.key."+recordType).split(",");
+	            } catch (Exception nullException) {
+	            }
+	            for (String field: fields) {
+	            	String keyName = escape(field.toLowerCase(),newSpace);
+	                String keyValue = escape(record.getValue(field).toLowerCase(),newSpace);
+	                if(normalize.containsKey("normalize." + recordType + "." + keyName)) {
+	                	if(normKey.equals("")) {
+	                        normKey = keyName + "." + keyValue;
+	                	} else {
+	                		normKey = normKey + "." + keyName + "." + keyValue;
+	                	}
+	                }
+	                String normalizedKey = "metric." + recordType + "." + normKey;
+	                if(hashReport.containsKey(node)) {
+	                    HashMap<String, String> tmpHash = hashReport.get(node);
+	                    tmpHash.put(normalizedKey, keyValue);
+	                    hashReport.put(node, tmpHash);
+	                } else {
+	                    HashMap<String, String> tmpHash = new HashMap<String, String>();
+	                    tmpHash.put(normalizedKey, keyValue);
+	                    hashReport.put(node, tmpHash);
+	                }
+	            }
+	            for (String field: fields){                
+	                String valueName=escape(field.toLowerCase(),newSpace);
+	                String valueValue=escape(record.getValue(field).toLowerCase(),newSpace);
+	                String normalizedKey = "metric." + recordType + "." + valueName;
+	                if(!normKey.equals("")) {
+	                	normalizedKey = "metric." + recordType + "." + normKey + "." + valueName;
+	                }
+	                if(hashReport.containsKey(node)) {
+	                    HashMap<String, String> tmpHash = hashReport.get(node);
+	                    tmpHash.put(normalizedKey, valueValue);
+	                    hashReport.put(node, tmpHash);
+	                } else {
+	                    HashMap<String, String> tmpHash = new HashMap<String, String>();
+	                    tmpHash.put(normalizedKey, valueValue);
+	                    hashReport.put(node, tmpHash);
+	                    
+	                }
+
+	            }
+	            Iterator<String> i = hashReport.keySet().iterator();
+	            while(i.hasNext()) {
+	                Object iteratorNode = i.next();
+	                HashMap<String, String> recordSet = hashReport.get(iteratorNode);
+	                Iterator<String> fi = recordSet.keySet().iterator();
+	                // Map any primary key that was not included in the report keyName
+	                String sqlPriKeys = "";
+	                try {
+	                    for (String priKey : priKeys) {
+	                        if(priKey.equals("timestamp")) {
+	                            sqlPriKeys = sqlPriKeys + priKey + " = \"" + sqlTime +"\"";
+	                        }
+	                        if(!priKey.equals(priKeys[priKeys.length-1])) {
+	                            sqlPriKeys = sqlPriKeys + ", ";
+	                        }
+	                    }
+	                } catch (Exception nullException) {
+	                    // ignore if primary key is empty
+	                }
+	                // Map the hash objects to database table columns
+	                String sqlValues = "";
+	                boolean firstValue=true;
+	                while(fi.hasNext()) {
+	                    String fieldKey = (String) fi.next();
+	                    if(transformer.containsKey(fieldKey)) {
+		                	if(!firstValue) {
+		                		sqlValues=sqlValues+", ";
+		                	}
+	                    	try {
+	                        	if(dbSchema.get(dbTables.get("report.db.name."+recordType)).get(transformer.get(fieldKey))== java.sql.Types.VARCHAR||
+	                        			dbSchema.get(dbTables.get("report.db.name."+recordType)).get(transformer.get(fieldKey))== java.sql.Types.BLOB) {
+		                        	if(conversion.containsKey("conversion."+fieldKey)) {
+		                                sqlValues = sqlValues + transformer.get(fieldKey) + "=" + recordSet.get(fieldKey) + conversion.get("conversion."+fieldKey).toString();
+		                        	} else {
+			                            sqlValues = sqlValues + transformer.get(fieldKey) + "=\"" + recordSet.get(fieldKey)+"\"";                                                                
+		                        	}
+	                        	} else {
+	                        		double tmp;
+	                        		tmp=Double.parseDouble(recordSet.get(fieldKey).toString());
+	                        		if(conversion.containsKey("conversion."+fieldKey)) {
+	                        			tmp=tmp*Double.parseDouble(conversion.get("conversion."+fieldKey).toString());
+	                        		}
+	                        		if(Double.isNaN(tmp)) {
+	                        			tmp=0;
+	                        		}
+	                        		sqlValues = sqlValues + transformer.get(fieldKey) + "=" + tmp;
+	                        	}
+	    	                    firstValue=false;	
+	                        } catch (NumberFormatException ex) {
+	                        	if(conversion.containsKey("conversion."+fieldKey)) {
+	                                sqlValues = sqlValues + transformer.get(fieldKey) + "=" + recordSet.get(fieldKey) + conversion.get("conversion."+fieldKey).toString();
+	                        	} else {
+		                            sqlValues = sqlValues + transformer.get(fieldKey) + "=\"" + recordSet.get(fieldKey)+"\"";                                                                
+	                        	}
+	    	                    firstValue=false;
+	                        }
+	                    }
+	                }                
+	
+	                String sql = null;
+	                if(sqlPriKeys.length()>0) {
+	                    sql = "INSERT INTO " + table + " SET " + sqlPriKeys + "," + sqlValues + 
+                        " ON DUPLICATE KEY UPDATE " + sqlPriKeys + "," + sqlValues + ";";	                	
+	                } else {
+	                    sql = "INSERT INTO " + table + " SET " + sqlValues + 
+	                          " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
+	                }
+	                log.debug(sql);
+	                stmt.addBatch(sql);
+	                String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
+	                long currentTimeMillis = System.currentTimeMillis();
+	                long latencyMillis = currentTimeMillis - record.getTime();
+	                int latencySeconds = ((int)(latencyMillis + 500)) / 1000;
+	                log.debug(logMsg + " (" + recordType + "," + RecordUtil.getClusterName(record) +
+	                       "," + record.getTime() +
+	                       ") " + latencySeconds + " sec");	               
+	            }
+
+			}
+			@SuppressWarnings("unused")
+			int[] updateCounts = stmt.executeBatch();
+			// Auto-commit is disabled above, so the batch must be committed explicitly.
+			conn.commit();
+		} catch (SQLException ex) {
+			// handle any errors
+			log.error(ex, ex);
+			log.error("SQLException: " + ex.getMessage());
+			log.error("SQLState: " + ex.getSQLState());
+			log.error("VendorError: " + ex.getErrorCode());
+		} catch (Exception e) {
+			e.printStackTrace();
+		} finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException sqlEx) {
+                    // ignore
+                }
+                rs = null;
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException sqlEx) {
+                    // ignore
+                }
+                stmt = null;
+            }
+        }    	
+    }
+    
+	public static void main(String[] args) {
+		try {
+			MetricDataLoader mdl = new MetricDataLoader();
+			mdl.process(new Path(args[0]));
+		} catch(Exception e) {
+			e.printStackTrace();
+		}
+    }
+    
 }
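
For reference, process() above relies on JDBC batching with auto-commit disabled; a standalone sketch of that pattern, including the explicit commit and a rollback on failure, follows. The JDBC URL, table and column names are illustrative only and do not come from this patch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch of the batched, transactional insert pattern used by MetricDataLoader.
// The URL, table and columns are made-up examples.
public class BatchInsertSketch {
    public static void main(String[] args) throws SQLException {
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/chukwa"); // assumed URL
        conn.setAutoCommit(false);      // group the whole batch into one transaction
        Statement stmt = conn.createStatement();
        try {
            stmt.addBatch("INSERT INTO example_metrics SET timestamp=\"2008-12-05 12:30:14\", load_15=0.2"
                        + " ON DUPLICATE KEY UPDATE timestamp=\"2008-12-05 12:30:14\", load_15=0.2");
            stmt.executeBatch();
            conn.commit();              // nothing is persisted until the commit
        } catch (SQLException ex) {
            conn.rollback();            // drop the partial batch on failure
            throw ex;
        } finally {
            stmt.close();
            conn.close();
        }
    }
}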

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java Fri Dec  5 12:30:14 2008
@@ -19,22 +19,27 @@
 package org.apache.hadoop.chukwa.extraction.demux;
 
 
+import org.apache.hadoop.chukwa.extraction.demux.processor.Util;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.Record;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.log4j.Logger;
 
-public class ChukwaRecordOutputFormat extends MultipleSequenceFileOutputFormat<Text, ChukwaRecord>
+public class ChukwaRecordOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord>
 {
 	static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
-
+	
 	@Override
-	protected String generateFileNameForKeyValue(Text key, ChukwaRecord record,
+	protected String generateFileNameForKeyValue(ChukwaRecordKey key, ChukwaRecord record,
 			String name)
 	{
-		if (log.isDebugEnabled())
-			{log.debug("ChukwaOutputFormat.fileName: " +record.getValue(Record.destinationField));}
-		return record.getValue(Record.destinationField);
+		String output = RecordUtil.getClusterName(record)
+							+ "/" + key.getReduceType() 
+							+ "/" + key.getReduceType() + Util.generateTimeOutput(record.getTime());
+
+		if (log.isDebugEnabled())
+			{ log.debug("ChukwaRecordOutputFormat.fileName: [" + output + "]"); }
+
+		return output;
 	}
 }
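
The file name generated above follows a <cluster>/<reduceType>/<reduceType><timeSuffix> layout. A small sketch of that construction is below; the time suffix is stubbed out because the exact format of Util.generateTimeOutput is not visible in this patch.

// Sketch of the output path layout produced by generateFileNameForKeyValue.
// generateTimeOutput below is only a stand-in for Util.generateTimeOutput,
// whose real formatting is not shown in this diff.
public class OutputNameSketch {
    static String generateTimeOutput(long time) {
        return "_" + time;   // placeholder suffix
    }

    static String fileNameFor(String cluster, String reduceType, long time) {
        return cluster + "/" + reduceType + "/" + reduceType + generateTimeOutput(time);
    }

    public static void main(String[] args) {
        // prints something like "clusterA/SysLog/SysLog_1228508414000"
        System.out.println(fileNameFor("clusterA", "SysLog", 1228508414000L));
    }
}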

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java Fri Dec  5 12:30:14 2008
@@ -18,28 +18,30 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
-import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.log4j.Logger;
 
-public class ChukwaRecordPartitioner<Text, ChukwaRecord> implements Partitioner<org.apache.hadoop.io.Text, org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord>
+public class ChukwaRecordPartitioner<K, V> 
+	implements Partitioner<ChukwaRecordKey, ChukwaRecord>
 {
 	static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
 	public void configure(JobConf arg0)
 	{}
 
-	public int getPartition(org.apache.hadoop.io.Text key,
+	public int getPartition(org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey key,
 			org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record, int numReduceTasks)
 	{
-		if (!record.containsField(Record.dataSourceField))
+		if (log.isDebugEnabled())
 		{
-			throw new RuntimeException("PartitionerField not set!");
+
+			log.debug("Partitioner key: [" + key.getReduceType() 
+					+ "] - Reducer:"
+					+ ( (key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks));	
 		}
-		//FIXME remove before production or set to debug level
-		log.info("Partitioner key: [" + record.getValue(Record.dataSourceField) + "] - Reducer:" + ( (record.getValue(Record.dataSourceField).hashCode() & Integer.MAX_VALUE) % numReduceTasks));
-		
-		return (record.getValue(Record.dataSourceField).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+		return (key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
 	}
 
 }
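
The getPartition() implementation above masks the sign bit of hashCode() before taking the modulus, so every reduce type maps to a reducer index in [0, numReduceTasks). A self-contained illustration, with example reduce-type strings chosen arbitrarily:

// Sketch of the reducer-index computation used by ChukwaRecordPartitioner.
// Masking with Integer.MAX_VALUE clears the sign bit, so even a negative
// hashCode() produces a non-negative partition number.
public class PartitionSketch {
    static int partitionFor(String reduceType, int numReduceTasks) {
        return (reduceType.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("SysLog", 8));        // always in 0..7
        System.out.println(partitionFor("HadoopMetrics", 8)); // likewise, regardless of hash sign
    }
}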

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.log4j.Logger;
+
+// TODO do an abstract class for all rolling 
+public class DailyChukwaRecordRolling extends Configured implements Tool
+{
+	static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
+	
+	static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+	static ChukwaConfiguration conf = null;
+	static FileSystem fs = null;
+	static final String HadoopLogDir = "_logs";
+	static final String hadoopTempDir = "_temporary";
+	
+	static boolean rollInSequence = true;
+	static boolean deleteRawdata = false;
+
+	public static void usage()
+	{
+		System.err.println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
+		System.exit(-1);
+	}
+	
+	
+	public static void buildDailyFiles(String chukwaMainRepository, String tempDir,String rollingFolder, int workingDay) throws IOException
+	{
+		// process
+		Path dayPath = new Path(rollingFolder + "/daily/" + workingDay) ;
+		FileStatus[] clustersFS = fs.listStatus(dayPath);
+		for(FileStatus clusterFs : clustersFS)
+		{
+			String cluster = clusterFs.getPath().getName();
+			
+			Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/" + workingDay + "/" + cluster) ;
+			FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
+			for(FileStatus dataSourceFS : dataSourcesFS)
+			{
+				String dataSource = dataSourceFS.getPath().getName();
+				// Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
+				
+				// put the rotate flag
+				fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/rotateDone"));
+				
+				// rotate
+				// Merge
+				String[] mergeArgs = new String[5];
+				// input
+				mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay +  "/*/*.evt";
+				// temp dir
+				mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" + workingDay ;
+				// final output dir
+				mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay  ;
+				// final output fileName
+				mergeArgs[3] =  dataSource +"_" + workingDay ;
+				// delete rolling directory
+				mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster + "/" + dataSource; 
+						
+				
+				log.info("DailyChukwaRecordRolling 0: " +  mergeArgs[0] );
+				log.info("DailyChukwaRecordRolling 1: " +  mergeArgs[1] );
+				log.info("DailyChukwaRecordRolling 2: " +  mergeArgs[2] );
+				log.info("DailyChukwaRecordRolling 3: " +  mergeArgs[3] );
+				log.info("DailyChukwaRecordRolling 4: " +  mergeArgs[4] );
+				
+				RecordMerger merge = new RecordMerger(conf,fs,new DailyChukwaRecordRolling(),mergeArgs,deleteRawdata);
+				List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+				if (rollInSequence)
+				{ merge.run(); }
+				else
+				{ 
+					allMerge.add(merge);
+					merge.start();
+				}
+				
+				// join all Threads
+				if (!rollInSequence)
+				{
+					while(allMerge.size() > 0)
+					{
+						RecordMerger m = allMerge.remove(0);
+						try
+						{ m.join(); } 
+						catch (InterruptedException e) {}
+					}
+				} // End if (!rollInSequence)
+				
+				// Delete the processed dataSourceFS
+				FileUtil.fullyDelete(fs,dataSourceFS.getPath());
+				
+			} // End for(FileStatus dataSourceFS : dataSourcesFS)
+			
+			// Delete the processed clusterFs
+			FileUtil.fullyDelete(fs,clusterFs.getPath());
+			
+		} // End for(FileStatus clusterFs : clustersFS)
+		
+		// Delete the processed dayPath
+		FileUtil.fullyDelete(fs,dayPath);
+	}
+	
+	/**
+	 * @param args
+	 * @throws Exception 
+	 */
+	public static void main(String[] args) throws  Exception
+	{
+		conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		fs = FileSystem.get(new URI(fsName), conf);
+		
+		// TODO read from config
+		String rollingFolder = "/chukwa/rolling/";
+		String chukwaMainRepository = "/chukwa/repos/";
+		String tempDir = "/chukwa/temp/dailyRolling/";
+		
+		
+		// TODO do a real parameter parsing
+		if (args.length != 4)
+		 { usage(); }
+		
+		if (!args[0].equalsIgnoreCase("rollInSequence"))
+		 { usage(); }
+		
+		if (!args[2].equalsIgnoreCase("deleteRawdata"))
+		 { usage(); }
+			
+		if (args[1].equalsIgnoreCase("true"))
+		 { rollInSequence = true; }
+		else
+		 { rollInSequence = false; }
+		
+		if (args[3].equalsIgnoreCase("true"))
+		 { deleteRawdata = true; }
+		else
+		 { deleteRawdata = false; }
+		
+
+		log.info("rollInSequence: " + rollInSequence);
+		log.info("deleteRawdata: " + deleteRawdata);
+		
+		Calendar calendar = Calendar.getInstance();	
+		int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
+		int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+		log.info("CurrentDay: " + currentDay);
+		log.info("currentHour: " + currentHour);
+	
+		Path rootFolder = new Path(rollingFolder + "/daily/") ;
+		
+		FileStatus[] daysFS = fs.listStatus(rootFolder);
+		for(FileStatus dayFS : daysFS)
+		{
+			try
+			{ 
+				int workingDay = Integer.parseInt(dayFS.getPath().getName());
+				if (  workingDay < currentDay)
+				{
+					buildDailyFiles(chukwaMainRepository, tempDir,rollingFolder,workingDay);
+				} // End if (  workingDay < currentDay)
+			} // End try: workingDay = Integer.parseInt(dayFS.getPath().getName());
+			catch(NumberFormatException e)
+			{ /* Not a standard day directory, skip it */ }
+			
+		} // for(FileStatus dayFS : daysFS)		
+	}
+	
+	
+	public int run(String[] args) throws Exception
+	{
+		JobConf conf = new JobConf(getConf(), DailyChukwaRecordRolling.class);
+
+		conf.setJobName("DailyChukwa-Rolling");
+		conf.setInputFormat(SequenceFileInputFormat.class);
+		
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setReducerClass(IdentityReducer.class);
+
+				
+		conf.setOutputKeyClass(ChukwaRecordKey.class);
+		conf.setOutputValueClass(ChukwaRecord.class);
+		conf.setOutputFormat(SequenceFileOutputFormat.class);
+		
+		conf.set("mapred.compress.map.output", "true");
+		conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
+		conf.set("mapred.output.compress", "true");
+		conf.set("mapred.output.compression.type", "BLOCK");
+	
+		
+		log.info("DailyChukwaRecordRolling input: " +  args[0] );
+		log.info("DailyChukwaRecordRolling output: " +  args[1] );
+		
+		
+		FileInputFormat.setInputPaths(conf, args[0]);
+		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+		JobClient.runJob(conf);
+		return 0;
+	}
+	
+}
\ No newline at end of file
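
As a usage note, main() above expects the literal flag names followed by boolean values, and it reads writer.hdfs.filesystem from the Chukwa configuration before scanning /chukwa/rolling/daily. A sketch of a programmatic invocation is below; the flag values are only an example, and a configured HDFS is assumed.

// Example invocation matching the argument parsing in DailyChukwaRecordRolling.main().
// Requires a valid writer.hdfs.filesystem setting; the flag values are illustrative.
public class DailyRollingLauncherSketch {
    public static void main(String[] args) throws Exception {
        org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling.main(
            new String[] { "rollInSequence", "true", "deleteRawdata", "false" });
    }
}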


