flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1676: ExecSource should provide a configurable charset
Date Thu, 13 Dec 2012 19:38:40 GMT
Updated Branches:
  refs/heads/flume-1.4 1a9eee470 -> a96dc82c4


FLUME-1676: ExecSource should provide a configurable charset

(Nitin Verma via Brock Noland


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a96dc82c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a96dc82c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a96dc82c

Branch: refs/heads/flume-1.4
Commit: a96dc82c4255a080abcdce52681147f77b6c7165
Parents: 1a9eee4
Author: Brock Noland <brock@apache.org>
Authored: Thu Dec 13 13:37:32 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Thu Dec 13 13:37:44 2012 -0600

----------------------------------------------------------------------
 .../java/org/apache/flume/source/ExecSource.java   |   20 +++++++++++---
 .../source/ExecSourceConfigurationConstants.java   |    8 ++++++
 2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a96dc82c/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 46f672f..495b03f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import java.nio.charset.Charset;
 
 /**
  * <p>
@@ -149,6 +150,7 @@ Configurable {
   private boolean logStderr;
   private Integer bufferCount;
   private ExecRunnable runner;
+  private Charset charset;
 
   @Override
   public void start() {
@@ -158,7 +160,7 @@ Configurable {
     counterGroup = new CounterGroup();
 
     runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
-        restart, restartThrottle, logStderr, bufferCount);
+        restart, restartThrottle, logStderr, bufferCount, charset);
 
     // FIXME: Use a callback-like executor / future to signal us upon failure.
     runnerFuture = executor.submit(runner);
@@ -224,13 +226,16 @@ Configurable {
 
     bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
         ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
+
+    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
+        ExecSourceConfigurationConstants.DEFAULT_CHARSET));
   }
 
   private static class ExecRunnable implements Runnable {
 
     public ExecRunnable(String command, ChannelProcessor channelProcessor,
         CounterGroup counterGroup, boolean restart, long restartThrottle,
-        boolean logStderr, int bufferCount) {
+        boolean logStderr, int bufferCount, Charset charset) {
       this.command = command;
       this.channelProcessor = channelProcessor;
       this.counterGroup = counterGroup;
@@ -238,6 +243,7 @@ Configurable {
       this.bufferCount = bufferCount;
       this.restart = restart;
       this.logStderr = logStderr;
+      this.charset = charset;
     }
 
     private String command;
@@ -247,6 +253,7 @@ Configurable {
     private long restartThrottle;
     private int bufferCount;
     private boolean logStderr;
+    private Charset charset;
     private Process process = null;
 
     @Override
@@ -258,11 +265,11 @@ Configurable {
           String[] commandArgs = command.split("\\s+");
           process = new ProcessBuilder(commandArgs).start();
           reader = new BufferedReader(
-              new InputStreamReader(process.getInputStream()));
+              new InputStreamReader(process.getInputStream(), charset));
 
           // StderrLogger dies as soon as the input stream is invalid
           StderrReader stderrReader = new StderrReader(new BufferedReader(
-              new InputStreamReader(process.getErrorStream())), logStderr);
+              new InputStreamReader(process.getErrorStream(), charset)), logStderr);
           stderrReader.setName("StderrReader-[" + command + "]");
           stderrReader.setDaemon(true);
           stderrReader.start();
@@ -271,7 +278,7 @@ Configurable {
           List<Event> eventList = new ArrayList<Event>();
           while ((line = reader.readLine()) != null) {
             counterGroup.incrementAndGet("exec.lines.read");
-            eventList.add(EventBuilder.withBody(line.getBytes()));
+            eventList.add(EventBuilder.withBody(line.getBytes(charset)));
             if(eventList.size() >= bufferCount) {
               channelProcessor.processEventBatch(eventList);
               eventList.clear();
@@ -340,6 +347,9 @@ Configurable {
         String line = null;
         while((line = input.readLine()) != null) {
           if(logStderr) {
+            // There is no need to read 'line' with a charset
+            // as we do not to propagate it.
+            // It is in UTF-16 and would be printed in UTF-8 format.
             logger.info("StderrLogger[{}] = '{}'", ++i, line);
           }
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/a96dc82c/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
index 0ba0508..1b35b01 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
@@ -18,6 +18,8 @@
  */
 package org.apache.flume.source;
 
+import java.nio.charset.Charset;
+
 public class ExecSourceConfigurationConstants {
 
   /**
@@ -43,4 +45,10 @@ public class ExecSourceConfigurationConstants {
    */
   public static final String CONFIG_BATCH_SIZE = "batchSize";
   public static final int DEFAULT_BATCH_SIZE = 20;
+
+  /**
+   * Charset for reading input
+   */
+  public static final String CHARSET = "charset";
+  public static final String DEFAULT_CHARSET = "UTF-8";
 }


Mime
View raw message