chukwa-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject [6/8] chukwa git commit: CHUKWA-771. Improved code quality issue identified by findbugs. (Eric Yang)
Date Sun, 26 Jul 2015 02:08:59 GMT
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
index ae9233c..5538a40 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
@@ -27,6 +27,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.concurrent.Callable;
@@ -62,10 +63,15 @@ public class MetricDataLoader implements Callable {
   private Connection conn = null;
   private Path source = null;
 
-  private static ChukwaConfiguration conf = null;
-  private static FileSystem fs = null;
+  private ChukwaConfiguration conf = null;
+  private FileSystem fs = null;
   private String jdbc_url = "";
 
+  public MetricDataLoader(String fileName) throws IOException {
+    conf = new ChukwaConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
   /** Creates a new instance of DBWriter */
   public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
     source = new Path(fileName);
@@ -171,6 +177,9 @@ public class MetricDataLoader implements Callable {
     return( sb.toString()); 
   }
   
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+      "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE", 
+      justification = "Dynamic based upon tables in the database")
   public boolean run() throws IOException {
     boolean first=true;
     log.info("StreamName: " + source.getName());
@@ -195,7 +204,7 @@ public class MetricDataLoader implements Callable {
     try {
       Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
       int batch = 0;
-      while (reader.next(key, record)) {
+      while (reader !=null && reader.next(key, record)) {
     	numOfRecords++;
         if(first) { 
           try {
@@ -336,12 +345,9 @@ public class MetricDataLoader implements Callable {
           }
 
         }
-        Iterator<String> i = hashReport.keySet().iterator();
-        while (i.hasNext()) {
-          Object iteratorNode = i.next();
-          HashMap<String, String> recordSet = hashReport.get(iteratorNode);
-          Iterator<String> fi = recordSet.keySet().iterator();
-          // Map any primary key that was not included in the report keyName
+        for(Entry<String, HashMap<String, String>> entry : hashReport.entrySet()) {
+          HashMap<String, String> recordSet = entry.getValue();
+       // Map any primary key that was not included in the report keyName
           StringBuilder sqlPriKeys = new StringBuilder();
           try {
             for (String priKey : priKeys) {
@@ -363,8 +369,9 @@ public class MetricDataLoader implements Callable {
           // Map the hash objects to database table columns
           StringBuilder sqlValues = new StringBuilder();
           boolean firstValue = true;
-          while (fi.hasNext()) {
-            String fieldKey = fi.next();
+          for(Entry<String, String> fi : recordSet.entrySet()) {
+            String fieldKey = fi.getKey();
+            String fieldValue = fi.getValue();
             if (transformer.containsKey(fieldKey) && transformer.get(fieldKey).intern()!="_delete".intern()) {
               if (!firstValue) {
                 sqlValues.append(", ");
@@ -378,12 +385,12 @@ public class MetricDataLoader implements Callable {
                   if (conversion.containsKey(conversionKey)) {
                     sqlValues.append(transformer.get(fieldKey));
                     sqlValues.append("=");
-                    sqlValues.append(recordSet.get(fieldKey));
+                    sqlValues.append(fieldValue);
                     sqlValues.append(conversion.get(conversionKey).toString());
                   } else {
                     sqlValues.append(transformer.get(fieldKey));
                     sqlValues.append("=\'");
-                    sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
+                    sqlValues.append(escapeQuotes(fieldValue));
                     sqlValues.append("\'");
                   }
                 } else if (dbSchema.get(dbTables.get(dbKey)).get(
@@ -391,8 +398,7 @@ public class MetricDataLoader implements Callable {
                   SimpleDateFormat formatter = new SimpleDateFormat(
                       "yyyy-MM-dd HH:mm:ss");
                   Date recordDate = new Date();
-                  recordDate.setTime(Long.parseLong(recordSet
-                      .get(fieldKey)));
+                  recordDate.setTime(Long.parseLong(fieldValue));
                   sqlValues.append(transformer.get(fieldKey));
                   sqlValues.append("=\"");
                   sqlValues.append(formatter.format(recordDate));
@@ -405,7 +411,7 @@ public class MetricDataLoader implements Callable {
                         transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
                   long tmp = 0;
                   try {
-                    tmp = Long.parseLong(recordSet.get(fieldKey).toString());
+                    tmp = Long.parseLong(fieldValue);
                     String conversionKey = "conversion." + fieldKey;
                     if (conversion.containsKey(conversionKey)) {
                       tmp = tmp
@@ -420,7 +426,7 @@ public class MetricDataLoader implements Callable {
                   sqlValues.append(tmp);
                 } else {
                   double tmp = 0;
-                  tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
+                  tmp = Double.parseDouble(fieldValue);
                   String conversionKey = "conversion." + fieldKey;
                   if (conversion.containsKey(conversionKey)) {
                     tmp = tmp
@@ -455,7 +461,6 @@ public class MetricDataLoader implements Callable {
               }
             }
           }
-
           StringBuilder sql = new StringBuilder();
           if (sqlPriKeys.length() > 0) {
             sql.append("INSERT INTO ");
@@ -587,9 +592,7 @@ public class MetricDataLoader implements Callable {
 
   public static void main(String[] args) {
     try {
-      conf = new ChukwaConfiguration();
-      fs = FileSystem.get(conf);
-      MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
+      MetricDataLoader mdl = new MetricDataLoader(args[0]);
       mdl.run();
     } catch (Exception e) {
       e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
index b763087..5ad5258 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
@@ -37,8 +37,8 @@ public class MetricDataLoaderPool extends DataLoaderFactory {
   protected MetricDataLoader threads[] = null;
   private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
   private int size = 1;
-  private static CompletionService completion = null;
-  private static ExecutorService executor = null;
+  private CompletionService completion = null;
+  private ExecutorService executor = null;
   
   public MetricDataLoaderPool() {
   }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
index 1cf801c..c47bdff 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -100,7 +101,7 @@ public class SocketDataLoader implements Runnable {
           output.append(" all");
         }
         output.append("\n");
-        dos.write((output.toString()).getBytes());
+        dos.write((output.toString()).getBytes(Charset.forName("UTF-8")));
       } catch (SocketException e) {
         log.warn("Error while settin soTimeout to 120000");
       }
@@ -135,7 +136,7 @@ public class SocketDataLoader implements Runnable {
   /*
    * Unsubscribe from Chukwa collector and stop streaming.
    */
-  public void stop() {
+  public synchronized void stop() {
     if(s!=null) {
       try {
         dis.close();
@@ -169,7 +170,7 @@ public class SocketDataLoader implements Runnable {
    * into SDL queue.
    */
   @Override
-  public void run() {
+  public synchronized  void run() {
     try {
       Chunk c;
       while ((c = ChunkImpl.read(dis)) != null) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
index 196a38a..8802fcf 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.datastore;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.hicc.HiccWebServer;
 import org.apache.hadoop.chukwa.rest.bean.UserBean;
@@ -44,10 +44,14 @@ public class UserStore {
   private static Log log = LogFactory.getLog(UserStore.class);
   private static Configuration config = new Configuration();
   private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
-  private static String hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"users";
+  private static String hiccPath = null;
+    
+  static {
+    config = HiccWebServer.getConfig();
+    hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"users";
+}
   
   public UserStore() throws IllegalAccessException {
-    UserStore.config = HiccWebServer.getConfig();
   }
 
   public UserStore(String uid) throws IllegalAccessException {
@@ -73,7 +77,7 @@ public class UserStore {
         viewStream.readFully(buffer);
         viewStream.close();
         try {
-          JSONObject json = (JSONObject) JSONValue.parse(new String(buffer));
+          JSONObject json = (JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8")));
           profile = new UserBean(json);
         } catch (Exception e) {
           log.error(ExceptionUtil.getStackTrace(e));
@@ -110,7 +114,7 @@ public class UserStore {
     try {
       fs = FileSystem.get(config);
       FSDataOutputStream out = fs.create(profileFile,true);
-      out.write(profile.deserialize().toString().getBytes());
+      out.write(profile.deserialize().toString().getBytes(Charset.forName("UTF-8")));
       out.close();
     } catch (IOException ex) {
       log.error(ExceptionUtil.getStackTrace(ex));
@@ -138,7 +142,7 @@ public class UserStore {
           profileStream.readFully(buffer);
           profileStream.close();
           try {
-            UserBean user = new UserBean((JSONObject) JSONValue.parse(new String(buffer)));
+            UserBean user = new UserBean((JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8"))));
             list.add(user.getId());
           } catch (Exception e) {
             log.error(ExceptionUtil.getStackTrace(e));

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
index 9db639f..258300c 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.chukwa.datastore;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.hicc.HiccWebServer;
 import org.apache.hadoop.chukwa.rest.bean.ViewBean;
@@ -44,15 +43,18 @@ public class ViewStore {
   private String uid = null;
   private ViewBean view = null;
   private static Log log = LogFactory.getLog(ViewStore.class);
-  private static Configuration config = new Configuration();
+  private static Configuration config = null;
   private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
-  private static String viewPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"views";
+  private static String viewPath = null;
   private static String publicViewPath = viewPath+File.separator+"public";
   private static String usersViewPath = viewPath+File.separator+"users";
   private static String PUBLIC = "public".intern();
 
+  static {
+    config = HiccWebServer.getConfig();
+    viewPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"views";
+  }
   public ViewStore() throws IllegalAccessException {
-    ViewStore.config = HiccWebServer.getConfig();
   }
 
   public ViewStore(String uid, String vid) throws IllegalAccessException {
@@ -141,7 +143,7 @@ public class ViewStore {
         try {
           FileSystem fs = FileSystem.get(config);
           FSDataOutputStream out = fs.create(viewFile,true);
-          out.write(view.deserialize().toString().getBytes());
+          out.write(view.deserialize().toString().getBytes(Charset.forName("UTF-8")));
           out.close();
         } catch (IOException ex) {
           log.error(ExceptionUtil.getStackTrace(ex));

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
index 10343d5..9512824 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.datastore;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.hicc.HiccWebServer;
 import org.apache.hadoop.chukwa.rest.bean.CatalogBean;
@@ -43,12 +43,15 @@ public class WidgetStore {
   private static Log log = LogFactory.getLog(WidgetStore.class);
   private static Configuration config = new Configuration();
   private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
-  private static String hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"widgets";
+  private static String hiccPath = null;
   private static CatalogBean catalog = null;
   private static HashMap<String, WidgetBean> list = new HashMap<String, WidgetBean>();
   
+  static {
+    config = HiccWebServer.getConfig();
+    hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"widgets";
+  }
   public WidgetStore() throws IllegalAccessException {
-    WidgetStore.config = HiccWebServer.getConfig();
   }
 
   public void set(WidgetBean widget) throws IllegalAccessException {
@@ -94,7 +97,7 @@ public class WidgetStore {
           widgetStream.readFully(buffer);
           widgetStream.close();
           try {
-            JSONObject widgetBuffer = (JSONObject) JSONValue.parse(new String(buffer));
+            JSONObject widgetBuffer = (JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8")));
             WidgetBean widget = new WidgetBean(widgetBuffer);
             catalog.addCatalog(widget);
             list.put(widget.getId(),widget);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java b/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
index 7bf451e..db71668 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
@@ -30,8 +30,11 @@ import java.io.OutputStreamWriter;
 import java.net.URL;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Map.Entry;
 
 /**
  * Trigger action that makes an HTTP request when executed.
@@ -129,14 +132,16 @@ public class HttpTriggerAction implements TriggerAction {
     // set headers
     boolean contentLengthExists = false;
     if (headers != null) {
-      for (String name: headers.keySet()) {
+      for(Entry<String, String> entry : headers.entrySet()) {
+        String name = entry.getKey();
+        String value = entry.getValue();
         if (log.isDebugEnabled()) {
-          log.debug("Setting header " + name + ": " + headers.get(name));
+          log.debug("Setting header " + name + ": " + value);
         }
         if (name.equalsIgnoreCase("content-length")) {
           contentLengthExists = true;
         }
-        conn.setRequestProperty(name, headers.get(name));
+        conn.setRequestProperty(name, value);
       }
     }
 
@@ -149,7 +154,7 @@ public class HttpTriggerAction implements TriggerAction {
     // send body if it exists
     if (body != null) {
       conn.setDoOutput(true);
-      OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream());
+      OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), Charset.forName("UTF-8"));
       writer.write(body);
       writer.flush();
       writer.close();
@@ -169,7 +174,7 @@ public class HttpTriggerAction implements TriggerAction {
     }
     else {
       BufferedReader reader = new BufferedReader(
-                                new InputStreamReader(conn.getInputStream()));
+                                new InputStreamReader(conn.getInputStream(), Charset.forName("UTF-8")));
       String line;
       StringBuilder sb = new StringBuilder();
       while ((line = reader.readLine()) != null) {
@@ -215,7 +220,7 @@ public class HttpTriggerAction implements TriggerAction {
       for (String header : headersSplit) {
         String[] nvp = header.split(":", 2);
         if (nvp.length < 2) {
-          log.error("Invalid HTTP header found: " + nvp);
+          log.error("Invalid HTTP header found: " + Arrays.toString(nvp));
           continue;
         }
         headerMap.put(nvp[0].trim(), nvp[1].trim());

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
index 1ef6c00..6cb0dc1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
 
 public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
   static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
-  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
+  SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
   
   static final  int ONE_HOUR = 60 * 60 * 1000;
   static final int ONE_DAY = 24*ONE_HOUR;
@@ -113,7 +113,7 @@ public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
         if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
           log.warn("==================\nToo many errors (" + errorCount +
                    "), Bail out!\n==================");
-          System.exit(-1);
+          break;
         }
         // /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
         //  to

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
index d1e2b24..bebd1e5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
@@ -165,7 +165,7 @@ public class DailyChukwaRecordRolling extends Configured implements Tool {
               new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
           List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
           if (rollInSequence) {
-            merge.run();
+            merge.mergeRecords();
           } else {
             allMerge.add(merge);
             merge.start();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
index be63e16..71ac1f7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
@@ -61,16 +61,28 @@ import org.apache.log4j.Logger;
 
 public class Demux extends Configured implements Tool {
   static Logger log = Logger.getLogger(Demux.class);
-  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
   public static Configuration jobConf = null;
+  protected static void setJobConf(JobConf jobConf) {
+    Demux.jobConf = jobConf;
+  }
+
+  protected Configuration getJobConf() {
+    return Demux.jobConf;
+  }
 
   public static class MapClass extends MapReduceBase implements
           Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
 
+    private Configuration jobConf = null;
+
     @Override
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
-      Demux.jobConf = jobConf;
+      setJobConf(jobConf);
+    }
+
+    private void setJobConf(JobConf jobConf) {
+      this.jobConf = jobConf;
     }
 
     public void map(ChukwaArchiveKey key, ChunkImpl chunk,
@@ -82,15 +94,15 @@ public class Demux extends Configured implements Tool {
       try {
         long duration = System.currentTimeMillis();
         if (log.isDebugEnabled()) {
-          log.debug("Entry: [" + chunk.getData() + "] EventType: ["
+          log.debug("Entry: [" + String.valueOf(chunk.getData()) + "] EventType: ["
                   + chunk.getDataType() + "]");
         }
 
-        String defaultProcessor = Demux.jobConf.get(
+        String defaultProcessor = jobConf.get(
                 "chukwa.demux.mapper.default.processor",
                 "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
 
-        String processorClass_pri = Demux.jobConf.get(chunk.getDataType(),
+        String processorClass_pri = jobConf.get(chunk.getDataType(),
                 defaultProcessor);
 
         String processorClass = processorClass_pri.split(",")[0];
@@ -125,9 +137,11 @@ public class Demux extends Configured implements Tool {
   public static class ReduceClass extends MapReduceBase implements
           Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
 
+    private Configuration jobConf = null;
+
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
-      Demux.jobConf = jobConf;
+      this.jobConf = jobConf;
     }
 
     public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
@@ -143,10 +157,10 @@ public class Demux extends Configured implements Tool {
 
         String defaultProcessor_classname = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
                 ".IdentityReducer";
-        String defaultProcessor = Demux.jobConf.get("chukwa.demux.reducer.default.processor",
+        String defaultProcessor = jobConf.get("chukwa.demux.reducer.default.processor",
                 "," + defaultProcessor_classname);
 
-        String processClass_pri = Demux.jobConf.get(key.getReduceType(), defaultProcessor);
+        String processClass_pri = jobConf.get(key.getReduceType(), defaultProcessor);
         String[] processClass_tmps = processClass_pri.split(",");
         String processClass = null;
         if (processClass_tmps.length != 2)
@@ -199,7 +213,7 @@ public class Demux extends Configured implements Tool {
   public int run(String[] args) throws Exception {
     JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
 
-
+    SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
     conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
     conf.setInputFormat(SequenceFileInputFormat.class);
     conf.setMapperClass(Demux.MapClass.class);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
index 8fd155e..9fcb65b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
@@ -38,8 +38,8 @@ import org.apache.log4j.Logger;
 public class DemuxManager implements CHUKWA_CONSTANT {  
   static Logger log = Logger.getLogger(DemuxManager.class);
 
-  static int globalErrorcounter = 0;
-  static Date firstErrorTime = null;
+  int globalErrorcounter = 0;
+  Date firstErrorTime = null;
 
   protected int ERROR_SLEEP_TIME = 60;
   protected int NO_DATASINK_SLEEP_TIME = 20;
@@ -144,7 +144,7 @@ public class DemuxManager implements CHUKWA_CONSTANT {
          + nagiosPort + ", reportingHost:" + reportingHost);
      
      
-     if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost.length() == 0 || reportingHost == null) {
+     if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost == null || reportingHost.length() == 0) {
        sendAlert = false;
        log.warn("Alerting is OFF");
      }
@@ -159,7 +159,7 @@ public class DemuxManager implements CHUKWA_CONSTANT {
          if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
            log.warn("==================\nToo many errors (" + globalErrorcounter +
                     "), Bail out!\n==================");
-           System.exit(-1);
+           break;
          }
          
          // Check for anomalies

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
index c8f2799..b59b229 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
@@ -121,7 +121,7 @@ public class HourlyChukwaRecordRolling extends Configured implements Tool {
               new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
           List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
           if (rollInSequence) {
-            merge.run();
+            merge.mergeRecords();
           } else {
             allMerge.add(merge);
             merge.start();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
index 9685471..df1ff88 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
@@ -28,7 +28,17 @@ import java.util.List;
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
-import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_REPOS_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_REPOS_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_DATA_LOADER;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_SUCCESS_ACTION;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.HierarchyDataType;
 import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
@@ -39,11 +49,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.log4j.Logger;
 
-public class PostProcessorManager implements CHUKWA_CONSTANT{
+public class PostProcessorManager {
   static Logger log = Logger.getLogger(PostProcessorManager.class);
   
-  protected static HashMap<String, String> dataSources = new HashMap<String, String>();
-  public static int errorCount = 0;
+  protected HashMap<String, String> dataSources = new HashMap<String, String>();
+  protected int errorCount = 0;
   
   protected int ERROR_SLEEP_TIME = 60;
   protected ChukwaConfiguration conf = null;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
index 4b26e45..363016b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
@@ -43,12 +43,16 @@ public class RecordMerger extends Thread {
     this.conf = conf;
     this.fs = fs;
     this.tool = tool;
-    this.mergeArgs = mergeArgs;
+    this.mergeArgs = mergeArgs.clone();
     this.deleteRawData = deleteRawData;
   }
 
   @Override
   public void run() {
+    mergeRecords();
+  }
+
+  void mergeRecords() {
     System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
     int res;
     try {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
index f11b727..72574eb 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -121,7 +123,7 @@ public abstract class AbstractProcessor implements MapProcessor {
 
   protected String nextLine() {
     String log = new String(bytes, startOffset, (recordOffsets[currentPos]
-        - startOffset + 1));
+        - startOffset + 1), Charset.forName("UTF-8"));
     startOffset = recordOffsets[currentPos] + 1;
     currentPos++;
     return RecordConstants.recoverRecordSeparators("\n", log);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
index 61ba28c..90a561c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -60,7 +62,7 @@ public class ChunkSaver {
       DataOutputBuffer ob = new DataOutputBuffer(chunk
           .getSerializedSizeEstimate());
       chunk.write(ob);
-      record.add(Record.chunkDataField, new String(ob.getData()));
+      record.add(Record.chunkDataField, new String(ob.getData(), Charset.forName("UTF-8")));
       record.add(Record.chunkExceptionField, ExceptionUtil
           .getStackTrace(throwable));
       output.collect(key, record);
@@ -73,7 +75,7 @@ public class ChunkSaver {
             + " - source:" + chunk.getSource() + " - dataType: "
             + chunk.getDataType() + " - Stream: " + chunk.getStreamName()
             + " - SeqId: " + chunk.getSeqID() + " - Data: "
-            + new String(chunk.getData()));
+            + new String(chunk.getData(), Charset.forName("UTF-8")));
       } catch (Throwable e1) {
         e.printStackTrace();
       }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
index f249512..afc78ed 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
@@ -130,7 +130,7 @@ public class ClientTraceProcessor extends AbstractProcessor {
       rec.add(Record.tagsField, chunk.getTags());
       rec.add(Record.sourceField, chunk.getSource());
       rec.add(Record.applicationField, chunk.getStreamName());
-      rec.add("actual_time",(new Long(ms_fullresolution)).toString());
+      rec.add("actual_time",Long.toString(ms_fullresolution));
       output.collect(key, rec);
 
     } catch (ParseException e) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
index 4e5765d..85db7fc 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
@@ -118,11 +118,13 @@ public class DatanodeProcessor extends AbstractProcessor {
       } else {
         timeStamp = Long.parseLong(ttTag);
       }
-      Iterator<String> keys = obj.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
       while (keys.hasNext()) {
-        String key = keys.next();
-        Object value = obj.get(key);
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
         String valueString = value == null ? "" : value.toString();
 
         // Calculate rate for some of the metrics

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
index c04e752..924f6aa 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -67,11 +68,13 @@ public class HBaseMasterProcessor extends AbstractProcessor {
       } else {
         timeStamp = Long.parseLong(ttTag);
       }
-      Iterator<String> keys = obj.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
       while (keys.hasNext()) {
-        String key = keys.next();
-        Object value = obj.get(key);
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
         String valueString = value == null ? "" : value.toString();
 
         // Calculate rate for some of the metrics
@@ -88,7 +91,7 @@ public class HBaseMasterProcessor extends AbstractProcessor {
           valueString = Long.toString(newValue);
         }
 
-        Buffer b = new Buffer(valueString.getBytes());
+        Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8")));
         metricsMap.put(key, b);
       }
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
index 8fab057..6ea0169 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -60,13 +61,15 @@ public class HBaseRegionServerProcessor extends AbstractProcessor {
       } else {
         timeStamp = Long.parseLong(ttTag);
       }
-      Iterator<String> keys = obj.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
       while (keys.hasNext()) {
-        String key = keys.next();
-        Object value = obj.get(key);
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
         String valueString = value == null ? "" : value.toString();
-        Buffer b = new Buffer(valueString.getBytes());
+        Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8")));
         metricsMap.put(key, b);
       }
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
index f671049..8351e84 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
@@ -25,6 +25,7 @@ import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
@@ -91,24 +92,26 @@ public class HadoopMetricsProcessor extends AbstractProcessor {
       String contextName = null;
       String recordName = null;
 
-      Iterator<String> ki = json.keySet().iterator();
+      Iterator<Map.Entry<String, ?>> ki = json.entrySet().iterator();
       while (ki.hasNext()) {
-        String keyName = ki.next();
+        Map.Entry<String, ?> entry = ki.next();
+        String keyName = entry.getKey();
+        Object value = entry.getValue();
         if (chukwaTimestampField.intern() == keyName.intern()) {
-          d = new Date((Long) json.get(keyName));
+          d = new Date((Long) value);
           Calendar cal = Calendar.getInstance();
           cal.setTimeInMillis(d.getTime());
           cal.set(Calendar.SECOND, 0);
           cal.set(Calendar.MILLISECOND, 0);
           d.setTime(cal.getTimeInMillis());
         } else if (contextNameField.intern() == keyName.intern()) {
-          contextName = (String) json.get(keyName);
+          contextName = (String) value;
         } else if (recordNameField.intern() == keyName.intern()) {
-          recordName = (String) json.get(keyName);
-          record.add(keyName, json.get(keyName).toString());
+          recordName = (String) value;
+          record.add(keyName, value.toString());
         } else {
           if(json.get(keyName)!=null) {
-            record.add(keyName, json.get(keyName).toString());
+            record.add(keyName, value.toString());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
index 7e2e4e2..b910165 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.Random;
 import java.util.regex.Matcher;
@@ -78,13 +80,14 @@ public class JobConfProcessor extends AbstractProcessor {
         = DocumentBuilderFactory.newInstance();
       //ignore all comments inside the xml file
       docBuilderFactory.setIgnoringComments(true);
+      FileOutputStream out = null;
       try {
           DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
           Document doc = null;
           String fileName = "test_"+randomNumber.nextInt();
           File tmp = new File(fileName);
-          FileOutputStream out = new FileOutputStream(tmp);
-          out.write(recordEntry.getBytes());
+          out = new FileOutputStream(tmp);
+          out.write(recordEntry.getBytes(Charset.forName("UTF-8")));
           out.close();
         doc = builder.parse(fileName);
         Element root = doc.getDocumentElement();
@@ -139,10 +142,15 @@ public class JobConfProcessor extends AbstractProcessor {
         buildGenericRecord(jobConfRecord, null, time, jobConfData);
         output.collect(key, jobConfRecord);
             
-        tmp.delete();
-      } catch(Exception e) {
-          e.printStackTrace();  
-          throw e;
+        if(!tmp.delete()) {
+          log.warn(tmp.getAbsolutePath() + " cannot be deleted.");
+        }
+      } catch(IOException e) {
+        if(out != null) {
+          out.close();
+        }
+        e.printStackTrace();  
+        throw e;
       }
   }
   

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
index 5a2a851..fdd51f2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -38,13 +39,12 @@ public class JobLogHistoryProcessor extends AbstractProcessor {
   static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
 
   private static final String recordType = "JobLogHistory";
-  private static String internalRegex = null;
-  private static Pattern ip = null;
+  private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+  private Pattern ip = null;
 
   private Matcher internalMatcher = null;
 
   public JobLogHistoryProcessor() {
-    internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
     ip = Pattern.compile(internalRegex);
     internalMatcher = ip.matcher("-");
   }
@@ -331,10 +331,8 @@ public class JobLogHistoryProcessor extends AbstractProcessor {
           record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
         }
 
-        Iterator<String> it = keys.keySet().iterator();
-        while (it.hasNext()) {
-          String field = it.next();
-          record.add(field, keys.get(field));
+        for(Entry<String, String> entry : keys.entrySet()) {
+          record.add(entry.getKey(), entry.getValue());
         }
 
         output.collect(key, record);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
index c2f7b52..d42c329 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
@@ -22,6 +22,8 @@ import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -113,12 +115,9 @@ public class JobTrackerProcessor extends AbstractProcessor {
       } else {
         timeStamp = Long.parseLong(ttTag);
       }
-      Iterator<String> keys = obj.keySet().iterator();
-
-      while (keys.hasNext()) {
-        String key = keys.next();
-        Object value = obj.get(key);
-        String valueString = value == null ? "" : value.toString();
+      for(Entry<String, Object> entry : (Set<Entry<String, Object>>) obj.entrySet()) {
+        String key = entry.getKey();
+        String valueString = entry.getValue().toString();
 
         // Calculate rate for some of the metrics
         if (rateMap.containsKey(key)) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
index 79291a1..3962628 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
@@ -19,13 +19,15 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
-
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
@@ -67,10 +69,9 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor {
         recordType += "_" + recordName;
       }
 
-      Iterator<String> ki = json.keySet().iterator();
-      while (ki.hasNext()) {
-        String key = ki.next();
-        String value = String.valueOf(json.get(key));
+      for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+        String key = entry.getKey();
+        String value = String.valueOf(entry.getValue());
         if(value != null) {
           chukwaRecord.add(key, value);
         }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
index 272980a..2cb0980 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
@@ -34,13 +34,12 @@ public class Log4jJobHistoryProcessor extends AbstractProcessor {
   static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
 
   private static final String recordType = "JobLogHistory";
-  private static String internalRegex = null;
-  private static Pattern ip = null;
+  private static String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+  private Pattern ip = null;
 
   private Matcher internalMatcher = null;
 
   public Log4jJobHistoryProcessor() {
-    internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
     ip = Pattern.compile(internalRegex);
     internalMatcher = ip.matcher("-");
   }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
index 400bd78..5b75939 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
@@ -22,7 +22,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 
 public class LogEntry {
-	private final static SimpleDateFormat sdf = new SimpleDateFormat(
+	private SimpleDateFormat sdf = new SimpleDateFormat(
 			"yyyy-MM-dd HH:mm");
 
 	private Date date;
@@ -43,11 +43,11 @@ public class LogEntry {
 	}
 
 	public Date getDate() {
-		return date;
+		return (Date) date.clone();
 	}
 
 	public void setDate(Date date) {
-		this.date = date;
+		this.date = (Date) date.clone();
 	}
 
 	public String getLogLevel() {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
index 1e6e9d7..075ab5c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
@@ -131,11 +131,13 @@ public class NamenodeProcessor extends AbstractProcessor {
       } else {
         timeStamp = Long.parseLong(ttTag);
       }
-      Iterator<String> keys = obj.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
       while (keys.hasNext()) {
-        String key = keys.next();
-        Object value = obj.get(key);
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
         String valueString = (value == null) ? "" : value.toString();
 
         // These metrics are string types with JSON structure. So we parse them

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
index 4c643a2..d38673c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
@@ -49,13 +49,6 @@ public class SysLog extends AbstractProcessor {
       throws Throwable {
     try {
       String dStr = recordEntry.substring(0, 15);
-      int start = 15;
-      int idx = recordEntry.indexOf(' ', start);
-      start = idx + 1;
-      idx = recordEntry.indexOf(' ', start);
-      String body = recordEntry.substring(idx + 1);
-      body = body.replaceAll("\n", "");
-
       Calendar convertDate = Calendar.getInstance();
       Date d = sdf.parse(dStr);
       int year = convertDate.get(Calendar.YEAR);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
index e293543..3fa1816 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.util.Calendar;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.TimeZone;
 
 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
@@ -69,14 +70,17 @@ public class SystemMetrics extends AbstractProcessor {
     	  continue;
       }
       actualSize++;
-      Iterator<String> keys = cpu.keySet().iterator();
       combined = combined + Double.parseDouble(cpu.get("combined").toString());
       user = user + Double.parseDouble(cpu.get("user").toString());
       sys = sys + Double.parseDouble(cpu.get("sys").toString());
       idle = idle + Double.parseDouble(cpu.get("idle").toString());
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = cpu.entrySet().iterator();
       while(keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, cpu.get(key).toString());
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
+        record.add(key + "." + i, value.toString());
       }
     }
     combined = combined / actualSize;
@@ -101,20 +105,26 @@ public class SystemMetrics extends AbstractProcessor {
 
     record = new ChukwaRecord();
     JSONObject memory = (JSONObject) json.get("memory");
-    Iterator<String> memKeys = memory.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> memKeys = memory.entrySet().iterator();
     while(memKeys.hasNext()) {
-      String key = memKeys.next();
-      record.add(key, memory.get(key).toString());
+      Map.Entry<String, ?> entry = memKeys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
+      record.add(key, value.toString());
     }
     buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
     output.collect(key, record);    
 
     record = new ChukwaRecord();
     JSONObject swap = (JSONObject) json.get("swap");
-    Iterator<String> swapKeys = swap.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> swapKeys = swap.entrySet().iterator();
     while(swapKeys.hasNext()) {
-      String key = swapKeys.next();
-      record.add(key, swap.get(key).toString());
+      Map.Entry<String, ?> entry = swapKeys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
+      record.add(key, value.toString());
     }
     buildGenericRecord(record, null, cal.getTimeInMillis(), "swap");
     output.collect(key, record);
@@ -131,25 +141,28 @@ public class SystemMetrics extends AbstractProcessor {
     JSONArray netList = (JSONArray) json.get("network");
     for(int i = 0;i < netList.size(); i++) {
       JSONObject netIf = (JSONObject) netList.get(i);
-      Iterator<String> keys = netIf.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = netIf.entrySet().iterator();
       while(keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, netIf.get(key).toString());
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
+        record.add(key + "." + i, value.toString());
         if(i!=0) {
           if(key.equals("RxBytes")) {
-            rxBytes = rxBytes + (Long) netIf.get(key);
+            rxBytes = rxBytes + (Long) value;
           } else if(key.equals("RxDropped")) {
-            rxDropped = rxDropped + (Long) netIf.get(key);
+            rxDropped = rxDropped + (Long) value;
           } else if(key.equals("RxErrors")) {          
-            rxErrors = rxErrors + (Long) netIf.get(key);
+            rxErrors = rxErrors + (Long) value;
           } else if(key.equals("RxPackets")) {
-            rxPackets = rxPackets + (Long) netIf.get(key);
+            rxPackets = rxPackets + (Long) value;
           } else if(key.equals("TxBytes")) {
-            txBytes = txBytes + (Long) netIf.get(key);
+            txBytes = txBytes + (Long) value;
           } else if(key.equals("TxCollisions")) {
-            txCollisions = txCollisions + (Long) netIf.get(key);
+            txCollisions = txCollisions + (Long) value;
           } else if(key.equals("TxErrors")) {
-            txErrors = txErrors + (Long) netIf.get(key);
+            txErrors = txErrors + (Long) value;
           } else if(key.equals("TxPackets")) {
             txPackets = txPackets + (Long) netIf.get(key);
           }
@@ -177,22 +190,25 @@ public class SystemMetrics extends AbstractProcessor {
     JSONArray diskList = (JSONArray) json.get("disk");
     for(int i = 0;i < diskList.size(); i++) {
       JSONObject disk = (JSONObject) diskList.get(i);
-      Iterator<String> keys = disk.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = disk.entrySet().iterator();
       while(keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, disk.get(key).toString());
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
+        record.add(key + "." + i, value.toString());
         if(key.equals("ReadBytes")) {
-          readBytes = readBytes + (Long) disk.get("ReadBytes");
+          readBytes = readBytes + (Long) value;
         } else if(key.equals("Reads")) {
-          reads = reads + (Long) disk.get("Reads");
+          reads = reads + (Long) value;
         } else if(key.equals("WriteBytes")) {
-          writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+          writeBytes = writeBytes + (Long) value;
         } else if(key.equals("Writes")) {
-          writes = writes + (Long) disk.get("Writes");
+          writes = writes + (Long) value;
         }  else if(key.equals("Total")) {
-          total = total + (Long) disk.get("Total");
+          total = total + (Long) value;
         } else if(key.equals("Used")) {
-          used = used + (Long) disk.get("Used");
+          used = used + (Long) value;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
index 417fbb5..fe050ed 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java
@@ -77,11 +77,13 @@ public class ZookeeperProcessor extends AbstractProcessor {
       } else {
         timeStamp = Long.parseLong(ttTag);
       }
-      Iterator<String> keys = ((JSONObject) obj).keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = ((JSONObject) obj).entrySet().iterator();
 
       while (keys.hasNext()) {
-        String key = keys.next();
-        Object value = obj.get(key);
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
         String valueString = value == null ? "" : value.toString();
 
         if (metricsMap.containsKey(key)) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
index 4002c6c..5c658dc 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java
@@ -48,7 +48,7 @@ public class ClientTrace implements ReduceProcessor {
       while (values.hasNext()) {
         /* aggregate bytes for current key */
         rec = values.next();
-        bytes += Long.valueOf(rec.getValue("bytes"));
+        bytes += Long.parseLong(rec.getValue("bytes"));
         
         /* output raw values to different data type for uses which
          * require detailed per-operation data */
@@ -70,7 +70,7 @@ public class ClientTrace implements ReduceProcessor {
 
       String[] k = key.getKey().split("/");
       emit.add(k[1] + "_" + k[2], String.valueOf(bytes));
-      emit.setTime(Long.valueOf(k[3]));
+      emit.setTime(Long.parseLong(k[3]));
       output.collect(key, emit);
 
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
index 5e8814c..c5050f2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -73,10 +75,12 @@ public class MRJobReduceProcessor implements ReduceProcessor {
       newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
       newRecord.setTime(initTime);
       newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
-      Iterator<String> it = data.keySet().iterator();
+      Iterator<Map.Entry<String, String>> it = data.entrySet().iterator();
       while (it.hasNext()) {
-        String field = it.next();
-        newRecord.add(field, data.get(field));
+        Map.Entry<String, ?> entry = it.next();
+        String field = entry.getKey();
+        String value = entry.getValue().toString();
+        newRecord.add(field, value);
       }
 
       output.collect(newKey, newRecord);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
index 4fdb365..a884cc1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.chukwa.extraction.engine;
 
 
+import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+
 import org.apache.hadoop.record.Buffer;
 
 public class ChukwaRecord extends ChukwaRecordJT implements Record {
@@ -32,10 +34,10 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record {
   public void add(String key, String value) {
     synchronized (this) {
       if (this.mapFields == null) {
-        this.mapFields = new TreeMap<String, org.apache.hadoop.record.Buffer>();
+        this.mapFields = new TreeMap<String, Buffer>();
       }
     }
-    this.mapFields.put(key, new Buffer(value.getBytes()));
+    this.mapFields.put(key, new Buffer(value.getBytes(Charset.forName("UTF-8"))));
   }
 
   public String[] getFields() {
@@ -44,7 +46,7 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record {
 
   public String getValue(String field) {
     if (this.mapFields.containsKey(field)) {
-      return new String(this.mapFields.get(field).get());
+      return new String(this.mapFields.get(field).get(), Charset.forName("UTF-8"));
     } else {
       return null;
     }
@@ -77,7 +79,7 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record {
     while (it.hasNext()) {
       entry = it.next();
       key = entry.getKey().intern();
-      val = new String(entry.getValue().get());
+      val = new String(entry.getValue().get(), Charset.forName("UTF-8"));
       if (key.intern() == Record.bodyField.intern()) {
         hasBody = true;
         bodyVal = val;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
index b2660a2..04cca36 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
@@ -19,8 +19,11 @@
 // File generated by hadoop record compiler. Do not edit.
 package org.apache.hadoop.chukwa.extraction.engine;
 
+import java.io.Serializable;
 
-public class ChukwaRecordJT extends org.apache.hadoop.record.Record {
+
+public class ChukwaRecordJT extends org.apache.hadoop.record.Record implements Serializable {
+  private static final long serialVersionUID = 15015L;
   private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
   private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
   private static int[] _rio_rtiFilterFields;
@@ -236,6 +239,7 @@ public class ChukwaRecordJT extends org.apache.hadoop.record.Record {
   }
 
   public Object clone() throws CloneNotSupportedException {
+    super.clone();
     ChukwaRecordJT _rio_other = new ChukwaRecordJT();
     _rio_other.time = this.time;
     _rio_other.mapFields = (java.util.TreeMap<String, org.apache.hadoop.record.Buffer>) this.mapFields
@@ -258,7 +262,7 @@ public class ChukwaRecordJT extends org.apache.hadoop.record.Record {
   }
 
   public static class Comparator extends
-      org.apache.hadoop.record.RecordComparator {
+      org.apache.hadoop.record.RecordComparator implements Serializable {
     public Comparator() {
       super(ChukwaRecordJT.class);
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
index 7bc6718..0e602d7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
@@ -178,6 +178,7 @@ public class ChukwaRecordKey extends org.apache.hadoop.record.Record {
   }
 
   public Object clone() throws CloneNotSupportedException {
+    super.clone();
     ChukwaRecordKey _rio_other = new ChukwaRecordKey();
     _rio_other.reduceType = this.reduceType;
     _rio_other.key = this.key;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
index 5b71a61..eb14414 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java
@@ -21,4 +21,8 @@ package org.apache.hadoop.chukwa.extraction.engine;
 public class Token {
   public String key = null;
   public boolean hasMore = false;
+  
+  public boolean getMore() {
+    return hasMore;
+  }
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
index 68dbb2c..dc0c576 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java
@@ -37,7 +37,7 @@ public class DsDirectory {
   private DataConfig dataConfig = null;
 
   private static FileSystem fs = null;
-  private static Configuration conf = null;
+  private Configuration conf = null;
 
   private DsDirectory() {
     dataConfig = new DataConfig();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
index c2602b4..bc0f83d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java
@@ -45,6 +45,9 @@ import org.apache.commons.logging.LogFactory;
 public class DatabaseDS implements DataSource {
   private static final Log log = LogFactory.getLog(DatabaseDS.class);
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+      "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE", 
+      justification = "Dynamic based upon tables in the database")
   public SearchResult search(SearchResult result, String cluster,
       String dataSource, long t0, long t1, String filter, Token token)
       throws DataSourceException {
@@ -60,8 +63,6 @@ public class DatabaseDS implements DataSource {
       timeField = "LAUNCH_TIME";
     } else if (dataSource.equalsIgnoreCase("HodJob")) {
       timeField = "StartTime";
-    } else if (dataSource.equalsIgnoreCase("QueueInfo")) {
-      timeField = "timestamp";
     } else {
       timeField = "timestamp";
     }
@@ -88,13 +89,16 @@ public class DatabaseDS implements DataSource {
         int col = rmeta.getColumnCount();
         while (rs.next()) {
           ChukwaRecord event = new ChukwaRecord();
-          String cell = "";
+          StringBuilder cell = new StringBuilder();
           long timestamp = 0;
 
           for (int i = 1; i < col; i++) {
             String value = rs.getString(i);
             if (value != null) {
-              cell = cell + " " + rmeta.getColumnName(i) + ":" + value;
+              cell.append(" ");
+              cell.append(rmeta.getColumnName(i));
+              cell.append(":");
+              cell.append(value);
             }
             if (rmeta.getColumnName(i).equals(timeField)) {
               timestamp = rs.getLong(i);
@@ -111,7 +115,7 @@ public class DatabaseDS implements DataSource {
             continue;
           }
 
-          event.add(Record.bodyField, cell);
+          event.add(Record.bodyField, cell.toString());
           event.add(Record.sourceField, cluster + "." + dataSource);
           if (records.containsKey(timestamp)) {
             records.get(timestamp).add(event);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
index bb1797b..f197420 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java
@@ -34,4 +34,12 @@ public class ChukwaDSInternalResult {
   String fileName = null;
 
   ChukwaRecordKey key = null;
+  
+  public ChukwaRecordKey getKey() {
+    return key;
+  }
+  
+  protected void setKey(ChukwaRecordKey key) {
+    this.key = key;
+  }
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
index dc23ef6..799390b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java
@@ -131,12 +131,14 @@ public class ChukwaFileParser {
         }
 
       } while (line != null);
-    } catch (Exception e) {
+    } catch (IOException e) {
       e.printStackTrace();
     } finally {
       System.out.println("File: " + fileName + " Line count: " + lineCount);
       try {
-        dataIS.close();
+        if(dataIS != null) {
+          dataIS.close();
+        }
       } catch (IOException e) {
       }
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
index 7c9e02c..93fdd2a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java
@@ -26,7 +26,10 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
+
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -150,7 +153,7 @@ public class ChukwaRecordDataSource implements DataSource {
           {
             log.debug("check for hours");
             for (int hour = 0; hour < 24; hour++) {
-              if (workingDay == res.day && hour < workingHour) {
+              if (workingDay.equals(res.day) && hour < workingHour) {
                 continue;
               }
               log.debug(" Hour?  -->" + filePath + dataSource + "/"
@@ -349,7 +352,7 @@ public class ChukwaRecordDataSource implements DataSource {
         }
       }
 
-    } catch (Exception e) {
+    } catch (IOException e) {
       e.printStackTrace();
     } finally {
       try {
@@ -375,7 +378,10 @@ public class ChukwaRecordDataSource implements DataSource {
       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
           + "/" + workingHour + "/rotateDone"));
       break;
-
+    default:
+      contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
+          + "/rotateDone"));
+      break;
     }
     return contains;
   }
@@ -400,7 +406,10 @@ public class ChukwaRecordDataSource implements DataSource {
       contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay
           + "/" + workingHour + "/" + raw));
       break;
-
+    default:
+      contains = fs
+          .exists(new Path(rootFolder + dataSource + "/" + workingDay));
+      break;
     }
     return contains;
   }
@@ -440,6 +449,10 @@ public class ChukwaRecordDataSource implements DataSource {
           + raws[rawIndex] + "/" + dataSource + "_" + day + "_" + hour + "_"
           + raws[rawIndex] + "." + spill + ".evt";
       break;
+    default:
+      fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource
+          + "_" + day + "." + spill + ".evt";
+      break;
     }
     log.debug("buildFileName :" + fileName);
     return fileName;
@@ -473,12 +486,10 @@ public class ChukwaRecordDataSource implements DataSource {
 
     ds.search(result, cluster, dataSource, t0, t1, filter, token);
     TreeMap<Long, List<Record>> records = result.getRecords();
-    Iterator<Long> it = records.keySet().iterator();
-
-    while (it.hasNext()) {
-      long ts = it.next();
+    for(Entry<Long, List<Record>> entry : records.entrySet()) {      
+      long ts = entry.getKey();
       System.out.println("\n\nTimestamp: " + new Date(ts));
-      List<Record> list = records.get(ts);
+      List<Record> list = entry.getValue();
       for (int i = 0; i < list.size(); i++) {
         System.out.println(list.get(i));
       }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
index dbaadc2..59b8dcd 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java
@@ -118,7 +118,7 @@ public class ChukwaSequenceFileParser {
         }
 
       }
-    } catch (Exception e) {
+    } catch (IOException e) {
       e.printStackTrace();
     } finally {
       System.out.println("File: " + fileName + " Line count: " + lineCount);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
index f721978..48c578a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.hicc;
 
 
 import java.net.*;
+import java.nio.charset.Charset;
 import java.text.ParseException;
 import java.io.*;
 
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
-
 import org.apache.log4j.Logger;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 
@@ -43,7 +43,7 @@ public class JSONLoader {
       // FileReader always assumes default encoding is OK!
       URL yahoo = new URL(source);
       BufferedReader in = new BufferedReader(new InputStreamReader(yahoo
-          .openStream()));
+          .openStream(), Charset.forName("UTF-8")));
 
       String inputLine;
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java b/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
index b3e7bf5..1e98989 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java
@@ -52,10 +52,8 @@ public class OfflineTimeHandler {
 
   public void init(HashMap<String, String> map) {
     Calendar now = Calendar.getInstance();
-    if (map == null || (map != null 
-        && map.get("time_type") == null
-        && map.get("time_type") == null
-        && map.get("period") == null)) {
+    if (map == null || 
+        (map.get("time_type") == null && map.get("period") == null)) {
       end = now.getTimeInMillis();
       start = end - 60 * 60 * 1000;
     } else if (map.get("period") != null

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
index dbb6707..19e537d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
+++ b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.chukwa.hicc;
 
 
 import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
 
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -43,7 +43,7 @@ public class Views {
     try {
       // use buffering, reading one line at a time
       // FileReader always assumes default encoding is OK!
-      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8")));
       try {
         String line = null; // not declared within while loop
         /*


Mime
View raw message