chukwa-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject [7/8] chukwa git commit: CHUKWA-771. Improved code quality issue identified by findbugs. (Eric Yang)
Date Sun, 26 Jul 2015 02:09:00 GMT
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
index 14cf514..4bfae2d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
@@ -35,8 +35,6 @@ import org.apache.log4j.Logger;
 class FileAdaptorTailer extends Thread {
   static Logger log = Logger.getLogger(FileAdaptorTailer.class);
   private List<FileAdaptor> adaptors = null;
-  private static Configuration conf = null;
-  private Object lock = new Object();
   
   /**
    * How often to call each adaptor.
@@ -46,16 +44,13 @@ class FileAdaptorTailer extends Thread {
 
   
   public FileAdaptorTailer() {
-    
-    if (conf == null) {
-      ChukwaAgent agent = ChukwaAgent.getAgent();
-      if (agent != null) {
-        conf = agent.getConfiguration();
-        if (conf != null) {
-          SAMPLE_PERIOD_MS = conf.getInt(
-              "chukwaAgent.adaptor.context.switch.time",
-              DEFAULT_SAMPLE_PERIOD_MS);
-        }
+    ChukwaAgent agent = ChukwaAgent.getAgent();
+    if (agent != null) {
+      Configuration conf = agent.getConfiguration();
+      if (conf != null) {
+        SAMPLE_PERIOD_MS = conf.getInt(
+            "chukwaAgent.adaptor.context.switch.time",
+            DEFAULT_SAMPLE_PERIOD_MS);
       }
     }
     
@@ -70,17 +65,6 @@ class FileAdaptorTailer extends Thread {
     while(true) {
       try {
 
-        while (adaptors.size() == 0) {
-          synchronized (lock) {
-            try {
-              log.info("Waiting queue is empty");
-              lock.wait();
-            } catch (InterruptedException e) {
-              // do nothing
-            }
-          }
-        }
-        
         long startTime = System.currentTimeMillis();
         for (FileAdaptor adaptor: adaptors) {
           log.info("calling sendFile for " + adaptor.toWatch.getCanonicalPath());
@@ -100,9 +84,6 @@ class FileAdaptorTailer extends Thread {
   
   public void addFileAdaptor(FileAdaptor adaptor) {
     adaptors.add(adaptor);
-    synchronized (lock) {
-      lock.notifyAll();
-    }
   }
   
   public void removeFileAdaptor(FileAdaptor adaptor) {
@@ -119,7 +100,7 @@ public class FileAdaptor extends AbstractAdaptor {
   static FileAdaptorTailer tailer = null;
   
   static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000;
-  static int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
+  int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD;
   
   static {
     tailer = new FileAdaptorTailer();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
index c07f6fa..5f4928a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.chukwa.datacollection.adaptor;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.Charset;
@@ -105,9 +104,16 @@ public class JMXAdaptor extends AbstractAdaptor{
 			while(!shutdown){
 				try{					
 					BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8")));
-					String[] creds = br.readLine().split(" ");
-					Map<String, String[]> env = new HashMap<String, String[]>();			
-					env.put(JMXConnector.CREDENTIALS, creds);
+					String buffer = br.readLine();
+					String[] creds = null;
+					if(buffer != null ) {
+					  creds = buffer.split(" ");
+					}
+					br.close();
+					Map<String, String[]> env = new HashMap<String, String[]>();
+					if(creds!=null) {
+					  env.put(JMXConnector.CREDENTIALS, creds);
+					}
 					jmxc = JMXConnectorFactory.connect(url, env);
 					mbsc = jmxc.getMBeanServerConnection();							
 					if(timer == null) {
@@ -131,7 +137,7 @@ public class JMXAdaptor extends AbstractAdaptor{
 					timer.cancel();
 					timer = null;
 					shutdown = true;
-				}						
+				}
 			}
 		}
 		
@@ -181,7 +187,7 @@ public class JMXAdaptor extends AbstractAdaptor{
 							Descriptor d = mb.getDescriptor();
 							val = mbsc.getAttribute(oname, mb.getName());
 							if(d.getFieldNames().length > 0){ //this is an open mbean
-								OpenType openType = (OpenType)d.getFieldValue("openType");	
+								OpenType<?> openType = (OpenType<?>)d.getFieldValue("openType");	
 								
 								if(openType.isArray()){									
 									Object[] valarray = (Object[])val;									

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
index 39af580..5011f70 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
@@ -50,7 +50,7 @@ public class OozieAdaptor extends AbstractAdaptor {
   private final ScheduledExecutorService scheduler = Executors
       .newScheduledThreadPool(1);
   private static final long initialDelay = 60; // seconds
-  private static long periodicity = 60; // seconds
+  private long periodicity = 60; // seconds
   private ScheduledFuture<?> scheduledCollectorThread;
 
   @Override

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
index b37be9c..072c151 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
@@ -140,16 +140,14 @@ public class SocketAdaptor extends AbstractAdaptor {
           }
         }
       } catch(java.io.EOFException e) {
-        log.info("Caught java.io.EOFException closing conneciton.");
+        log.debug("Caught java.io.EOFException:", e);
       } catch(java.net.SocketException e) {
-        log.info("Caught java.net.SocketException closing conneciton.");
+        log.debug("Caught java.net.SocketException:", e);
       } catch(InterruptedIOException e) {
         Thread.currentThread().interrupt();
-        log.info("Caught java.io.InterruptedIOException: "+e);
-        log.info("Closing connection.");
+        log.debug("Caught java.io.InterruptedIOException: ", e);
       } catch(IOException e) {
-        log.info("Caught java.io.IOException: "+e);
-        log.info("Closing connection.");
+        log.debug("Caught java.io.IOException: ", e);
       } catch(Exception e) {
         log.error("Unexpected exception. Closing conneciton.", e);
       } finally {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
index 07f6c66..50dec64 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
@@ -76,7 +76,7 @@ public class SyslogAdaptor extends UDPAdaptor {
         facility = facility / 8;
         dataType = facilityMap.get(facility); 
       } catch (NumberFormatException nfe) {
-        log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf);
+        log.warn("Unsupported format detected by SyslogAdaptor:"+Arrays.toString(trimmedBuf));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
index dcf0600..cb04aae 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
-import java.util.*;
 import java.io.*;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
@@ -37,7 +36,7 @@ public class WriteaheadBuffered extends AbstractWrapper {
   
   
   @Override
-  public synchronized void add(Chunk event) throws InterruptedException {
+  public void add(Chunk event) throws InterruptedException {
     try {
       event.write(outToDisk);
       outToDisk.flush();
@@ -85,7 +84,7 @@ public class WriteaheadBuffered extends AbstractWrapper {
   }
   
   @Override
-  public synchronized void committed(long l) {
+  public void committed(long l) {
 
     try {
       long bytesOutstanding = highestSentOffset - l;
@@ -93,7 +92,10 @@ public class WriteaheadBuffered extends AbstractWrapper {
         fSize = 0;
         outToDisk.close();
         File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp");
-        outBuf.renameTo(outBufTmp);
+        if(!outBuf.renameTo(outBufTmp)) {
+          log.warn("Cannot rename temp file "+outBuf.getAbsolutePath()+
+              " to "+outBufTmp.getAbsolutePath());
+        };
         outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
         DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
         while(dis.available() > 0) {
@@ -104,7 +106,9 @@ public class WriteaheadBuffered extends AbstractWrapper {
           }
         }
         dis.close();
-        outBufTmp.delete();
+        if(!outBufTmp.delete()) {
+          log.warn("Can not delete temp file: "+outBufTmp.getAbsolutePath());
+        };
       }
     } catch(IOException e) {
       log.error(e);
@@ -114,8 +118,11 @@ public class WriteaheadBuffered extends AbstractWrapper {
   
   @Override
   public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
-    if(p != RESTARTING)
-      outBuf.delete();    
+    if(p != RESTARTING) {
+      if(!outBuf.delete()) {
+        log.warn("Cannot delete output buffer file:"+outBuf.getAbsolutePath());
+      };
+    }
     return inner.shutdown(p);
   }
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
index 5fea073..9fc25b9 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.File;
 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
-import org.apache.hadoop.chukwa.util.ExceptionUtil;
 
 /**
  * An adaptor that repeatedly tails a specified file, sending the new bytes.
@@ -118,7 +117,7 @@ public class FileTailingAdaptor extends LWFTAdaptor {
    * @param eq the queue to write Chunks into
    */
   @Override
-  public synchronized boolean tailFile()
+  public boolean tailFile()
       throws InterruptedException {
     boolean hasMoreData = false;
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
index 9da09d5..dc867d5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
@@ -51,7 +51,7 @@ public class LWFTAdaptor extends AbstractAdaptor {
   public static final String MAX_READ_SIZE_OPT = 
       "chukwaAgent.fileTailingAdaptor.maxReadSize";
 
-  static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
+  int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
   
   static Logger log;
   static FileTailer tailer;
@@ -200,7 +200,7 @@ public class LWFTAdaptor extends AbstractAdaptor {
     return hasMoreData;
   }
   
-  public synchronized boolean tailFile()
+  public boolean tailFile()
   throws InterruptedException {
     boolean hasMoreData = false;
     try {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
index 2fa82fe..cd8d53f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
@@ -26,6 +26,8 @@ import java.util.regex.Pattern;
 import java.util.Collections;
 import java.util.LinkedList;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 /**
  * Checkpoint state:
  *   date modified of most-recently tailed file, offset of first byte of that file,
@@ -54,6 +56,22 @@ public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
       //just a heuristic that hasn't been tuned yet
       else return (o.f.getName().compareTo(f.getName()));
     }
+    
+    @Override
+    public boolean equals(Object o) {
+      if(o instanceof FPair) {
+        return mod == ((FPair) o).mod;
+      } else {
+        return false;
+      }
+    }
+    
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder(643, 1321).
+          append(this.mod).
+          toHashCode();
+    }
   }
   
   long prevFileLastModDate = 0;
@@ -129,7 +147,7 @@ public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
   }
   
   @Override
-  public synchronized boolean tailFile()
+  public boolean tailFile()
   throws InterruptedException {
     boolean hasMoreData = false;
     try {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
index 082dd58..e924172 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat;
 
+import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
 
@@ -54,7 +55,7 @@ public class HttpStatusChecker implements StatusChecker {
       connection = (HttpURLConnection)url.openConnection();
       connection.connect();
       status.put("status", "running");
-    } catch (Exception e) {
+    } catch (IOException e) {
       status.put("status", "stopped");    
     } finally {
       if(connection != null){

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
index 991cdaf..79f8db6 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
 
+import java.nio.charset.Charset;
+
 import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
@@ -100,7 +102,7 @@ public class JMSAdaptor extends AbstractAdaptor {
         bytesReceived += bytes.length;
 
         if (log.isDebugEnabled()) {
-          log.debug("Adding Chunk from JMS message: " + new String(bytes));
+          log.debug("Adding Chunk from JMS message: " + new String(bytes, Charset.forName("UTF-8")));
         }
 
         Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this);
@@ -142,6 +144,7 @@ public class JMSAdaptor extends AbstractAdaptor {
 
     String transformerName = null;
     String transformerConfs = null;
+    StringBuilder transformerConfsBuffer = new StringBuilder();
     for (int i = 1; i < tokens.length; i++) {
       String value = tokens[i];
       if ("-t".equals(value)) {
@@ -168,17 +171,19 @@ public class JMSAdaptor extends AbstractAdaptor {
         transformerName = tokens[++i];
       }
       else if ("-p".equals(value)) {
-        transformerConfs = tokens[++i];
+        transformerConfsBuffer.append(tokens[++i]);
+        transformerConfs = transformerConfsBuffer.toString();
 
         // transformerConfs can have multiple words
-        if (transformerConfs.startsWith("\"")) {
+        if (transformerConfsBuffer.toString().startsWith("\"")) {
           for(int j = i + 1; j < tokens.length; j++) {
-            transformerConfs = transformerConfs + " " + tokens[++i];
+            transformerConfsBuffer.append(" ");
+            transformerConfsBuffer.append(tokens[++i]);
             if(tokens[j].endsWith("\"")) {
               break;
             }
           }
-          transformerConfs = trimQuotes(transformerConfs);
+          transformerConfs = trimQuotes(transformerConfsBuffer.toString());
         }
       }
     }
@@ -196,7 +201,7 @@ public class JMSAdaptor extends AbstractAdaptor {
     // create transformer
     if (transformerName != null) {
       try {
-        Class classDefinition = Class.forName(transformerName);
+        Class<?> classDefinition = Class.forName(transformerName);
         Object object = classDefinition.newInstance();
         transformer = (JMSMessageTransformer)object;
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
index b0ef917..facff2d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java
@@ -73,14 +73,16 @@ public class JMSMessagePropertyTransformer implements JMSMessageTransformer {
       String token = tokens[i];
 
       if ("-d".equals(token) && i <= tokens.length - 2) {
-        String value = tokens[++i];
+        StringBuilder value = new StringBuilder();
+        value.append(tokens[++i]);
 
         // we lost all spaces with the split, so we have to put them back, yuck.
         while (i <= tokens.length - 2 && !tokens[i + 1].startsWith("-")) {
-          value = value + " " + tokens[++i];
+          value.append(" ");
+          value.append(tokens[++i]);
         }
 
-        delimiter = trimSingleQuotes(value);
+        delimiter = trimSingleQuotes(value.toString());
       }
       else if ("-r".equals(token) && i <= tokens.length - 2) {
         // requiredPropertyNames = null means all are required.

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
index 9f98f4a..52d4cb8 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
 
+import java.nio.charset.Charset;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -44,7 +46,7 @@ public class JMSTextMessageTransformer implements JMSMessageTransformer {
 
     String text = ((TextMessage)message).getText();
     if (text != null && text.length() > 0) {
-      return text.getBytes();
+      return text.getBytes(Charset.forName("UTF-8"));
     }
 
     return null;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
index cde2868..88ba9bc 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
@@ -21,7 +21,6 @@ import java.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
-import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
 import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
 import org.apache.log4j.Logger;
@@ -52,9 +51,8 @@ public class AdaptorResetThread extends Thread {
   
   public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
         //
-    timeout =  conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/3) 
-        + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/3) 
-        + conf.getInt(CommitCheckServlet.SCANPERIOD_OPT, timeout/3);
+    timeout =  conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2) 
+        + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/2);
     
     timeout = conf.getInt(TIMEOUT_OPT, timeout); //unless overridden
      

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
index dda7888..d024180 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
@@ -74,7 +74,7 @@ public class AgentControlSocketListener extends Thread {
         InputStream in = connection.getInputStream();
         BufferedReader br = new BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8")));
         PrintStream out = new PrintStream(new BufferedOutputStream(connection
-            .getOutputStream()));
+            .getOutputStream()), true, "UTF-8");
         String cmd = null;
         while ((cmd = br.readLine()) != null) {
           processCommand(cmd, out);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
index 78c307e..03ed635 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.chukwa.datacollection.agent.rest;
 
-import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlAccessorType;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
index 70edc2a..dc44975 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java
@@ -21,9 +21,6 @@ import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager;
-import org.apache.hadoop.chukwa.util.ExceptionUtil;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.json.simple.JSONObject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -31,15 +28,11 @@ import javax.ws.rs.Path;
 import javax.ws.rs.GET;
 import javax.ws.rs.Produces;
 import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.POST;
 import javax.ws.rs.Consumes;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.MediaType;
-import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletResponse;
 
 import java.text.DecimalFormat;
@@ -54,7 +47,7 @@ import java.util.Map;
 public class AdaptorController {
 
   private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat();
-  private static final Log log = LogFactory.getLog(AdaptorController.class);
+  private static final Log LOG = LogFactory.getLog(AdaptorController.class);
 
   static {
     DECIMAL_FORMAT.setMinimumFractionDigits(2);
@@ -95,6 +88,8 @@ public class AdaptorController {
       String adaptorId = agent.processAddCommandE(addCommand.toString());
       return doGetAdaptor(adaptorId);
     } catch (AdaptorException e) {
+      LOG.warn("Could not add adaptor for data type: '" + ac.getDataType() +
+          "', error: " + e.getMessage());
       return badRequestResponse("Could not add adaptor for data type: '" + ac.getDataType() +
               "', error: " + e.getMessage());
     }
@@ -180,7 +175,7 @@ public class AdaptorController {
   protected AdaptorInfo buildAdaptor(String adaptorId) {
     ChukwaAgent agent = ChukwaAgent.getAgent();
     Adaptor adaptor = agent.getAdaptor(adaptorId);
-    OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager();
+    OffsetStatsManager<Adaptor> adaptorStats = agent.getAdaptorStatsManager();
 
     AdaptorInfo info = new AdaptorInfo();
     info.setId(adaptorId);
@@ -205,7 +200,7 @@ public class AdaptorController {
     AdaptorList list = new AdaptorList();
     for(String name : adaptorMap.keySet()) {
       Adaptor adaptor = agent.getAdaptor(name);
-      OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager();
+      OffsetStatsManager<Adaptor> adaptorStats = agent.getAdaptorStatsManager();
 
       AdaptorInfo info = new AdaptorInfo();
       info.setId(name);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
index d4c2df4..df1616b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
@@ -20,13 +20,18 @@ package org.apache.hadoop.chukwa.datacollection.collector.servlet;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URI;
+
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+
 import org.apache.log4j.Logger;
+
 import java.util.*;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
 import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
@@ -39,8 +44,8 @@ public class CommitCheckServlet extends HttpServlet {
   private static final long serialVersionUID = -4627538252371890849L;
   
   protected final static Logger log = Logger.getLogger(CommitCheckServlet.class);
-  CommitCheckThread commitCheck;
-  Configuration conf;
+  transient CommitCheckThread commitCheck;
+  transient Configuration conf;
     //interval at which to scan the filesystem, ms
   public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod";
   
@@ -78,7 +83,7 @@ public class CommitCheckServlet extends HttpServlet {
   protected void doGet(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException  {
   
-    PrintStream out = new PrintStream(resp.getOutputStream());
+    PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8");
     resp.setStatus(200);
 
     out.println("<html><body><h2>Commit status</h2><ul>");
@@ -98,6 +103,7 @@ public class CommitCheckServlet extends HttpServlet {
    * For now, instead, we'll just do an ls in a bunch of places.
    */
   private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT {
+
     int checkInterval = 1000 * 30;
     volatile boolean running = true;
     final Collection<Path> pathsToSearch;
@@ -116,15 +122,30 @@ public class CommitCheckServlet extends HttpServlet {
         this.purgeTime = time;
         this.len = len;
       }
-      
+
+      @Override
+      public boolean equals (Object o) {
+        if(o == null || !(o instanceof PurgeTask)) {
+          return false;
+        }
+        PurgeTask other = (PurgeTask) o;
+        return this.purgeTime == other.purgeTime;
+      }
+
+      @Override
       public int compareTo(PurgeTask p) {
         if(purgeTime < p.purgeTime)
           return -1;
-        else if (purgeTime == p.purgeTime)
+        else if (this.equals(p))
           return 0;
         else
           return 1;
       }
+
+      @Override
+      public int hashCode() {
+        return new HashCodeBuilder(3221, 4271).append(purgeTime).toHashCode();
+      }
     }
     
     

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
index 613fa3e..dfbe53a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
@@ -22,11 +22,15 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+
 import org.apache.log4j.Logger;
+
 import java.io.*;
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
+
 import org.apache.hadoop.chukwa.*;
 import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
 import org.apache.hadoop.conf.Configuration;
@@ -61,8 +65,8 @@ public class LogDisplayServlet extends HttpServlet {
   public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer";
   long BUF_SIZE = 1024* 1024;
   
-  Configuration conf;
-  Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>();
+  transient Configuration conf;
+  transient Map<String, Deque<Chunk>> chunksBySID;
   Queue<String> receivedSIDs = new LinkedList<String>();
   long totalStoredSize = 0;
 
@@ -71,12 +75,20 @@ public class LogDisplayServlet extends HttpServlet {
   
   public LogDisplayServlet() {
     conf = new Configuration();
-    ExtractorWriter.recipient = this;
+    chunksBySID = new HashMap<String, Deque<Chunk>>();
+    ExtractorWriter.setRecipient(this);
   }
   
   public LogDisplayServlet(Configuration c) {
     conf = c;
-    ExtractorWriter.recipient = this;
+    chunksBySID = new HashMap<String, Deque<Chunk>>();
+    ExtractorWriter.setRecipient(this);
+  }
+
+  public LogDisplayServlet(Configuration c, Map<String, Deque<Chunk>> chunksBySID) {
+    conf = c;
+    this.chunksBySID = chunksBySID;
+    ExtractorWriter.setRecipient(this);
   }
 
   public void init(ServletConfig servletConf) throws ServletException {
@@ -93,9 +105,9 @@ public class LogDisplayServlet extends HttpServlet {
       MessageDigest md;
       md = MessageDigest.getInstance("MD5");
   
-      md.update(c.getSource().getBytes());
-      md.update(c.getStreamName().getBytes());
-      md.update(c.getTags().getBytes());
+      md.update(c.getSource().getBytes(Charset.forName("UTF-8")));
+      md.update(c.getStreamName().getBytes(Charset.forName("UTF-8")));
+      md.update(c.getTags().getBytes(Charset.forName("UTF-8")));
       StringBuilder sb = new StringBuilder();
       byte[] bytes = md.digest();
       for(int i=0; i < bytes.length; ++i) {
@@ -106,7 +118,6 @@ public class LogDisplayServlet extends HttpServlet {
       return sb.toString();
     } catch(NoSuchAlgorithmException n) {
       log.fatal(n);
-      System.exit(0);
       return null;
     }
   }
@@ -146,7 +157,7 @@ public class LogDisplayServlet extends HttpServlet {
   protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException  {
   
-    PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()));
+    PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()), true, "UTF-8");
     resp.setStatus(200);
     String path = req.getServletPath();
     String streamID = req.getParameter("sid");

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
index 5c3ea71..0a78f2f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
@@ -54,14 +54,14 @@ public class ServletCollector extends HttpServlet {
    * If a chunk is committed; then the ack will start with the following string.
    */
   public static final String ACK_PREFIX = "ok: ";
-  org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
+  transient ChukwaWriter writer = null;
 
   private static final long serialVersionUID = 6286162898591407111L;
-  Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);
+  transient Logger log = Logger.getLogger(ServletCollector.class);
   
-  static boolean COMPRESS;
-  static String CODEC_NAME;
-  static CompressionCodec codec;
+  boolean COMPRESS;
+  String CODEC_NAME;
+  transient CompressionCodec codec;
 
   public void setWriter(ChukwaWriter w) {
     writer = w;
@@ -76,7 +76,7 @@ public class ServletCollector extends HttpServlet {
   int numberchunks = 0;
   long lifetimechunks = 0;
 
-  Configuration conf;
+  transient Configuration conf;
 
   public ServletCollector(Configuration c) {
     conf = c;
@@ -151,7 +151,6 @@ public class ServletCollector extends HttpServlet {
   protected void accept(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException {
     numberHTTPConnection++;
-    ServletDiagnostics diagnosticPage = new ServletDiagnostics();
     final long currentTime = System.currentTimeMillis();
     try {
 
@@ -173,10 +172,6 @@ public class ServletCollector extends HttpServlet {
       final int numEvents = di.readInt();
       // log.info("saw " + numEvents+ " in request");
 
-      if (FANCY_DIAGNOSTICS) {
-        diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
-      }
-
       List<Chunk> events = new LinkedList<Chunk>();
       StringBuilder sb = new StringBuilder();
 
@@ -184,9 +179,6 @@ public class ServletCollector extends HttpServlet {
         ChunkImpl logEvent = ChunkImpl.read(di);
         events.add(logEvent);
 
-        if (FANCY_DIAGNOSTICS) {
-          diagnosticPage.sawChunk(logEvent, i);
-        }
       }
 
       int responseStatus = HttpServletResponse.SC_OK;
@@ -226,10 +218,6 @@ public class ServletCollector extends HttpServlet {
         l_out.println("can't write: no writer");
       }
 
-      if (FANCY_DIAGNOSTICS) {
-        diagnosticPage.doneWithPost();
-      }
-
       resp.setStatus(responseStatus);
 
     } catch (Throwable e) {
@@ -251,7 +239,7 @@ public class ServletCollector extends HttpServlet {
 
 
     log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis());
-    PrintStream out = new PrintStream(resp.getOutputStream());
+    PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8");
     resp.setStatus(200);
 
     String pingAtt = req.getParameter("ping");
@@ -264,8 +252,6 @@ public class ServletCollector extends HttpServlet {
       out.println("lifetimechunks:" + lifetimechunks);
     } else {
       out.println("<html><body><h2>Chukwa servlet running</h2>");
-      if (FANCY_DIAGNOSTICS)
-        ServletDiagnostics.printPage(out);
       out.println("</body></html>");
     }
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
index a9bd744..29c1fb5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java
@@ -28,7 +28,7 @@ public class ChunkCatcherConnector implements Connector {
   
   Timer tm;
   
-  class Interruptor extends TimerTask {
+  static class Interruptor extends TimerTask {
     Thread targ;
     volatile boolean deactivate = false;
     Interruptor(Thread t) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
index b998139..929d871 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
@@ -59,7 +59,7 @@ public class PipelineConnector implements Connector, Runnable {
 
   ChunkQueue chunkQueue;
 
-  private static volatile ChukwaAgent agent = null;
+  private ChukwaAgent agent = null;
 
   private volatile boolean stopMe = false;
   protected ChukwaWriter writers = null;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
index 3bb0dd7..e542b2f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
@@ -41,6 +41,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
@@ -55,7 +57,7 @@ public class HttpConnector implements Connector, Runnable {
   static Logger log = Logger.getLogger(HttpConnector.class);
 
   Timer statTimer = null;
-  volatile int chunkCount = 0;
+  AtomicInteger chunkCount = new AtomicInteger();
   
   int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
   int MIN_POST_INTERVAL = 5 * 1000;
@@ -78,8 +80,8 @@ public class HttpConnector implements Connector, Runnable {
     statTimer = new Timer();
     statTimer.schedule(new TimerTask() {
       public void run() {
-        int count = chunkCount;
-        chunkCount = 0;
+        // atomically read and reset, so increments that land between a
+        // separate get() and set(0) are not silently lost
+        int count = chunkCount.getAndSet(0);
         log.info("# http chunks ACK'ed since last report: " + count);
       }
     }, 100, 60 * 1000);
@@ -170,7 +172,7 @@ public class HttpConnector implements Connector, Runnable {
         // checkpoint the chunks which were committed
         for (ChukwaHttpSender.CommitListEntry cle : results) {
           agent.reportCommit(cle.adaptor, cle.uuid);
-          chunkCount++;
+          chunkCount.incrementAndGet();
         }
 
         long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
index 6f818e4..2b6617d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
@@ -23,12 +23,10 @@ import org.apache.hadoop.chukwa.datacollection.agent.*;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
-import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.hadoop.conf.*;
-import org.apache.commons.httpclient.*;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 //import com.google.common.collect.SortedSetMultimap;
@@ -74,7 +72,7 @@ public class AsyncAckSender extends ChukwaHttpSender{
       int c = o.aName.compareTo(this.aName);
       if(c != 0)
         return c;
-      c = fname.compareTo(this.fname);
+      c = o.fname.compareTo(this.fname);
       if(c != 0)
         return c;
       if(o.start < start)
@@ -83,7 +81,19 @@ public class AsyncAckSender extends ChukwaHttpSender{
         return -1;
       else return 0;
     }
-    
+
+    @Override
+    public boolean equals(Object o) {
+      if(!(o instanceof DelayedCommit)) {
+        return false;
+      }
+      DelayedCommit dc = (DelayedCommit) o;
+      // compare the same fields compareTo() orders on, so equals() is
+      // consistent with the natural ordering (Comparable contract)
+      return this.aName.equals(dc.aName) &&
+          this.fname.equals(dc.fname) && this.start == dc.start;
+    }
+
     public String toString() {
       return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
index 1c8c3d2..76727fe 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
@@ -88,9 +88,9 @@ public class ChukwaHttpSender implements ChukwaSender {
 
   protected Iterator<String> collectors;
   
-  static boolean COMPRESS;
-  static String CODEC_NAME;
-  static CompressionCodec codec;
+  boolean COMPRESS;
+  String CODEC_NAME;
+  CompressionCodec codec;
 
   static {
     connectionManager = new MultiThreadedHttpConnectionManager();
@@ -112,9 +112,13 @@ public class ChukwaHttpSender implements ChukwaSender {
   // FIXME: probably we're better off with an EventListRequestEntity
   static class BuffersRequestEntity implements RequestEntity {
     List<DataOutputBuffer> buffers;
+    boolean compress;
+    CompressionCodec codec;
 
-    public BuffersRequestEntity(List<DataOutputBuffer> buf) {
+    public BuffersRequestEntity(List<DataOutputBuffer> buf, boolean compress, CompressionCodec codec) {
       buffers = buf;
+      this.compress = compress;
+      this.codec = codec;
     }
     
     private long getUncompressedContentLenght(){
@@ -125,7 +129,7 @@ public class ChukwaHttpSender implements ChukwaSender {
     }
 
     public long getContentLength() {
-    	if( COMPRESS) {
+    	if(compress) {
     		return -1;
     	}
     	else {
@@ -148,7 +152,7 @@ public class ChukwaHttpSender implements ChukwaSender {
     }
     
     public void writeRequest(OutputStream out) throws IOException {
-      if( COMPRESS) {
+      if(compress) {
           CompressionOutputStream cos = codec.createOutputStream(out);
           DataOutputStream dos = new DataOutputStream( cos);
           doWriteRequest( dos);
@@ -239,7 +243,7 @@ public class ChukwaHttpSender implements ChukwaSender {
     toSend.clear();
 
     // collect all serialized chunks into a single buffer to send
-    RequestEntity postData = new BuffersRequestEntity(serializedEvents);
+    RequestEntity postData = new BuffersRequestEntity(serializedEvents, COMPRESS, codec);
 
     PostMethod method = new PostMethod();
     method.setRequestEntity(postData);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
index 15cb20a..c636ad2 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.chukwa.datacollection.sender;
 
 
 import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
+
 import org.apache.hadoop.conf.Configuration;
 
 /***
@@ -40,14 +42,14 @@ public class RetryListOfCollectors implements Iterator<String>, Cloneable {
   long lastLookAtFirstNode;
   int nextCollector = 0;
   private String portNo;
-  Configuration conf;
   public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
 
   public RetryListOfCollectors(File collectorFile, Configuration conf)
       throws IOException {
     this(conf);
     try {
-      BufferedReader br = new BufferedReader(new FileReader(collectorFile));
+      FileInputStream fis = new FileInputStream(collectorFile);
+      BufferedReader br = new BufferedReader(new InputStreamReader(fis, Charset.forName("UTF-8")));
       String line, parsedline;
       while ((line = br.readLine()) != null) {
         parsedline = canonicalizeLine(line);
@@ -104,7 +106,6 @@ public class RetryListOfCollectors implements Iterator<String>, Cloneable {
   
   public RetryListOfCollectors(Configuration conf) {
     collectors = new ArrayList<String>();
-    this.conf = conf;
     portNo = conf.get("chukwaCollector.http.port", "8080");
     maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
     lastLookAtFirstNode = 0;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
index 7ba7a29..7c2e755 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.*;
 import org.apache.hadoop.chukwa.datacollection.agent.*;
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+
+import java.nio.charset.Charset;
 import java.util.*;
 
 /**
@@ -64,7 +66,7 @@ public class ConsoleOutConnector extends Thread implements Connector {
               System.out.println("data length was " + e.getData().length
                   + ", not printing");
             else
-              System.out.println(new String(e.getData()));
+              System.out.println(new String(e.getData(), Charset.forName("UTF-8")));
           }
 
           agent.reportCommit(e.getInitiator(), e.getSeqID());

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
index b7215c9..d52d58f 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.writer.*;
-import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
-import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.StatReportingTask;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,17 +43,7 @@ public class FilePerPostWriter extends SeqFileWriter {
 
   String baseName;
   AtomicLong counter = new AtomicLong(0);
-  
-  protected FileSystem fs = null;
-  protected Configuration conf = null;
-
-  protected String outputDir = null;
-//  private Calendar calendar = Calendar.getInstance();
 
-  protected Path currentPath = null;
-  protected String currentFileName = null;
-
-  
   @Override
   public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException {
 
@@ -83,12 +71,10 @@ public class FilePerPostWriter extends SeqFileWriter {
             + "/" + chunk.getStreamName());
         archiveKey.setSeqId(chunk.getSeqID());
 
-        if (chunk != null) {
           // compute size for stats
           dataSize += chunk.getData().length;
           bytesThisRotate += chunk.getData().length;
           seqFileWriter.append(archiveKey, chunk);
-        }
 
       }
       
@@ -129,7 +115,13 @@ public class FilePerPostWriter extends SeqFileWriter {
     } catch(Exception e) {
       throw new WriterException(e);
     }
-      
   }
 
+  protected String getCurrentFileName() {
+    return currentFileName;
+  }
+  
+  protected Path getCurrentPath() {
+    return currentPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
index 7aeab22..e00229a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollecto
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
+
 import java.io.*;
 import java.util.*;
 
@@ -36,6 +39,7 @@ public class FileTailerStressTest {
 
   static final int DELAY_MIN = 10 * 1000;
   static final int DELAY_RANGE = 2 * 1000;
+  static final Logger log = Logger.getLogger(FileTailerStressTest.class);
 
   static class OccasionalWriterThread extends Thread {
     File file;
@@ -45,9 +49,9 @@ public class FileTailerStressTest {
     }
 
     public void run() {
+      PrintWriter out = null;
       try {
-        FileOutputStream fos = new FileOutputStream(file);
-        PrintWriter out = new PrintWriter(fos);
+        out = new PrintWriter(file.getAbsolutePath(), "UTF-8");
         Random rand = new Random();
         while (true) {
           int delay = rand.nextInt(DELAY_RANGE) + DELAY_MIN;
@@ -59,6 +63,9 @@ public class FileTailerStressTest {
       } catch (IOException e) {
         e.printStackTrace();
       } catch (InterruptedException e) {
+        if(out != null) {
+          out.close();
+        }
       }
     }
   }
@@ -91,7 +98,9 @@ public class FileTailerStressTest {
       ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
 
       File workdir = new File("/tmp/stresstest/");
-      workdir.mkdir();
+      if(!workdir.mkdir()) {
+        log.warn("Error creating working directory:" + workdir.getAbsolutePath());
+      }
       for (int i = 0; i < FILES_TO_USE; ++i) {
         File newTestF = new File("/tmp/stresstest/" + i);
 
@@ -102,7 +111,9 @@ public class FileTailerStressTest {
 
       Thread.sleep(60 * 1000);
       System.out.println("cleaning up");
-      workdir.delete();
+      if(!workdir.delete()) {
+        log.warn("Error clean up working directory:" + workdir.getAbsolutePath());
+      }
     } catch (Exception e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
index f48ba9a..db1be4d 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
@@ -19,7 +19,11 @@
 package org.apache.hadoop.chukwa.datacollection.test;
 
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.conf.Configuration;
@@ -65,14 +69,16 @@ public class SinkFileValidator {
 
         if (evt.getData().length > 1000) {
           System.out.println("got event; data: "
-              + new String(evt.getData(), 0, 1000));
+              + new String(evt.getData(), 0, 1000, Charset.forName("UTF-8")));
           System.out.println("....[truncating]");
         } else
-          System.out.println("got event; data: " + new String(evt.getData()));
+          System.out.println("got event; data: " + new String(evt.getData(), Charset.forName("UTF-8")));
         events++;
       }
       System.out.println("file looks OK!");
-    } catch (Exception e) {
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (URISyntaxException e) {
       e.printStackTrace();
     }
 

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
index 177d013..d8f5335 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.chukwa.datacollection.writer;
 
 
+import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.conf.Configuration;
 
@@ -84,7 +86,7 @@ public class ConsoleWriter implements ChukwaWriter {
         System.out.print(data.getDataType());
         System.out.print(") ");
         System.out.print(new String(data.getData(), startOffset, offset
-            - startOffset + 1));
+            - startOffset + 1, Charset.forName("UTF-8")));
         startOffset = offset + 1;
       }
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
index cefb42b..2cfd216 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
@@ -57,7 +57,7 @@ public class Dedup extends PipelineableWriter {
     final HashSet<EntryType> hs;
     final Queue<EntryType> toDrop;
     final int maxSize;
-    volatile long dupchunks = 0;
+    long dupchunks = 0;
 
     public FixedSizeCache(int size) {
       maxSize = size;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
index 1a681f5..a8a281e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 
 public class ExtractorWriter extends PipelineableWriter {
 
-  public static LogDisplayServlet recipient;
+  private static LogDisplayServlet recipient;
   
   @Override
   public void close() throws WriterException {
@@ -44,4 +44,8 @@ public class ExtractorWriter extends PipelineableWriter {
       return ChukwaWriter.COMMIT_OK;
   }
 
+  public static void setRecipient(LogDisplayServlet logDisplayServlet) {
+    recipient = logDisplayServlet;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
index f2d4252..4d9e2a0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
@@ -44,9 +44,6 @@ public class InMemoryWriter implements ChukwaWriter {
       e.printStackTrace();
       throw new WriterException(e);
     }
-    synchronized (this) {
-      notify();
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
index e30362d..141be20 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
@@ -111,7 +111,11 @@ public class PipelineStageWriter implements ChukwaWriter {
             writer = (ChukwaWriter) st; // one stage pipeline
         }
         return;
-      } catch (Exception e) {
+      } catch (IOException | 
+          WriterException | 
+          ClassNotFoundException | 
+          IllegalAccessException | 
+          InstantiationException e) {
         // if anything went wrong (missing class, etc) we wind up here.
         log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
         // fall through to default case

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
index 3c0d268..3803a2e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
@@ -47,7 +47,7 @@ import org.apache.log4j.Logger;
  */
 public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
   static Logger log = Logger.getLogger(SeqFileWriter.class);
-  public static boolean ENABLE_ROTATION_ON_CLOSE = true;
+  private static boolean ENABLE_ROTATION_ON_CLOSE = true;
 
   protected int STAT_INTERVAL_SECONDS = 30;
   private int rotateInterval = 1000 * 60 * 5;
@@ -60,7 +60,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
   public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
   public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
   public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
-  protected static String localHostAddr = null;
+  public String localHostAddr = null;
   
   protected final Semaphore lock = new Semaphore(1, true);
   
@@ -85,7 +85,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
   protected volatile long bytesThisRotate = 0;
   protected volatile boolean isRunning = false;
 
-  static {
+  public SeqFileWriter() {
     try {
       localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
     } catch (UnknownHostException e) {
@@ -93,8 +93,6 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
     }
   }
   
-  public SeqFileWriter() {}
-  
   public long getBytesWritten() {
     return dataSize;
   }
@@ -135,7 +133,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
     try {
       fs = FileSystem.get(new URI(fsname), conf);
       if (fs == null) {
-        log.error("can't connect to HDFS at " + fs.getUri() + " bail out!");
+        log.error("can't connect to HDFS.");
       }
     } catch (Throwable e) {
       log.error(
@@ -324,49 +322,45 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
       throw new WriterException("Collector not ready");
     }
 
-    if (chunks != null) {
-      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-      
-      if (System.currentTimeMillis() >= nextTimePeriodComputation) {
-        computeTimePeriod();
-      }
-      try {
-        lock.acquire();
-        for (Chunk chunk : chunks) {
-          archiveKey.setTimePartition(timePeriod);
-          archiveKey.setDataType(chunk.getDataType());
-          archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
-              + "/" + chunk.getStreamName());
-          archiveKey.setSeqId(chunk.getSeqID());
-
-          if (chunk != null) {
-            seqFileWriter.append(archiveKey, chunk);
-
-            // compute size for stats only if append succeeded. Note though that
-            // seqFileWriter.append can continue taking data for quite some time
-            // after HDFS goes down while the client is trying to reconnect. Hence
-            // these stats might not reflect reality during an HDFS outage.
-            dataSize += chunk.getData().length;
-            bytesThisRotate += chunk.getData().length;
-
-            String futureName = currentPath.getName().replace(".chukwa", ".done");
-            result.addPend(futureName, currentOutputStr.getPos());
-          }
+    ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+    
+    if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+      computeTimePeriod();
+    }
+    try {
+      lock.acquire();
+      for (Chunk chunk : chunks) {
+        archiveKey.setTimePartition(timePeriod);
+        archiveKey.setDataType(chunk.getDataType());
+        archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+            + "/" + chunk.getStreamName());
+        archiveKey.setSeqId(chunk.getSeqID());
+
+        seqFileWriter.append(archiveKey, chunk);
+
+        // compute size for stats only if append succeeded. Note though that
+        // seqFileWriter.append can continue taking data for quite some time
+        // after HDFS goes down while the client is trying to reconnect. Hence
+        // these stats might not reflect reality during an HDFS outage.
+        dataSize += chunk.getData().length;
+        bytesThisRotate += chunk.getData().length;
+
+        String futureName = currentPath.getName().replace(".chukwa", ".done");
+        result.addPend(futureName, currentOutputStr.getPos());
 
-        }
-      }
-      catch (IOException e) {
-        log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
-        return COMMIT_FAIL;
-      }
-      catch (Throwable e) {
-        // We don't want to loose anything
-        log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
-        isRunning = false;
-      } finally {
-        lock.release();
       }
     }
+    catch (IOException e) {
+      log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
+      return COMMIT_FAIL;
+    }
+    catch (Throwable e) {
+      // We don't want to lose anything
+      log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
+      isRunning = false;
+    } finally {
+      lock.release();
+    }
     return result;
   }
 
@@ -405,5 +399,9 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
         lock.release();
     }
   }
+  
+  public static void setEnableRotationOnClose(boolean b) {
+    ENABLE_ROTATION_ON_CLOSE = b;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
index 0249b4f..88ec861 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.chukwa.datacollection.writer;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ArrayBlockingQueue;
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.util.Filter;
 import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
+
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.nio.charset.Charset;
 import java.io.*;
+
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 
 /**
@@ -145,7 +149,7 @@ public class SocketTeeWriter extends PipelineableWriter {
           else {
             byte[] data = c.getData();
             byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+  
-                c.getSeqID()+"\n").getBytes(); 
+                c.getSeqID()+"\n").getBytes(Charset.forName("UTF-8")); 
             out.writeInt(data.length+ header.length);
             out.write(header);
             out.write(data);
@@ -170,9 +174,12 @@ public class SocketTeeWriter extends PipelineableWriter {
        try { //inner try catches bad command syntax errors
         sock.setSoTimeout(timeout);
         sock.setKeepAlive(USE_KEEPALIVE);
-        in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+        in = new BufferedReader(new InputStreamReader(sock.getInputStream(), Charset.forName("UTF-8")));
         out = new DataOutputStream(sock.getOutputStream());
         String cmd = in.readLine();
+        if(cmd==null) {
+          throw new IllegalArgumentException("No input found.");
+        }
         if(!cmd.contains(" ")) {
           
           throw new IllegalArgumentException(
@@ -198,8 +205,8 @@ public class SocketTeeWriter extends PipelineableWriter {
           try {
             rules = new Filter(cmdAfterSpace);
           } catch (CheckedPatternSyntaxException pse) {
-            out.write("Error parsing command as a regex: ".getBytes());
-            out.write(pse.getMessage().getBytes());
+            out.write("Error parsing command as a regex: ".getBytes(Charset.forName("UTF-8")));
+            out.write(pse.getMessage().getBytes(Charset.forName("UTF-8")));
             out.writeByte('\n');
             out.close();
             in.close();
@@ -212,10 +219,10 @@ public class SocketTeeWriter extends PipelineableWriter {
         synchronized(tees) {
           tees.add(this);
         }
-        out.write("OK\n".getBytes());
+        out.write("OK\n".getBytes(Charset.forName("UTF-8")));
         log.info("tee to " + sock.getInetAddress() + " established");
       } catch(IllegalArgumentException e) {
-          out.write(e.toString().getBytes());
+          out.write(e.toString().getBytes(Charset.forName("UTF-8")));
           out.writeByte('\n');
           out.close();
           in.close();
@@ -239,8 +246,11 @@ public class SocketTeeWriter extends PipelineableWriter {
     public void handle(Chunk c) {
       
       //don't ever block; just ignore this chunk if we don't have room for it.
-      if(rules.matches(c)) 
-        sendQ.offer(c);
+      if(rules.matches(c)) {
+        if(!sendQ.offer(c)) {
+          log.debug("Queue is full.");
+        }
+      }
     }
   }
 
@@ -249,7 +259,6 @@ public class SocketTeeWriter extends PipelineableWriter {
   
   SocketListenThread listenThread;
   List<Tee> tees;
-  ChukwaWriter next;
   
   @Override
   public void setNextStage(ChukwaWriter next) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
index 02e7907..e0ffdc4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.net.URI;
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
 import org.apache.hadoop.chukwa.util.CopySequenceFile;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -85,8 +86,8 @@ public class LocalToRemoteHdfsMover extends Thread {
 
     remoteFs = FileSystem.get(new URI(fsname), conf);
     if (remoteFs == null && exitIfHDFSNotavailable) {
-      log.error("can't connect to HDFS at " + remoteFs.getUri() + " bail out!");
-      System.exit(-1);
+      log.error("can't connect to HDFS.");
+      throw new WriterException("can't connect to HDFS.");
     } 
     
     localFs = FileSystem.getLocal(conf);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
index bb0fdf6..14d9ab8 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -148,7 +149,7 @@ public class LocalWriter implements ChukwaWriter {
       }
     } catch (Throwable e) {
       log.fatal("Cannot initialize LocalWriter", e);
-      System.exit(-1);
+      throw new WriterException(e);
     }
 
     
@@ -184,7 +185,11 @@ public class LocalWriter implements ChukwaWriter {
 
   private class RotateTask extends TimerTask {
         public void run() {
-          rotate();
+          try {
+            rotate();
+          } catch(WriterException e) {
+            log.error(ExceptionUtil.getStackTrace(e));
+          }
       };
   }
   
@@ -245,11 +250,9 @@ public class LocalWriter implements ChukwaWriter {
                 + "/" + chunk.getStreamName());
             archiveKey.setSeqId(chunk.getSeqID());
 
-            if (chunk != null) {
-              seqFileWriter.append(archiveKey, chunk);
-              // compute size for stats
-              dataSize += chunk.getData().length;
-            }
+            seqFileWriter.append(archiveKey, chunk);
+            // compute size for stats
+            dataSize += chunk.getData().length;
           }
         }// End synchro
         long end = System.currentTimeMillis();
@@ -264,7 +267,6 @@ public class LocalWriter implements ChukwaWriter {
         if (writeChunkRetries < 0) {
           log
               .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
-          System.exit(-1);
         }
         throw new WriterException(e);
       }
@@ -272,7 +274,7 @@ public class LocalWriter implements ChukwaWriter {
     return COMMIT_OK;
   }
 
-  protected void rotate() {
+  protected void rotate() throws WriterException {
     isRunning = true;
     calendar.setTimeInMillis(System.currentTimeMillis());
     log.info("start Date [" + calendar.getTime() + "]");
@@ -316,10 +318,7 @@ public class LocalWriter implements ChukwaWriter {
             SequenceFile.CompressionType.NONE, null);
 
       } catch (IOException e) {
-        log.fatal("IO Exception in rotate. Exiting!", e);
-        // Shutting down the collector
-        // Watchdog will re-start it automatically
-        System.exit(-1);
+        log.fatal("IO Exception in rotate: ", e);
       }
     }
  
@@ -336,8 +335,8 @@ public class LocalWriter implements ChukwaWriter {
     }
   
     if (freeSpace < minFreeAvailable) {
-      log.fatal("No space left on device, Bail out!");
-      System.exit(-1);
+      log.fatal("No space left on device.");
+      throw new WriterException("No space left on device.");
     } 
     
     log.debug("finished rotate()");

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
index bf64b24..40a6ff0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.chukwa.datacollection.writer.solr;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
-import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -106,6 +104,9 @@ public class SolrWriter extends PipelineableWriter {
         if(data.contains("mapredice")) {
           doc.addField(SERVICE, "mapreduce");
         }
+        if(data.contains("hbase")) {
+          doc.addField(SERVICE, "hbase");
+        }
         try {
           Date d = sdf.parse(data);
           doc.addField(DATE, d, 1.0f);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
index c09d1ee..3b8b946 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
@@ -19,14 +19,16 @@
 package org.apache.hadoop.chukwa.dataloader;
 
 import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 
 public abstract class DataLoaderFactory {
 
-  static ChukwaConfiguration conf = null;
-  static FileSystem fs = null;
+  ChukwaConfiguration conf = null;
+  FileSystem fs = null;
   protected FileStatus[] source = null;
 
   public DataLoaderFactory() {
@@ -37,9 +39,20 @@ public abstract class DataLoaderFactory {
    * @throws IOException
    */
   public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] src) throws IOException {
-    this.source=src;
+    this.source=src.clone();
     this.conf=conf;
     this.fs=fs;
   }
 
+  public FileStatus[] getSource() {
+    return Arrays.copyOf(source, source.length);
+  }
+  
+  protected FileSystem getFileSystem() {
+    return fs;
+  }
+  
+  protected ChukwaConfiguration getConf() {
+    return conf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
index 336a09b..009dd2b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
@@ -43,8 +43,8 @@ public class FSMDataLoader extends DataLoaderFactory {
   protected MetricDataLoader threads[] = null;
   private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
   private int size = 1;
-  private static CompletionService completion = null;
-  private static ExecutorService executor = null;
+  private CompletionService completion = null;
+  private ExecutorService executor = null;
   private static String[] mappers = {
     "org.apache.hadoop.chukwa.analysis.salsa.fsm.DataNodeClientTraceMapper",
     "org.apache.hadoop.chukwa.analysis.salsa.fsm.TaskTrackerClientTraceMapper",


Mime
View raw message