hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r723855 [10/23] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/li...
Date: Fri, 05 Dec 2008 20:30:21 GMT
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Iframe.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Iframe.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Iframe.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Iframe.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,49 @@
+package org.apache.hadoop.chukwa.hicc;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Enumeration;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class Iframe  extends HttpServlet {
+	
+    private String id;
+    private String height="100%";
+
+	public void doGet(HttpServletRequest request,
+	           HttpServletResponse response) throws IOException, ServletException {
+		if(request.getParameter("boxId")!=null) {
+			this.id=request.getParameter("boxId");
+		} else {
+			this.id="0";
+		}
+		response.setHeader("boxId", request.getParameter("boxId"));
+	    PrintWriter out = response.getWriter();
+	    StringBuffer source = new StringBuffer();
+	    String requestURL = request.getRequestURL().toString().replaceFirst("iframe/", "");
+	    source.append(requestURL);	    
+	    source.append("?");
+	    Enumeration names = request.getParameterNames();
+	    while(names.hasMoreElements()) {
+	    	String key = (String) names.nextElement();
+	    	String[] values = request.getParameterValues(key);
+	    	for(int i=0;i<values.length;i++) {
+	    	    source.append(key+"="+values[i]+"&");
+	    	}
+	    	if(key.toLowerCase().intern()=="height".intern()) {
+	    		height = request.getParameter(key);
+	    	}
+	    }
+        out.println("<html><body><iframe id=\"iframe"+ this.id +"\" "+
+             "src=\"" + source + "\" width=\"100%\" height=\"" + height + "\" "+
+             "frameborder=\"0\" style=\"overflow: hidden\"></iframe>");
+	}
+   public void doPost(HttpServletRequest request,
+           HttpServletResponse response) throws IOException, ServletException {
+	   doGet(request, response);
+   }
+}
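
For reference, a minimal sketch of the parameter scan above (illustrative only, not part of this commit): the committed code compares the parameter name with key.toLowerCase().intern()=="height".intern(), which relies on String interning to make reference equality work; the conventional form is equalsIgnoreCase. The class and method names below are made up.

    import java.util.Enumeration;
    import javax.servlet.http.HttpServletRequest;

    final class IframeParamSketch {
        static String findHeight(HttpServletRequest request, String defaultHeight) {
            Enumeration<?> names = request.getParameterNames();
            while (names.hasMoreElements()) {
                String key = (String) names.nextElement();
                // equalsIgnoreCase avoids depending on intern() and reference equality
                if ("height".equalsIgnoreCase(key)) {
                    return request.getParameter(key);
                }
            }
            return defaultHeight;
        }
    }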

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/TimeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/TimeHandler.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/TimeHandler.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/TimeHandler.java Fri Dec  5 12:30:14 2008
@@ -44,24 +44,29 @@
     }
     
     public TimeHandler(HttpServletRequest request, String tz) {
-    	this.tz=TimeZone.getTimeZone(tz);
+    	if(tz!=null) {
+    	    this.tz=TimeZone.getTimeZone(tz);
+    	} else {
+        	this.tz=TimeZone.getTimeZone("UTC");    		
+    	}
     	init(request);
     }
     
     public void init(HttpServletRequest request) {
+        Calendar now = Calendar.getInstance();
     	this.session = request.getSession();
     	this.request = request;
-    	String timeType = (String)session.getAttribute("time_type");
-    	if((request.getParameter("period")!=null && !request.getParameter("period").equals("")) || (timeType!=null && timeType.equals("last"))) {
-            String period = request.getParameter("period");
-        	if(period == null) {
-                period = (String) session.getAttribute("period");
-                if(period == null) {
-                    period = "last1hr";
-                    session.setAttribute("period",period);
-                }
-            }
-            Calendar now = Calendar.getInstance();
+    	String timeType = "last";
+    	if(request.getParameter("time_type")==null && session.getAttribute("time_type")==null && session.getAttribute("period")==null && request.getParameter("period")==null) {
+    		timeType = "last";
+    		end = now.getTimeInMillis();
+    		start = end - 60*60*1000;
+    		session.setAttribute("period", "last1hr");
+    		session.setAttribute("time_type", "last");
+            session.setAttribute("start", ""+start);
+            session.setAttribute("end", ""+end);
+    	} else if(request.getParameter("period")!=null && !request.getParameter("period").equals("")) {
+    		String period = request.getParameter("period");
             this.start = now.getTimeInMillis();
             this.end = now.getTimeInMillis();            
         	if(period.equals("last1hr")) {
@@ -80,15 +85,57 @@
                 start = end - (7*24*60*60*1000);
             } else if(period.equals("last30d")) {
                 start = end - (30*24*60*60*1000);
-            }
-        	if(request.getParameter("time_type")!=null && request.getParameter("time_type").equals("range")) {
-                session.setAttribute("start", ""+start);
-                session.setAttribute("end", ""+end);
-        	}
-        } else {
+            }    		
+    	} else if(request.getParameter("start")!=null && request.getParameter("end")!=null) {
+    		start = Long.parseLong(request.getParameter("start"));
+    		end = Long.parseLong(request.getParameter("end"));
+    	} else if(session.getAttribute("time_type").equals("range")) {
             start = Long.parseLong((String) session.getAttribute("start"));
-            end = Long.parseLong((String) session.getAttribute("end"));        	
-        }
+            end = Long.parseLong((String) session.getAttribute("end"));
+    	} else if(session.getAttribute("time_type").equals("last") && session.getAttribute("period")!=null){
+    		String period = (String) session.getAttribute("period");
+            this.start = now.getTimeInMillis();
+            this.end = now.getTimeInMillis();            
+        	if(period.equals("last1hr")) {
+                start = end - (60*60*1000);
+            } else if(period.equals("last2hr")) {
+                start = end - (2*60*60*1000);
+            } else if(period.equals("last3hr")) {
+                start = end - (3*60*60*1000);
+            } else if(period.equals("last6hr")) {
+                start = end - (6*60*60*1000);
+            } else if(period.equals("last12hr")) {
+                start = end - (12*60*60*1000);
+            } else if(period.equals("last24hr")) {
+                start = end - (24*60*60*1000);
+            } else if(period.equals("last7d")) {
+                start = end - (7*24*60*60*1000);
+            } else if(period.equals("last30d")) {
+                start = end - (30L*24*60*60*1000);
+            }    		
+    	}
+//    	if((request.getParameter("period")==null || request.getParameter("period").equals("")) && session.getAttribute("time_type")!=null) {
+//        	timeType = (String)session.getAttribute("time_type");
+//    	}
+//    	if((request.getParameter("period")!=null && !request.getParameter("period").equals("")) || (timeType!=null && timeType.equals("last"))) {
+//            String period = request.getParameter("period");
+//        	if(period == null) {
+//                period = (String) session.getAttribute("period");
+//                if(period == null) {
+//                    period = "last1hr";
+//                    session.setAttribute("period",period);
+//                }
+//            }
+//        	// no time specified in request nor session, set default time in session.
+//        	if(request.getParameter("time_type")!=null && request.getParameter("time_type").equals("range")) {
+//                session.setAttribute("start", ""+start);
+//                session.setAttribute("end", ""+end);
+//        	}
+//        } else {
+//        	// no time specified in request, use session time.
+//            start = Long.parseLong((String) session.getAttribute("start"));
+//            end = Long.parseLong((String) session.getAttribute("end"));        	
+//        }
         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm");
         SimpleDateFormat formatDate = new SimpleDateFormat("yyyy-MM-dd");
         SimpleDateFormat formatHour = new SimpleDateFormat("HH");
@@ -109,6 +156,12 @@
         this.endMin = formatMin.format(end);
     }
 
+    public String getStartDate(String format) {
+    	SimpleDateFormat formatter = new SimpleDateFormat(format);
+    	formatter.setTimeZone(this.tz);
+        return	formatter.format(this.start);
+    }
+    
     public String getStartDate() {
         return this.startDate;        	
     }
@@ -128,7 +181,13 @@
     public long getStartTime() {
         return start;	
     }
-    
+
+    public String getEndDate(String format) {
+    	SimpleDateFormat formatter = new SimpleDateFormat(format);
+    	formatter.setTimeZone(this.tz);
+        return	formatter.format(this.end);
+    }
+
     public String getEndDate() {
     	return this.endDate;
     }
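
A note on the period handling above: the "lastNhr"/"last7d"/"last30d" to window-length mapping now appears twice in init(), and the first copy computes 30*24*60*60*1000 with int arithmetic, which overflows (the second copy uses 30L for exactly this reason). A table-driven sketch using long constants, illustrative only and not part of this commit (class and method names are made up):

    import java.util.HashMap;
    import java.util.Map;

    final class PeriodSketch {
        private static final Map<String, Long> WINDOWS = new HashMap<String, Long>();
        static {
            WINDOWS.put("last1hr",  60L * 60 * 1000);
            WINDOWS.put("last2hr",  2L * 60 * 60 * 1000);
            WINDOWS.put("last3hr",  3L * 60 * 60 * 1000);
            WINDOWS.put("last6hr",  6L * 60 * 60 * 1000);
            WINDOWS.put("last12hr", 12L * 60 * 60 * 1000);
            WINDOWS.put("last24hr", 24L * 60 * 60 * 1000);
            WINDOWS.put("last7d",   7L * 24 * 60 * 60 * 1000);
            WINDOWS.put("last30d",  30L * 24 * 60 * 60 * 1000);
        }

        // Returns the window start for a period ending at 'end',
        // or 'end' itself when the period name is unknown.
        static long startFor(String period, long end) {
            Long window = WINDOWS.get(period);
            return window == null ? end : end - window;
        }
    }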

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Workspace.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Workspace.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Workspace.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/hicc/Workspace.java Fri Dec  5 12:30:14 2008
@@ -188,11 +188,14 @@
             JSONObject updateObject = new JSONObject(original);
             updateObject.put("description",jt.get("description"));
             setContents(path+"/views/"+id+".view",updateObject.toString());
+            if(!rename(id,jt.get("description").toString())) {
+            	throw new Exception("Rename view file failed");
+            }
             File deleteCache = new File(path+"/views/workspace_view_list.cache");
             deleteCache.delete();
             genViewCache(path+"/views");
             out.println("Workspace is stored successfully.");
-        } catch(JSONException e) {
+        } catch(Exception e) {
             out.println("Workspace store failed.");
         }
     }
@@ -344,6 +347,17 @@
         }
         return hash;
     }
+    
+    private boolean rename(String id, String desc) {
+    	try {
+            File view = new File(path+"/views/"+id+".view");
+            File newFile = new File(path+File.separator+"views"+File.separator+desc+".view");
+            view.renameTo(newFile);
+    	} catch(Exception e) {
+    		return false;
+    	}
+    	return true;
+    }
     private JSONObject filterViewsByPermission(String userid, JSONObject viewArray) {
         return viewArray;
     }
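
One caveat on the rename() helper added above: File.renameTo() reports failure through its boolean return value rather than by throwing, so the try/catch will not detect a failed rename and the method returns true regardless. A minimal sketch that checks the return value, illustrative only and not part of this commit:

    import java.io.File;

    final class RenameSketch {
        static boolean rename(String path, String id, String desc) {
            File view = new File(path + File.separator + "views" + File.separator + id + ".view");
            File target = new File(path + File.separator + "views" + File.separator + desc + ".view");
            return view.renameTo(target);  // false means the rename did not happen
        }
    }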

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java Fri Dec  5 12:30:14 2008
@@ -129,9 +129,8 @@
     <p>Do not use the colon ":" character anywhere in the
     <b>DatePattern</b> option. The text before the colon is interpreted
     as the protocol specification of a URL, which is probably not what
-    you want. 
+    you want. */
 
-*/
 
 public class ChukwaDailyRollingFileAppender extends FileAppender {
 

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java Fri Dec  5 12:30:14 2008
@@ -18,22 +18,30 @@
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
 import java.io.*;
-
+import java.util.Enumeration;
+import java.util.logging.LogManager;
+import java.util.Properties;
+import org.apache.hadoop.mapred.TaskLogAppender;
 import org.apache.hadoop.metrics.ContextFactory;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
 import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.apache.log4j.Appender;
 import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
 import org.json.JSONException;
 import org.json.JSONObject;
 
 public class Log4JMetricsContext extends AbstractMetricsContext {
 
   Logger out = null; //Logger.getLogger(Log4JMetricsContext.class);
+  static final Object lock = new Object();
   
   /* Configuration attribute names */
 //  protected static final String FILE_NAME_PROPERTY = "fileName";
   protected static final String PERIOD_PROPERTY = "period";
+  private static final String metricsLogDir = System.getProperty("hadoop.log.dir");
+  private static final String user = System.getProperty("user.name");
 
     
   /** Creates a new instance of FileContext */
@@ -47,7 +55,7 @@
       file = new File(fileName);
     }
     */
-    out = Logger.getLogger("chukwa.hadoop.metrics."+contextName);
+    
     String periodStr = getAttribute(PERIOD_PROPERTY);
     if (periodStr != null) {
       int period = 0;
@@ -64,8 +72,42 @@
   
   @Override
   protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
-      throws IOException
-  {
+      throws IOException {
+	  if (out == null) {
+		  synchronized(lock) {
+			  if (out == null) {
+				  java.util.Properties properties = new java.util.Properties();
+				  properties.load(this.getClass().getClassLoader().getResourceAsStream("chukwa-hadoop-metrics-log4j.properties"));
+				  Logger logger = Logger.getLogger(Log4JMetricsContext.class);
+                  logger.setAdditivity(false);
+				  PatternLayout layout = new PatternLayout(properties.getProperty("log4j.appender.chukwa."+contextName+".layout.ConversionPattern"));
+				      org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender appender =
+				        new org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender();
+				  appender.setName("chukwa."+contextName);
+				  appender.setLayout(layout);
+				  appender.setAppend(false);
+				  if(properties.getProperty("log4j.appender.chukwa."+contextName+".Dir")!=null) {
+					  String logName = properties.getProperty("log4j.appender.chukwa."+contextName+".Dir")+File.separator+"chukwa-"+user+"-"+contextName+".log";
+					  // FIXME: Hack to make the log file readable by chukwa user. 
+					  if(System.getProperty("os.name").intern()=="Linux".intern()) {
+						  Runtime.getRuntime().exec("chmod 640 "+logName);
+					  }
+				      appender.setFile(logName);					  
+				  } else {
+				      appender.setFile(metricsLogDir+File.separator+"chukwa-"+user+"-"+contextName+".log");
+				  }
+				  appender.activateOptions();
+				  appender.setRecordType(properties.getProperty("log4j.appender.chukwa."+contextName+".recordType"));
+				  appender.setChukwaClientHostname(properties.getProperty("log4j.appender.chukwa."+contextName+".chukwaClientHostname"));
+				  appender.setChukwaClientPortNum(Integer.parseInt(properties.getProperty("log4j.appender.chukwa."+contextName+".chukwaClientPortNum")));
+				  appender.setDatePattern(properties.getProperty("log4j.appender.chukwa."+contextName+".DatePattern"));
+				  logger.addAppender(appender);
+				  out = logger;
+			  }
+		  }
+	  }
+	  
+	  
 	JSONObject json = new JSONObject();
     try {
 		json.put("contextName", contextName);
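
A note on the lazy initialization added above: it uses the double-checked locking idiom on the out field. Under the Java memory model this is only guaranteed safe when the lazily initialized field is declared volatile, which is worth checking here. A minimal standalone sketch of the pattern, illustrative only and not part of this commit:

    import org.apache.log4j.Logger;

    final class LazyLoggerSketch {
        private static final Object LOCK = new Object();
        private static volatile Logger out;  // volatile is what makes double-checked locking safe

        static Logger get(String contextName) {
            Logger local = out;
            if (local == null) {
                synchronized (LOCK) {
                    local = out;
                    if (local == null) {
                        local = Logger.getLogger("chukwa.hadoop.metrics." + contextName);
                        out = local;
                    }
                }
            }
            return local;
        }
    }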

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java Fri Dec  5 12:30:14 2008
@@ -256,7 +256,7 @@
             	     log.info(data);
             	     num++;   
 			      }
-		     } 	 
+		     }
 			 Timestamp startTimedb=new Timestamp(startTimeValue);
 			 Timestamp endTimedb=new Timestamp(endTimeValue);
 			 StringBuffer data=new StringBuffer();
@@ -267,7 +267,7 @@
 			 data.append(", TimeQueued=").append(timeQueued);
 			 data.append(", NumOfMachines=").append(num);
 			 data.append(", EndTime=").append(endTimedb);
-    	     //log.info(data);
+    	     log.info(data);
 			 qstatfinished=true;
 			 
 	      } else{
@@ -289,28 +289,19 @@
 	 
 	 private boolean loadTraceJobData(String hodId) throws IOException,SQLException{
 		 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
-		 //String queue=aJobData.get("queue");
 		 String userId=aJobData.get("userId");
 		 String process=aJobData.get("process");
-		 //String numOfMachine=aJobData.get("numOfMachine");
 		 
-		 //StringBuffer traceJobsb=new StringBuffer();
 		 StringBuffer sb=new StringBuffer();
 		 sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
-	   	 //ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand.toString());
 		 String[] traceJobCommand=new String [3];
 		 traceJobCommand[0]="ssh";
 		 traceJobCommand[1]=torqueServer;
 		 traceJobCommand[2]=sb.toString();
 		 
          String command=traceJobCommand[0]+" "+traceJobCommand[1]+" "+traceJobCommand[2];
-		 //System.out.println(command);
 		 ProcessBuilder pb= new ProcessBuilder(traceJobCommand);
          
-         //String testCommand="/home/lyyang/work/chukwa/src/java/org/apache/hadoop/chukwa/ikit/sleeping";
-         //ProcessBuilder pb= new ProcessBuilder(testCommand);
-		 //pb.redirectErrorStream(false);
-
 		 Process p=pb.start();
 		 
 		 Timer timeout=new Timer();
@@ -321,17 +312,6 @@
 		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
 		 errorHandler.start();
 		 String line=null;
-		 /*
-		 BufferedReader error = new BufferedReader (new InputStreamReader(p.getErrorStream()));
-		 String line = null;
-		 boolean start=false;
-         TreeSet<String> jobsInTorque=new TreeSet<String>();
-         String errorLine = null;;
-         while((errorLine=error.readLine())!=null) {
-        	 //discard the error message;
-        	 ;
-         }
-         */
          String exit_status=null;
          String hosts=null;
          long timeQueued=-1;
@@ -370,43 +350,33 @@
 			      Timestamp endTimedb=new Timestamp(endTimeValue*1000);
 			      
 			      exit_status=jobData.get("Exit_status");
-			      //if (process.equals("0")){
-			    	  hosts=jobData.get("exec_host");
-			    	  String [] items2=hosts.split("[+]");
-			    	  int num=0;
-			    	  for (int i=0;i<items2.length;i++) {
-			    		  String machinetemp=items2[i];
-			    		  if (machinetemp.length()>=3){
-	            		 
-			    			  String machine=items2[i].substring(0,items2[i].length()-2);
-			    			  StringBuffer data=new StringBuffer();
-			    			  data.append("HodId=").append(hodId);
-			    			  data.append(", Machine=").append(machine);
-		            	      if(domain!=null) {
-			            	   	 data.append(".").append(domain);
-			            	  }
-			    			  log.info(data.toString());
-			    			  num++;
-			    		  }  
-			    	  }
+			      hosts=jobData.get("exec_host");
+			      String [] items2=hosts.split("[+]");
+			      int num=0;
+			      for (int i=0;i<items2.length;i++) {
+			          String machinetemp=items2[i];
+			          if (machinetemp.length()>=3){	            		 
+			    		 String machine=items2[i].substring(0,items2[i].length()-2);
+			    	     StringBuffer data=new StringBuffer();
+			    	     data.append("HodId=").append(hodId);
+			             data.append(", Machine=").append(machine);
+		            	 if(domain!=null) {
+			            	  data.append(".").append(domain);
+			             }
+			    	     log.info(data.toString());
+			    	     num++;
+			    	  }  
+			      }
 			      
-			    	  StringBuffer data=new StringBuffer();
-			    	  data.append("HodID=").append(hodId);
-			    	  data.append(", UserId=").append(userId);		
-			    	  data.append(", Status=").append(exit_status);
-			    	  data.append(", TimeQueued=").append(timeQueued);
-			    	  data.append(", StartTime=").append(startTimedb);
-			    	  data.append(", EndTime=").append(endTimedb);
-			    	  data.append(", NumOfMachines=").append(num);
-			          log.info(data.toString());
-//			      } else{
-//			    	  StringBuffer data=new StringBuffer();
-//			    	  data.append("HodID=").append(hodId);
-//			    	  data.append(", TimeQueued=").append(timeQueued);
-//			    	  data.append(", EndTime=").append(endTimedb);
-//			    	  data.append(", Status=").append(exit_status);
-//			    	  log.info(data.toString());
-//			      }
+			      StringBuffer data=new StringBuffer();
+			      data.append("HodID=").append(hodId);
+			      data.append(", UserId=").append(userId);		
+			      data.append(", Status=").append(exit_status);
+		    	  data.append(", TimeQueued=").append(timeQueued);
+		    	  data.append(", StartTime=").append(startTimedb);
+		    	  data.append(", EndTime=").append(endTimedb);
+		    	  data.append(", NumOfMachines=").append(num);
+		          log.info(data.toString());
 				  findResult=true;
 		          log.debug(" hod info for job "+hodId+" has been loaded ");
 			 }//if
@@ -431,28 +401,11 @@
             
             log.debug("did not find tracejob info for job "+hodId+", after "+traceCheckCountValue+" times checking");
             if (traceCheckCountValue>=2){ 
-            	tracedone= true;
-            	
-//                StringBuffer deletesb1=new StringBuffer();
-//                deletesb1.append(" Delete from ").append(hodJobTable);
-//                deletesb1.append(" where hodid='").append(hodId).append("'");
-//                String delete1=deletesb1.toString();
-//                
-////                dbWriter.execute(delete1);
-//                
-//                StringBuffer deletesb2=new StringBuffer();
-//                deletesb2.append(" Delete from  ").append(hodMachineTable);
-//                deletesb2.append(" where hodid='").append(hodId).append("'");
-//                String delete2=deletesb2.toString();
-////                dbWriter.execute(delete2);
+            	tracedone= true;            	
             }
         }
-        boolean finished=findResult|tracedone;
-       
-	   
+        boolean finished=findResult|tracedone;	   
         return finished;
-      
-    //  return true;   
 	 }
 	 
 		 
@@ -469,8 +422,6 @@
 		 while (hodIdsIt.hasNext()){
 			 String hodId=(String) hodIdsIt.next();
 			 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
-			 //String queue=aJobData.get("queue");
-			 //String numOfMachine=aJobData.get("numOfMachine");
 			 String status=aJobData.get("status");
 			 String process=aJobData.get("process");
 			 if (process.equals("0") && (status.equals("R") ||status.equals("E"))){
@@ -502,24 +453,22 @@
 		 } //while
 		 
 	 }
-	 
-	 
+	 	 
 	 private void handle_jobData() throws SQLException{		 
-		 try{
+		 try {
 		     getHodJobInfo();
 		 }catch (IOException ex){
 			 log.error("getQueueInfo Error:"+ex.getMessage());
 			 return;
 		 }
-		 try{    
+		 try {    
 	         process_data();
 		 } catch (SQLException ex){
 			 log.error("process_data Error:"+ex.getMessage());
 			 throw ex;
 		 }
 	 }
-     
-	 
+     	 
 	 public void run_forever() throws SQLException{    	            
      	  while(true){
           	  handle_jobData();
@@ -531,11 +480,7 @@
               }
           }
      }
-     
-	 
+
 	 public void shutdown(){
      }
-   	  
-	
-
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Fri Dec  5 12:30:14 2008
@@ -35,19 +35,21 @@
   private long offset;
   private int bytesPerSec;
   private ChunkReceiver dest;
+  private long adaptorID;
   
   private volatile boolean stopping = false;
   public String getCurrentStatus() throws AdaptorException {
     return Integer.toString(bytesPerSec);
   }
 
-  public void start(String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
+  public void start(long adaptor, String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
   {
     try{
       bytesPerSec = Integer.parseInt(status);
     } catch(NumberFormatException e) {
       throw new AdaptorException("bad argument to const rate adaptor: " + status);
     }
+    this.adaptorID = adaptor;
     this.offset = offset;
     this.type = type;
     this.dest = dest;
@@ -55,6 +57,10 @@
     super.start();  //this is a Thread.start
   }
   
+  public String getStreamName() {
+	  return ""+bytesPerSec;
+  }
+  
   public void run()
   {
     Random r = new Random();

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java Fri Dec  5 12:30:14 2008
@@ -27,7 +27,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 
 public class DatabaseWriter {
     private static Log log = LogFactory.getLog(DatabaseWriter.class);
@@ -36,18 +35,19 @@
     private ResultSet rs = null;
 
     public DatabaseWriter(String host, String user, String password) {
-    	DataConfig mdlConfig = new DataConfig();
-    	String jdbc_url = "jdbc:mysql://"+host+"/";
-        if(user!=null) {
+    	String jdbc_url = System.getenv("JDBC_URL_PREFIX")+host+"/";
+    	
+		if(user!=null) {
             jdbc_url = jdbc_url + "?user=" + user;
             if(password!=null) {
                 jdbc_url = jdbc_url + "&password=" + password;
             }
-        }
+		}
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-            Class.forName("com.mysql.jdbc.Driver").newInstance();
+            String jdbcDriver = System.getenv("JDBC_DRIVER");
+            Class.forName(jdbcDriver).newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
@@ -60,19 +60,14 @@
         }
     }
 
-    public DatabaseWriter() {
-    	DataConfig mdlConfig = new DataConfig();
-    	String jdbc_url = "jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
-        if(mdlConfig.get("jdbc.user")!=null) {
-            jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
-            if(mdlConfig.get("jdbc.password")!=null) {
-                jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
-            }
-        }
+    public DatabaseWriter(String cluster) {
+    	ClusterConfig cc = new ClusterConfig();
+    	String jdbc_url = cc.getURL(cluster);
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-            Class.forName("com.mysql.jdbc.Driver").newInstance();
+        	String jdbcDriver = System.getenv("JDBC_DRIVER");
+            Class.forName(jdbcDriver).newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
@@ -84,6 +79,7 @@
             log.error(ex,ex);
         }
     }
+    
     public void execute(String query) {
         try {
             stmt = conn.createStatement(); 
@@ -106,17 +102,18 @@
             }
         }
     }
-    public ResultSet query(String query) {
+    public ResultSet query(String query) throws SQLException {
         try {
             stmt = conn.createStatement(); 
             rs = stmt.executeQuery(query);
         } catch (SQLException ex) {
             // handle any errors
-            log.error(ex, ex);
-            log.error("SQL Statement:" + query);
-            log.error("SQLException: " + ex.getMessage());
-            log.error("SQLState: " + ex.getSQLState());
-            log.error("VendorError: " + ex.getErrorCode());
+            log.debug(ex, ex);
+            log.debug("SQL Statement:" + query);
+            log.debug("SQLException: " + ex.getMessage());
+            log.debug("SQLState: " + ex.getSQLState());
+            log.debug("VendorError: " + ex.getErrorCode());
+            throw ex;
         } finally {
         }
         return rs;
@@ -151,7 +148,7 @@
             conn = null;
         }
     }
-    public String formatTimeStamp(long timestamp) {
+    public static String formatTimeStamp(long timestamp) {
         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         String format = formatter.format(timestamp);
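
The constructors above now take the JDBC driver class and URL prefix from the environment (JDBC_DRIVER, JDBC_URL_PREFIX) instead of hard-coding MySQL. A minimal sketch of that lookup, illustrative only and not part of this commit (the fallback to the old MySQL defaults is an assumption; the committed code has no fallback):

    final class JdbcEnvSketch {
        // Driver class name from the environment, falling back to the previously hard-coded default.
        static String driverClass() {
            String driver = System.getenv("JDBC_DRIVER");
            return driver != null ? driver : "com.mysql.jdbc.Driver";
        }

        // URL prefix from the environment, e.g. "jdbc:mysql://", plus the host.
        static String urlFor(String host) {
            String prefix = System.getenv("JDBC_URL_PREFIX");
            return (prefix != null ? prefix : "jdbc:mysql://") + host + "/";
        }
    }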
 

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpArchive.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,61 @@
+package org.apache.hadoop.chukwa.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class DumpArchive
+{
+
+	/**
+	 * @param args
+	 * @throws URISyntaxException 
+	 * @throws IOException 
+	 */
+	public static void main(String[] args) throws IOException, URISyntaxException
+	{
+		System.out.println("Input file:" + args[0]);
+
+		ChukwaConfiguration conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+		SequenceFile.Reader r = 
+			new SequenceFile.Reader(fs,new Path(args[0]), conf);
+		
+		ChukwaArchiveKey key = new ChukwaArchiveKey();
+		ChunkImpl chunk = ChunkImpl.getBlankChunk();
+		try
+		{
+			while (r.next(key, chunk))
+			{
+				System.out.println("\nTimePartition: " + key.getTimePartition());
+				System.out.println("DataType: " + key.getDataType());
+				System.out.println("StreamName: " + key.getStreamName());
+				System.out.println("SeqId: " + key.getSeqId());
+				System.out.println("\t\t =============== ");
+				
+				System.out.println("Cluster : " + chunk.getTags());
+				System.out.println("DataType : " + chunk.getDataType());
+				System.out.println("Source : " + chunk.getSource());
+				System.out.println("Application : " + chunk.getApplication());
+				System.out.println("SeqID : " + chunk.getSeqID());
+				System.out.println("Data : " + new String(chunk.getData()));
+			}
+		} 
+		catch (Exception e)
+		{
+			e.printStackTrace();
+		} 
+	
+
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpDataType.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,59 @@
+package org.apache.hadoop.chukwa.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class DumpDataType
+{
+
+	/**
+	 * @param args
+	 * @throws URISyntaxException 
+	 * @throws IOException 
+	 */
+	public static void main(String[] args) throws IOException, URISyntaxException
+	{
+		System.err.println("Input file:" + args[0]);
+		System.err.println("DataType:" + args[1]);
+		System.err.println("Source:" + args[2]);
+		
+		ChukwaConfiguration conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+		SequenceFile.Reader r = 
+			new SequenceFile.Reader(fs,new Path(args[0]), conf);
+		
+		ChukwaArchiveKey key = new ChukwaArchiveKey();
+		ChunkImpl chunk = ChunkImpl.getBlankChunk();
+		try
+		{
+			while (r.next(key, chunk))
+			{
+				if (args[1].equalsIgnoreCase(chunk.getDataType()))
+				{
+					if (args[2].equalsIgnoreCase("ALL") || args[2].equalsIgnoreCase(chunk.getSource()))
+					{
+						System.out.print(new String(chunk.getData()));
+					}	
+				}
+				
+			}
+		} 
+		catch (Exception e)
+		{
+			e.printStackTrace();
+		} 
+	
+
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DumpRecord.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,62 @@
+package org.apache.hadoop.chukwa.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class DumpRecord
+{
+
+	/**
+	 * @param args
+	 * @throws URISyntaxException 
+	 * @throws IOException 
+	 */
+	public static void main(String[] args) throws IOException, URISyntaxException
+	{
+		System.out.println("Input file:" + args[0]);
+
+		ChukwaConfiguration conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+		SequenceFile.Reader r = 
+			new SequenceFile.Reader(fs,new Path(args[0]), conf);
+		
+		ChukwaRecordKey key = new ChukwaRecordKey();
+		ChukwaRecord record = new ChukwaRecord();
+		try
+		{
+			while (r.next(key, record))
+			{
+				System.out.println("\t ===== KEY   ===== ");
+				
+				System.out.println("DataType: " + key.getReduceType());
+				System.out.println("\nKey: " + key.getKey());
+				System.out.println("\t ===== Value =====");
+				
+				String[] fields = record.getFields();
+				System.out.println("Timestamp : " + record.getTime());
+				for (String field: fields)
+				{
+					System.out.println("[" +field +"] :" + record.getValue(field));
+				}
+			}
+		} 
+		catch (Exception e)
+		{
+			e.printStackTrace();
+		} 
+	
+
+
+	}
+
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Fri Dec  5 12:30:14 2008
@@ -35,20 +35,26 @@
   private long offset;
   private String type;
   ChunkReceiver dest;
+  private long adaptorID;
   
   public String getCurrentStatus() throws AdaptorException {
     return "";
   }
 
-  public void start(String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
+  public void start(long adaptor, String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
   {
     this.setName("MaxRateSender adaptor");
+    this.adaptorID = adaptor;
     this.offset = offset;
     this.type = type;
     this.dest = dest;
     super.start();  //this is a Thread.start
   }
   
+  public String getStreamName() {
+	  return type;
+  }
+  
   public void run()
   {
     Random r = new Random();

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java Fri Dec  5 12:30:14 2008
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.chukwa.util;
 
-import java.io.IOException;
-import java.io.File;
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.nio.channels.*;
@@ -51,7 +49,10 @@
   	     String pid=items[0];
 	     String chukwaPath=System.getProperty("CHUKWA_HOME");
 	     StringBuffer pidFilesb=new StringBuffer();
-	     String pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
+	     String pidDir = System.getenv("CHUKWA_PID_DIR");
+	     if (pidDir == null) {
+	       pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
+	     }
 	     pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
 	     try{
 	    	 File existsFile = new File(pidDir);
@@ -87,7 +88,11 @@
 	public void clean(){
         String chukwaPath=System.getenv("CHUKWA_HOME");
         StringBuffer pidFilesb=new StringBuffer();
-        pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid"); 
+        String pidDir = System.getenv("CHUKWA_PID_DIR");
+        if (pidDir == null) {
+          pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
+        }
+        pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid"); 
         String pidFileName=pidFilesb.toString();
 
         File pidFile=new File(pidFileName);
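
Both methods above now resolve the pid directory from CHUKWA_PID_DIR and fall back to CHUKWA_HOME/var/run when it is unset. A small helper that factors out the repeated lookup, illustrative only and not part of this commit (names are made up; note the committed code reads CHUKWA_HOME from a system property in one place and from the environment in the other):

    import java.io.File;

    final class PidDirSketch {
        static String pidDir() {
            String pidDir = System.getenv("CHUKWA_PID_DIR");
            if (pidDir == null) {
                pidDir = System.getenv("CHUKWA_HOME") + File.separator + "var" + File.separator + "run";
            }
            return pidDir;
        }

        static File pidFile(String name) {
            return new File(pidDir() + File.separator + name + ".pid");
        }
    }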

Modified: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java Fri Dec  5 12:30:14 2008
@@ -35,4 +35,5 @@
     assertEquals(5, chunk.getRecordOffsets()[1]);
     assertEquals(8, chunk.getRecordOffsets()[2]);
   }
+
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java Fri Dec  5 12:30:14 2008
@@ -20,9 +20,14 @@
 import java.io.*;
 
 import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 
+import java.util.Map;
+import java.util.Iterator;
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 
 public class TestFileTailingAdaptors extends TestCase {
@@ -33,31 +38,52 @@
   }
   
   public void testRawAdaptor() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
+
     ChukwaAgent  agent = new ChukwaAgent();
-    
-    File testFile = makeTestFile("/tmp/chukwaTest");
-    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor" +
+    // Remove any adaptor left over from previous run
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    int portno = cc.getInt("chukwaAgent.control.port", 9093);
+    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+    cli.removeAll();
+    // sleep for some time to make sure we don't get chunk from existing streams
+    Thread.sleep(5000);
+    File testFile = makeTestFile("/tmp/chukwaRawTest",80);
+    long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor" +
         " raw " + testFile + " 0");
-    assertTrue(agent.adaptorCount() == 1);
+    assertTrue(adaptorId != -1);
     Chunk c = chunks.waitForAChunk();
+    while(!c.getDataType().equals("raw")) {
+        c = chunks.waitForAChunk();
+    }
     assertTrue(c.getDataType().equals("raw"));
     assertTrue(c.getRecordOffsets().length == 1);
-    assertTrue(c.getSeqID() == testFile.length());     
+    assertTrue(c.getSeqID() == testFile.length());
+    agent.stopAdaptor(adaptorId, false);
     agent.shutdown();
   }
 
 
   public void testCrSepAdaptor() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
     ChukwaAgent  agent = new ChukwaAgent();
-    File testFile = makeTestFile("/tmp/chukwaTest");
-    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8" +
+    // Remove any adaptor left over from previous run
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    int portno = cc.getInt("chukwaAgent.control.port", 9093);
+    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+    cli.removeAll();
+    // sleep for some time to make sure we don't get chunk from existing streams
+    Thread.sleep(5000);
+    File testFile = makeTestFile("/tmp/chukwaCrSepTest",80);
+    long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8" +
         " lines " + testFile + " 0");
-    assertTrue(agent.adaptorCount() == 1);
+    assertTrue(adaptorId != -1);
     System.out.println("getting a chunk...");
     Chunk c = chunks.waitForAChunk(); 
     System.out.println("got chunk");
+    while(!c.getDataType().equals("lines")) {
+        c = chunks.waitForAChunk();
+    }
     assertTrue(c.getSeqID() == testFile.length());    
-    
+System.out.println(c.getRecordOffsets().length);
     assertTrue(c.getRecordOffsets().length == 80);
     int recStart = 0;
     for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
@@ -67,15 +93,65 @@
       recStart = c.getRecordOffsets()[rec] +1;
     }
     assertTrue(c.getDataType().equals("lines"));    
+    agent.stopAdaptor(adaptorId, false);
+    agent.shutdown();
+  }
+  
+  public void testLogRotate() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
+    ChukwaAgent  agent = new ChukwaAgent();
+    // Remove any adaptor left over from previous run
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    int portno = cc.getInt("chukwaAgent.control.port", 9093);
+    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+    cli.removeAll();
+    // sleep for some time to make sure we don't get chunk from existing streams
+    Thread.sleep(5000);
+    File testFile = makeTestFile("/tmp/chukwaLogRotateTest",80);
+    long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8" +
+        " lines " + testFile + " 0");
+    assertTrue(adaptorId != -1);
+    System.out.println("getting a chunk...");
+    Chunk c = chunks.waitForAChunk(); 
+    System.out.println("got chunk");
+    while(!c.getDataType().equals("lines")) {
+        c = chunks.waitForAChunk();
+    }
+    assertTrue(c.getSeqID() == testFile.length());	  
+    assertTrue(c.getRecordOffsets().length == 80);
+    int recStart = 0;
+    for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
+      String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
+      System.out.println("record "+ rec+ " was: " + record);
+      assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
+      recStart = c.getRecordOffsets()[rec] +1;
+    }
+    assertTrue(c.getDataType().equals("lines"));
+    testFile = makeTestFile("/tmp/chukwaLogRotateTest",40);
+    c = chunks.waitForAChunk(); 
+    System.out.println("got chunk");
+    while(!c.getDataType().equals("lines")) {
+        c = chunks.waitForAChunk();
+    }
+    //assertTrue(c.getSeqID() == testFile.length());	  
+    assertTrue(c.getRecordOffsets().length == 40);
+    recStart = 0;
+    for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
+      String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
+      System.out.println("record "+ rec+ " was: " + record);
+      assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
+      recStart = c.getRecordOffsets()[rec] +1;
+    }
+    assertTrue(c.getDataType().equals("lines"));
+    agent.stopAdaptor(adaptorId, false);
     agent.shutdown();
   }
   
-  private File makeTestFile(String name) throws IOException {
+  private File makeTestFile(String name, int size) throws IOException {
     File tmpOutput = new File(name);
     FileOutputStream fos = new FileOutputStream(tmpOutput);
     
     PrintWriter pw = new PrintWriter(fos);
-    for(int i = 0; i < 80; ++i) {
+    for(int i = 0; i < size; ++i) {
       pw.print(i + " ");
       pw.println("abcdefghijklmnopqrstuvwxyz");
     }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java Fri Dec  5 12:30:14 2008
@@ -22,8 +22,11 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.adaptor.*;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 
 import junit.framework.TestCase;
@@ -38,17 +41,27 @@
   
   public void testStartAtOffset() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
     ChukwaAgent  agent = new ChukwaAgent();
+    // Remove any adaptor left over from previous run
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    int portno = cc.getInt("chukwaAgent.control.port", 9093);
+    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+    cli.removeAll();
+    // sleep for some time to make sure we don't get chunk from existing streams
+    Thread.sleep(5000);
     File testFile = makeTestFile();
-    int startOffset = 50;
-    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 " +
+    int startOffset = 0;  // skip first line
+    long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 " +
          "lines "+ startOffset+ " " + testFile + " " + startOffset);
-    assertTrue(agent.adaptorCount() == 1);
+    assertTrue(adaptorId != -1);
     System.out.println("getting a chunk...");
     Chunk c = chunks.waitForAChunk(); 
     System.out.println("got chunk");
+    while(!c.getDataType().equals("lines")) {
+        c = chunks.waitForAChunk();
+    }
     assertTrue(c.getSeqID() == testFile.length() + startOffset);    
-    
-    assertTrue(c.getRecordOffsets().length == 80);//80 lines in test file
+    System.out.println("RecordOffsets length:"+c.getRecordOffsets().length);
+    assertTrue(c.getRecordOffsets().length == 80); // 80 lines in the file.
     int recStart = 0;
     for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
       String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
@@ -56,20 +69,31 @@
       assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
       recStart = c.getRecordOffsets()[rec] +1;
     }
-    assertTrue(c.getDataType().equals("lines"));    
+    assertTrue(c.getDataType().equals("lines"));
+    agent.stopAdaptor(adaptorId, false);
     agent.shutdown();
   }
   
   public void testStartAfterOffset() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
     ChukwaAgent  agent = new ChukwaAgent();
+    // Remove any adaptor left over from previous run
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    int portno = cc.getInt("chukwaAgent.control.port", 9093);
+    ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+    cli.removeAll();
+    // sleep for some time to make sure we don't get chunk from existing streams
+    Thread.sleep(5000);
     File testFile = makeTestFile();
-    int startOffset = 50;
-    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 " +
+    int startOffset = 0;
+    long adaptorId = agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 " +
          "lines "+ startOffset+ " " + testFile + " " + (startOffset + 29) );
-    assertTrue(agent.adaptorCount() == 1);
+    assertTrue(adaptorId != -1);
     System.out.println("getting a chunk...");
     Chunk c = chunks.waitForAChunk(); 
     System.out.println("got chunk");
+    while(!c.getDataType().equals("lines")) {
+        c = chunks.waitForAChunk();
+    }
     assertTrue(c.getSeqID() == testFile.length() + startOffset);    
     
     assertTrue(c.getRecordOffsets().length == 79);//80 lines in test file, minus the one we skipped
@@ -81,6 +105,7 @@
       recStart = c.getRecordOffsets()[rec] +1;
     }
     assertTrue(c.getDataType().equals("lines"));    
+    agent.stopAdaptor(adaptorId, false);
     agent.shutdown();
   }
   

Modified: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgent.java Fri Dec  5 12:30:14 2008
@@ -41,11 +41,10 @@
       ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
       
       for(int i=1; i < 20; ++i) {
-        cli.add("org.apache.hadoop.chukwa.util.ConstRateAdaptor", "raw" + i, "20000", 0);
-        assertTrue(agent.adaptorCount() == 1);
+        long adaptorId = cli.add("org.apache.hadoop.chukwa.util.ConstRateAdaptor", "raw" + i, "2000" + i, 0);
+        assertTrue(adaptorId != -1);
         Thread.sleep(2000);   
         cli.removeAll();
-        assertTrue(agent.adaptorCount() == 0);
       }
       agent.shutdown();
       conn.shutdown();
@@ -61,20 +60,20 @@
       ChukwaAgent agent = new ChukwaAgent();
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
-      
+      int count = agent.adaptorCount();
       for(int trial=0; trial < 20; ++trial) {
         ArrayList<Long> runningAdaptors = new ArrayList<Long>();
        
         for(int i = 1; i < 7; ++i) {
-          long l = agent.processCommand("add org.apache.hadoop.chukwa.util.ConstRateAdaptor raw"+i+ " 20000 0");
-          assertTrue(agent.adaptorCount() == i); 
+          long l = agent.processCommand("add org.apache.hadoop.chukwa.util.ConstRateAdaptor raw"+i+ " 2000"+i+" 0");
           assertTrue(l != -1);
           runningAdaptors.add(l);
         }
         Thread.sleep(1000);   
         for(Long l: runningAdaptors)
-          agent.stopAdaptor(l, true);
-        assertTrue(agent.adaptorCount() == 0);
+          agent.stopAdaptor(l, false);
+        Thread.sleep(5000);
+        assertTrue(agent.adaptorCount() == count);
       }
       agent.shutdown();
     } catch(Exception e) {
@@ -82,5 +81,9 @@
       fail(e.toString());
     }
   }
+  
+  public void testLogRotate() {
+
+  }
 
 }

Modified: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java Fri Dec  5 12:30:14 2008
@@ -37,4 +37,10 @@
   public void shutdown() {
   }
 
+  @Override
+  public void reloadConfiguration()
+  {
+  	System.out.println("reloadConfiguration");
+  }
+
 }

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestClientAck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestClientAck.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestClientAck.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestClientAck.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,24 @@
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.writer.ClientAck;
+
+public class TestClientAck extends TestCase
+{
+
+  public void testWait4AckTimeOut()
+  {
+    ClientAck clientAck = new ClientAck();
+    long startDate = System.currentTimeMillis();
+    clientAck.wait4Ack();
+    long now = System.currentTimeMillis();
+    long duration = now - startDate ;
+    duration = duration - clientAck.getTimeOut();
+    
+    Assert.assertTrue("should not wait nore than " 
+        + clientAck.getTimeOut() + " + 7sec" , duration < 7000);
+    Assert.assertEquals(ClientAck.KO_LOCK, clientAck.getStatus());
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChukwaTestOutputCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChukwaTestOutputCollector.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChukwaTestOutputCollector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChukwaTestOutputCollector.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,36 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.OutputCollector;
+
+public class ChukwaTestOutputCollector<K,V> implements OutputCollector<K,V>
+{
+	public HashMap<K, V> data = new HashMap<K, V>();
+	
+	public void collect(K key, V value) throws IOException
+	{
+		data.put(key, value);
+	}
+
+	@Override
+	public String toString()
+	{
+		Iterator<K> it = data.keySet().iterator();
+		K key = null;
+		V value = null;
+		StringBuilder sb = new StringBuilder();
+		
+		while(it.hasNext())
+		{
+			key = it.next();
+			value = data.get(key);
+			sb.append("Key[").append(key).append("] value[").append(value).append("]\n");
+		}
+		return sb.toString();
+	}
+
+	
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestAbtractProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestAbtractProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestAbtractProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestAbtractProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,60 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.RecordConstants;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class TestAbtractProcessor extends TestCase
+{
+
+	String[] data = {"dsjsjbsfjds\ndsafsfasd\n","asdgHSAJGDGYDGGHAgd7364rt3478tc4\nhr473rt346t\n","e	gqd	yeegyxuyexfg\n"};
+	
+	public void testParse()
+	{
+		
+
+		ChunkBuilder cb = new ChunkBuilder();
+		cb.addRecord(RecordConstants.escapeAllButLastRecordSeparator("\n", data[0]).getBytes());
+		cb.addRecord(RecordConstants.escapeAllButLastRecordSeparator("\n", data[1]).getBytes());
+		cb.addRecord(RecordConstants.escapeAllButLastRecordSeparator("\n", data[2]).getBytes());
+		Chunk chunk = cb.getChunk();	
+		OutputCollector<ChukwaRecordKey, ChukwaRecord> output = new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+		TProcessor p = new TProcessor();
+		p.data = data;
+		p.process(null,chunk, output, null);
+	}
+
+
+}
+
+
+class TProcessor extends AbstractProcessor
+{
+	String[] data = null;
+	int count = 0;
+	
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+	{
+		if (!recordEntry.equals(data[count]))
+		{
+			System.out.println("[" + recordEntry +"]");
+			System.out.println("[" + data[count] +"]");
+			throw new RuntimeException("not the same record");
+		}
+		count ++;
+	}
+
+	public String getDataType()
+	{
+		// TODO Auto-generated method stub
+		return null;
+	}
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestHadoopLogProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestHadoopLogProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestHadoopLogProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestHadoopLogProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/*
+ * Test code for verifying that the log processors work properly.
+ * 
+ * Currently more or less just a stub
+ */
+public class TestHadoopLogProcessor extends TestCase{
+  
+  long serializedSize = 0;
+  OutputCollector<ChukwaRecordKey, ChukwaRecord> nullcollector = new OutputCollector<ChukwaRecordKey, ChukwaRecord>() {
+    public void collect(ChukwaRecordKey arg0, ChukwaRecord arg1) throws IOException
+    {
+      serializedSize += arg1.toString().length();
+    }
+  };
+
+  
+  
+  public void testHLPParseTimes() {
+    HadoopLogProcessor hlp = new HadoopLogProcessor();
+  
+    int LINES = 50000;
+    long bytes = 0;
+    long ts_start = System.currentTimeMillis();
+    for(int i =0; i < LINES; ++i) {
+      Chunk c = getNewChunk();
+      bytes += c.getData().length;
+      hlp.process(null,c, nullcollector, Reporter.NULL);
+ //     hlp.parse(line, nullcollector, Reporter.NULL);
+    }
+    long time = (System.currentTimeMillis() - ts_start);
+    System.out.println("parse took " + time + " milliseconds");
+    System.out.println("aka " + time * 1.0 / LINES + " ms per line or " + 
+        time *1000.0 / bytes  + " ms per kilobyte of log data");
+    System.out.println("output records had total length of " + serializedSize);
+  }
+  
+
+  java.util.Random r = new java.util.Random();
+  public Chunk getNewChunk() {
+    int ms = r.nextInt(1000);
+    String line = "2008-05-29 10:42:22,"+ ms + " INFO org.apache.hadoop.dfs.DataNode: Some text goes here" +r.nextInt() + "\n";
+    ChunkImpl c = new ChunkImpl("HadoopLogProcessor", "test" ,line.length() -1 ,line.getBytes() , null );
+       
+    return c;
+  }
+  
+  /*
+  public File writeATestFile(int lines) throws IOException {
+    File output = new File("/tmp/hlpparsetest");
+    FileOutputStream fos = new FileOutputStream(output);
+    PrintStream out = new PrintStream(fos);
+    
+    for(int i =0; i < lines; ++i) {
+      out.println("2008-05-29 10:42:22,724 INFO org.apache.hadoop.dfs.DataNode: Some text goes here" +i);
+  
+    }
+    out.close();
+    return output;
+  }*/
+}
+

Modified: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/inputtools/log4j/TestChukwaAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/inputtools/log4j/TestChukwaAppender.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/inputtools/log4j/TestChukwaAppender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/inputtools/log4j/TestChukwaAppender.java Fri Dec  5 12:30:14 2008
@@ -29,8 +29,7 @@
 
 public class TestChukwaAppender extends TestCase {
   
-  @SuppressWarnings("deprecation")
-public void testChukwaAppender() {
+  public void testChukwaAppender() {
     try {
     
     ChukwaAgent agent = new ChukwaAgent();

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/ChukwaAgentToCollectorValidator.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,269 @@
+package org.apache.hadoop.chukwa.validationframework;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
+import org.apache.hadoop.chukwa.datacollection.collector.CollectorStub;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.validationframework.interceptor.ChunkDumper;
+import org.apache.hadoop.chukwa.validationframework.interceptor.SetupTestClasses;
+import org.apache.hadoop.chukwa.validationframework.util.DataOperations;
+
+public class ChukwaAgentToCollectorValidator
+{
+	public static final int ADD = 100;
+	public static final int VALIDATE = 200;
+	
+	
+	private static void usage()
+	{
+		System.out.println("Usage: ChukwaAgentToCollectorValidator [-add | -validate] <fileName>");
+		System.exit(-1);
+	}
+	/**
+	 * @param args
+	 * @throws Throwable 
+	 * @throws AlreadyRunningException
+	 * @throws IOException 
+	 */
+	public static void main(String[] args) throws Throwable
+	{
+		if (args.length != 2)
+		{
+			usage();
+		}
+		
+		
+		int command = -1;
+		
+		if ("-add".equalsIgnoreCase(args[0]))
+		{
+			command = ChukwaAgentToCollectorValidator.ADD;
+		}
+		else if ("-validate".equalsIgnoreCase(args[0]))
+		{
+			command = ChukwaAgentToCollectorValidator.VALIDATE;
+		}
+		else
+		{
+			usage();
+		}
+		
+		String chukwaTestRepository = System.getenv("chukwaTestRepository");
+		if (chukwaTestRepository == null)
+		{
+		  chukwaTestRepository = "/tmp/chukwaTestRepository/";
+		}
+		
+		if (!chukwaTestRepository.endsWith("/"))
+		{
+		  chukwaTestRepository += "/";
+		}
+		
+		String fileName = args[1];
+		
+		String name = null;
+		if (fileName.indexOf("/") >= 0)
+		{
+		  name = fileName.substring(fileName.lastIndexOf("/") + 1);
+		}
+		else
+		{
+		  name = fileName;
+		}
+		  
+		String chukwaTestDirectory =  chukwaTestRepository + name ;
+		String inputFile = chukwaTestDirectory + "/input/" + name;
+		String outputDir = null;
+		
+		if (command == ChukwaAgentToCollectorValidator.ADD)
+		{
+			File dir = new File(chukwaTestDirectory + "/input/");
+			if (dir.exists())
+			{
+				throw new RuntimeException("a test with the same input file is already there, remove it first");
+			}
+			dir.mkdirs();
+			DataOperations.copyFile(fileName, inputFile);
+			outputDir = "/gold";
+		}
+		else
+		{
+		  outputDir = "/" + System.currentTimeMillis();
+		}
+		
+		System.out.println("chukwaTestDirectory [" + chukwaTestDirectory + "]");
+		System.out.println("command [" + ( (command == ChukwaAgentToCollectorValidator.ADD)?"ADD":"VALIDATE") + "]");
+		System.out.println("fileName [" + inputFile + "]");
+		
+
+		 ChukwaConfiguration conf = new ChukwaConfiguration(true);
+	   String collectorOutputDir = conf.get("chukwaCollector.outputDir");
+		
+	   
+		prepareAndSendData(chukwaTestDirectory+ outputDir,inputFile,collectorOutputDir);
+		extractRawLog(chukwaTestDirectory+ outputDir,name,collectorOutputDir);
+		boolean rawLogTestResult = validateRawLogs(chukwaTestDirectory+outputDir,name) ;
+		
+		
+		boolean binLogTestResult = true;
+		
+	  if (command == ChukwaAgentToCollectorValidator.VALIDATE)
+	  {
+	    binLogTestResult = validateOutputs(chukwaTestDirectory+outputDir,name);
+	  }
+		
+		
+		if (rawLogTestResult == true && binLogTestResult == true)
+		{
+		  System.out.println("test OK");
+		  System.exit(10);
+		}
+		else
+		{
+		  System.out.println("test KO");
+		  throw new RuntimeException("test failed for file [" + name +"]" );
+		}
+	}
+
+	public static void prepareAndSendData(String dataRootFolder,String inputFile,String dataSinkDirectory) throws Throwable
+	{
+
+	  ChunkDumper.testRepositoryDumpDir = dataRootFolder + "/";
+
+	  SetupTestClasses.setupClasses();
+
+	  // clean up the collector outputDir.
+	  File collectorDir = new File(dataSinkDirectory);
+	  String[] files = collectorDir.list();
+	  for(String f: files)
+	  {
+	    File file = new File(dataSinkDirectory+ File.separator +f);
+	    file.delete();
+	    System.out.println("Deleting previous collector file: " + f);
+	  }
+	  
+	  System.out.println("Starting agent");
+    String[] agentArgs = new String[0];
+    ChukwaAgent.main(agentArgs);
+    
+    // Start the collector
+    System.out.println("Starting collector");
+    CollectorStub.main(new String[0]);
+    
+    // Get a reference to the agent started above
+    ChukwaAgent agent = ChukwaAgent.getAgent();
+    
+     
+    int portno = 9093; // Default
+    ChukwaAgentController cli = new ChukwaAgentController("localhost",portno);
+    // ADD 
+    // org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped 
+    // SysLog 
+    // 0 /var/log/messages 
+    // 0
+    System.out.println("Adding adaptor");
+    long adaptor = cli.add("org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
+        "AutomatedTestType", "0 " + inputFile, 0);
+    
+    cli.remove(adaptor);
+    System.out.println("Adaptor removed");
+    agent.shutdown();
+    System.out.println("Shutting down agent");
+    CollectorStub.jettyServer.stop();
+    System.out.println("Shutting down collector");
+    Thread.sleep(2000);
+	}
+	
+	public static void extractRawLog(String dataRootFolder,String fileName,String dataSinkDirectory) throws Exception
+	{
+	  // Adaptor output
+	  DataOperations.extractRawLogFromDump(dataRootFolder + "/adaptor/", fileName);
+	  // Sender output
+	  DataOperations.extractRawLogFromDump(dataRootFolder + "/sender/", fileName);
+	  
+	  // Collector output
+	  File dir = new File(dataRootFolder  + "/collector/");
+	  dir.mkdirs();
+	  
+	  File dataSinkDir = new File(dataSinkDirectory);
+	  String[] doneFiles = dataSinkDir.list();
+    // Move done file to the final directory
+    for(String f: doneFiles)
+    {
+      String outputFile = null;
+      if (f.endsWith(".done"))
+      {
+        outputFile = fileName +".done";
+      }
+      else
+      {
+        outputFile = fileName + ".crc";
+      }
+      System.out.println("Moving that file ["  + dataSinkDirectory+ File.separator +f + "] to ["  +dataRootFolder  + "/collector/" + outputFile +"]");
+     DataOperations.copyFile(dataSinkDirectory+ File.separator +f, dataRootFolder  + "/collector/" + outputFile);
+     }
+  
+    DataOperations.extractRawLogFromdataSink(ChunkDumper.testRepositoryDumpDir  + "/collector/",fileName);
+	}
+	
+	
+	public static boolean validateRawLogs(String dataRootFolder,String fileName) 
+	{
+	  boolean result = true;
+	  // Validate Adaptor
+	  boolean adaptorMD5 = DataOperations.validateMD5(dataRootFolder + "/../input/" + fileName, dataRootFolder + "/adaptor/" + fileName + ".raw");
+	  if (!adaptorMD5)
+	  {
+	    System.out.println("Adaptor validation failed");
+	    result = false;
+	  }
+	  // Validate Sender
+	  boolean senderMD5 = DataOperations.validateMD5(dataRootFolder + "/../input/" + fileName, dataRootFolder + "/sender/" + fileName + ".raw");
+	  if (!senderMD5)
+	  {
+	    System.out.println("Sender validation failed");
+	    result = false;
+	  }
+	  // Validate DataSink
+	  boolean collectorMD5 = DataOperations.validateMD5(dataRootFolder + "/../input/" + fileName , dataRootFolder + "/collector/" + fileName + ".raw");
+	  if (!collectorMD5)
+	  {
+	    System.out.println("collector validation failed");
+	    result = false;
+	  }
+	
+	  return result;
+	}
+
+	public static boolean validateOutputs(String dataRootFolder,String fileName)
+	{
+	   boolean result = true;
+	    // Validate Adaptor
+	    boolean adaptorMD5 = DataOperations.validateMD5(dataRootFolder + "/../gold/adaptor/" + fileName + ".bin", dataRootFolder + "/adaptor/" + fileName + ".bin");
+	    if (!adaptorMD5)
+	    {
+	      System.out.println("Adaptor bin validation failed");
+	      result = false;
+	    }
+	    // Validate Sender
+	    boolean senderMD5 = DataOperations.validateMD5(dataRootFolder + "/../gold/sender/" + fileName+ ".bin", dataRootFolder + "/sender/" + fileName + ".bin");
+	    if (!senderMD5)
+	    {
+	      System.out.println("Sender bin validation failed");
+	      result = false;
+	    }
+	    // Validate DataSink
+//	    boolean collectorMD5 = DataOperations.validateRawLog(dataRootFolder + "/../gold/collector/" + fileName + ".done", dataRootFolder + "/collector/" + fileName + ".done");
+//	    if (!collectorMD5)
+//	    {
+//	      System.out.println("collector bin validation failed");
+//	      result = false;
+//	    }
+	  
+	    return result;
+	}
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/DemuxDirectoryValidator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/DemuxDirectoryValidator.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/DemuxDirectoryValidator.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/DemuxDirectoryValidator.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.validationframework;
+
+import java.io.File;
+import java.net.URI;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.validationframework.util.DataOperations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class DemuxDirectoryValidator
+{
+
+  static Configuration conf = null;
+  static FileSystem fs = null;
+  
+  public static void usage()
+  {
+    System.out.println("Usage ...");
+    System.exit(-1);
+  }
+  
+  public static void validate(boolean isLocal,FileSystem fs,Configuration conf,String[] directories)
+  {
+    DemuxDirectoryValidator.fs = fs;
+    DemuxDirectoryValidator.conf = conf;
+    try
+    {
+      if (isLocal)
+      {
+        compareLocalDirectory(directories[0],directories[1]);
+      }
+      else
+      {
+        DemuxDirectoryValidator.fs = fs;
+        compareHDFSDirectory(directories[0],directories[1]);
+      }
+    }
+    catch(Exception e)
+    {
+      e.printStackTrace();
+      throw new RuntimeException ("Validation failed! [" 
+          + directories[0] +"][" + directories[1] + "]" ,e);
+    }
+  }
+  
+  /**
+   * @param args
+   */
+  public static void main(String[] args)
+  {
+
+    if (args.length != 3)
+    {
+      usage();
+    }
+    
+    String demuxGoldDirectory = args[1];
+    String demuxTestDirectory = args[2];
+    boolean isLocal = true;
+    
+    if ("-local".equalsIgnoreCase(args[0]))
+    {
+      compareLocalDirectory(demuxGoldDirectory,demuxTestDirectory);
+    }
+    else if ("-hdfs".equalsIgnoreCase(args[0]))
+    {
+      isLocal = false;
+      conf = new ChukwaConfiguration();
+      String fsName = conf.get("writer.hdfs.filesystem");
+      try
+      {
+        fs = FileSystem.get(new URI(fsName), conf);
+      } catch (Exception e)
+      {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      } 
+    }
+    else
+    {
+      System.out.println("Wrong first argument");
+      usage();
+    }
+    
+    String[] dirs = { demuxGoldDirectory,demuxTestDirectory};
+    validate(isLocal,fs,conf,dirs);
+    
+    System.out.println("Gold and test directories are equivalent");
+    System.exit(10);
+  }
+  
+  public static void compareHDFSDirectory(String gold,String test)
+  {
+    try
+    {
+      Path goldDirectory = new Path(gold);
+      FileStatus[] goldFiles = fs.listStatus(goldDirectory);
+
+      //      Path testDirectory = new Path(test);
+//      FileStatus[] testFiles = fs.listStatus(testDirectory);
+//      
+      
+      for(int i=0;i<goldFiles.length;i++)
+      {
+        
+        //Skip the crc files
+        if (goldFiles[i].getPath().getName().endsWith(".crc")  )
+        { continue; }
+        
+        System.out.println("Testing [" + goldFiles[i].getPath().getName().intern() + "]" );
+        
+//        if (goldFiles[i].getPath().getName().intern() != testFiles[i].getPath().getName().intern())
+//        {
+//          throw new RuntimeException("Gold & test dirrectories [" + gold +"/" +goldFiles[i].getPath().getName() +"] are not the same");
+//        }
+    
+        if (goldFiles[i].isDir())
+        {
+          //Skip the _logs directory
+          if (goldFiles[i].getPath().getName().equalsIgnoreCase("_logs"))
+          { continue; }
+          
+          compareHDFSDirectory(gold +"/" +goldFiles[i].getPath().getName(),test +"/" +goldFiles[i].getPath().getName());
+        }
+        else
+        {
+          boolean isTheSame = DataOperations.validateChukwaRecords(fs,conf,goldFiles[i].getPath(), new Path(test + "/" +goldFiles[i].getPath().getName()));
+          if (!isTheSame)
+          {
+//            System.out.println("MD5 failed on [" + gold +"/" +goldFiles[i] +"]");
+            throw new RuntimeException("ChukwaRecords validation error: for Gold & test [" + gold +"/" +goldFiles[i].getPath().getName()+"] [" 
+                + test +"/" +goldFiles[i].getPath().getName()+ "] are not the same");
+          }
+        }
+      }
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  public static void compareLocalDirectory(String gold,String test)
+  {
+    File goldDirectory = new File(gold);
+    String[] goldFiles = goldDirectory.list();
+    File testDirectory = new File(test);
+    String[] testFiles = testDirectory.list();
+    
+    for(int i=0;i<goldFiles.length;i++)
+    {
+      if (!goldFiles[i].equals(testFiles[i]))
+      {
+        throw new RuntimeException("Gold & test dirrectories [" + gold +"/" +goldFiles[i] +"] are not the same");
+      }
+      File g = new File(gold +"/" +goldFiles[i]);
+      if (g.isDirectory())
+      {
+        //Skip the _logs directory
+        if (goldFiles[i].equalsIgnoreCase("_logs"))
+        { continue; }
+       
+        compareLocalDirectory(gold +"/" +goldFiles[i],test +"/" +goldFiles[i]);
+      }
+      else
+      {
+        boolean md5 = DataOperations.validateMD5(gold +"/" +goldFiles[i], test +"/" +goldFiles[i]);
+        if (!md5)
+        {
+//          System.out.println("MD5 failed on [" + gold +"/" +goldFiles[i] +"]");
+          throw new RuntimeException("MD5 for Gold & test [" + gold +"/" +goldFiles[i]+"] are not the same");
+        }
+      }
+    }
+    
+    
+  }
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/TestDemux.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/TestDemux.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/TestDemux.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/validationframework/TestDemux.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,114 @@
+package org.apache.hadoop.chukwa.validationframework;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.util.ToolRunner;
+
+public class TestDemux extends TestCase
+{
+
+  int NUM_HADOOP_SLAVES = 2;
+  private String DEMUX_INPUT_PATH = "demuxHdfsData/input";
+  private String DEMUX_GOLD_PATH = "demuxHdfsData/gold";
+  private String DEMUX_OUTPUT_PATH = "demuxHdfsData/output_";
+  
+  Configuration conf = null;
+  FileSystem fileSys = null;
+  MiniDFSCluster dfs = null;
+  MiniMRCluster mr = null;
+  
+  protected void setUp() throws Exception
+  {
+   
+    
+    conf = new Configuration();
+    System.setProperty("hadoop.log.dir", "/tmp/");
+    dfs = new MiniDFSCluster(conf,NUM_HADOOP_SLAVES,true,null,null);
+    dfs.waitClusterUp();
+    
+    fileSys = dfs.getFileSystem();
+    mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
+   
+    unpackStorage();
+  }
+  
+    void unpackStorage() throws IOException 
+    {
+      String tarFile = System.getProperty("test.demux.data") +
+                       "/demuxData.tgz";
+      String dataDir = System.getProperty("test.build.data");
+      File dfsDir = new File(dataDir, "chukwa");
+      if ( dfsDir.exists() && !FileUtil.fullyDelete(dfsDir) ) {
+        throw new IOException("Could not delete dfs directory '" + dfsDir + "'");
+      }
+      
+      FileUtil.unTar(new File(tarFile), new File(dataDir));
+
+      // Copy to HDFS
+      FileUtil.copy(new File(System.getProperty("test.build.data") +"/demuxData/"),
+          fileSys, new Path("demuxHdfsData"), false, conf);
+      
+      // Set input/output directories
+      DEMUX_OUTPUT_PATH = DEMUX_OUTPUT_PATH + System.currentTimeMillis();
+
+      System.out.println("DEMUX_INPUT_PATH: " +DEMUX_INPUT_PATH);
+      System.out.println("DEMUX_OUTPUT_PATH: " +DEMUX_OUTPUT_PATH);
+//      
+//      FileStatus[] testFiles = fileSys.listStatus(new Path("demuxHdfsData"));
+//      for (FileStatus f: testFiles)
+//      {
+//        System.out.println(f.getPath().toString());
+//      }
+     
+
+  }
+  
+  public void testDemux() {
+    try
+    {
+      
+      String[] sortArgs = {DEMUX_INPUT_PATH.toString(), DEMUX_OUTPUT_PATH.toString()};
+      int res = ToolRunner.run(mr.createJobConf(), new Demux(), sortArgs);
+      Assert.assertEquals(0, res);
+      
+      String[] directories = new String[2];
+      directories[0] = DEMUX_GOLD_PATH;
+      directories[1] = DEMUX_OUTPUT_PATH;
+      
+      DemuxDirectoryValidator.validate(false, fileSys, conf,directories);
+      
+    }
+    catch(Exception e)
+    {
+      e.printStackTrace();
+      Assert.fail("Exception in TestDemux: " + e);
+    }
+  }
+  protected void tearDown() throws Exception
+  {
+    if (dfs != null)
+    {
+      try { dfs.shutdown(); }
+      catch(Exception e) 
+      { /* do nothing since we're not testing the MiniDFSCluster */ }  
+    }
+    if (mr != null)
+    {
+      try { mr.shutdown(); }
+      catch(Exception e) 
+      { /* do nothing since we're not testing the MiniMRCluster */ }  
+    }
+  }
+
+}


