flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1819. ExecSource must flush events to channel periodically.
Date Thu, 25 Apr 2013 18:20:52 GMT
Updated Branches:
  refs/heads/flume-1.4 f31b3947d -> c146e5106


FLUME-1819. ExecSource must flush events to channel periodically.

(Venkatesh Sivasubramanian via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c146e510
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c146e510
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c146e510

Branch: refs/heads/flume-1.4
Commit: c146e5106c344b4ee438f4f396c3bf8544d31c66
Parents: f31b394
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Apr 25 11:19:31 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Apr 25 11:20:32 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/source/ExecSource.java   |  104 ++++++++++++---
 .../source/ExecSourceConfigurationConstants.java   |    8 +-
 .../org/apache/flume/source/TestExecSource.java    |   42 ++++++
 3 files changed, 136 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c146e510/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 3c9437d..1d8d267 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Channel;
@@ -34,6 +36,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.Source;
+import org.apache.flume.SystemClock;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
@@ -42,6 +45,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.nio.charset.Charset;
 
 /**
@@ -149,6 +154,7 @@ Configurable {
   private boolean restart;
   private boolean logStderr;
   private Integer bufferCount;
+  private long batchTimeout;
   private ExecRunnable runner;
   private Charset charset;
 
@@ -159,7 +165,7 @@ Configurable {
     executor = Executors.newSingleThreadExecutor();
 
     runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
-        restart, restartThrottle, logStderr, bufferCount, charset);
+        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
 
     // FIXME: Use a callback-like executor / future to signal us upon failure.
     runnerFuture = executor.submit(runner);
@@ -178,17 +184,16 @@ Configurable {
   @Override
   public void stop() {
     logger.info("Stopping exec source with command:{}", command);
-
     if(runner != null) {
       runner.setRestart(false);
       runner.kill();
     }
+
     if (runnerFuture != null) {
       logger.debug("Stopping exec runner");
       runnerFuture.cancel(true);
       logger.debug("Exec runner stopped");
     }
-
     executor.shutdown();
 
     while (!executor.isTerminated()) {
@@ -228,6 +233,9 @@ Configurable {
     bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
         ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
 
+    batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
+        ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);
+
     charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
         ExecSourceConfigurationConstants.DEFAULT_CHARSET));
 
@@ -242,12 +250,13 @@ Configurable {
 
     public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
         SourceCounter sourceCounter, boolean restart, long restartThrottle,
-        boolean logStderr, int bufferCount, Charset charset) {
+        boolean logStderr, int bufferCount, long batchTimeout, Charset charset) {
       this.command = command;
       this.channelProcessor = channelProcessor;
       this.sourceCounter = sourceCounter;
       this.restartThrottle = restartThrottle;
       this.bufferCount = bufferCount;
+      this.batchTimeout = batchTimeout;
       this.restart = restart;
       this.logStderr = logStderr;
       this.charset = charset;
@@ -261,15 +270,27 @@ Configurable {
     private volatile boolean restart;
     private final long restartThrottle;
     private final int bufferCount;
+    private long batchTimeout;
     private final boolean logStderr;
     private final Charset charset;
     private Process process = null;
+    private SystemClock systemClock = new SystemClock();
+    private Long lastPushToChannel = systemClock.currentTimeMillis();
+    ScheduledExecutorService timedFlushService;
+    ScheduledFuture<?> future;
 
     @Override
     public void run() {
       do {
         String exitCode = "unknown";
         BufferedReader reader = null;
+        String line = null;
+        final List<Event> eventList = new ArrayList<Event>();
+
+        timedFlushService = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat(
+                "timedFlushExecService" +
+                Thread.currentThread().getId() + "-%d").build());
         try {
           if(shell != null) {
             String[] commandArgs = formulateShellCommand(shell, command);
@@ -288,20 +309,39 @@ Configurable {
           stderrReader.setDaemon(true);
           stderrReader.start();
 
-          String line = null;
-          List<Event> eventList = new ArrayList<Event>();
+          future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  synchronized (eventList) {
+                    if(!eventList.isEmpty() && timeout()) {
+                      flushEventBatch(eventList);
+                    }
+                  }
+                } catch (Exception e) {
+                  logger.error("Exception occured when processing event batch", e);
+                  if(e instanceof InterruptedException) {
+                      Thread.currentThread().interrupt();
+                  }
+                }
+              }
+          },
+          batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
+
           while ((line = reader.readLine()) != null) {
-            sourceCounter.incrementEventReceivedCount();
-            eventList.add(EventBuilder.withBody(line.getBytes(charset)));
-            if(eventList.size() >= bufferCount) {
-              channelProcessor.processEventBatch(eventList);
-              sourceCounter.addToEventAcceptedCount(eventList.size());
-              eventList.clear();
+            synchronized (eventList) {
+              sourceCounter.incrementEventReceivedCount();
+              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
+              if(eventList.size() >= bufferCount || timeout()) {
+                flushEventBatch(eventList);
+              }
             }
           }
-          if(!eventList.isEmpty()) {
-            channelProcessor.processEventBatch(eventList);
-            sourceCounter.addToEventAcceptedCount(eventList.size());
+
+          synchronized (eventList) {
+              if(!eventList.isEmpty()) {
+                flushEventBatch(eventList);
+              }
           }
         } catch (Exception e) {
           logger.error("Failed while running command: " + command, e);
@@ -332,6 +372,17 @@ Configurable {
       } while(restart);
     }
 
+    private void flushEventBatch(List<Event> eventList){
+      channelProcessor.processEventBatch(eventList);
+      sourceCounter.addToEventAcceptedCount(eventList.size());
+      eventList.clear();
+      lastPushToChannel = systemClock.currentTimeMillis();
+    }
+
+    private boolean timeout(){
+      return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
+    }
+
     private static String[] formulateShellCommand(String shell, String command) {
       String[] shellArgs = shell.split("\\s+");
       String[] result = new String[shellArgs.length + 1];
@@ -344,8 +395,28 @@ Configurable {
       if(process != null) {
         synchronized (process) {
           process.destroy();
+
           try {
-            return process.waitFor();
+            int exitValue = process.waitFor();
+
+            // Stop the Thread that flushes periodically
+            if (future != null) {
+                future.cancel(true);
+            }
+
+            if (timedFlushService != null) {
+              timedFlushService.shutdown();
+              while (!timedFlushService.isTerminated()) {
+                try {
+                  timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                  logger.debug("Interrupted while waiting for exec executor service "
+                    + "to stop. Just exiting.");
+                  Thread.currentThread().interrupt();
+                }
+              }
+            }
+            return exitValue;
           } catch (InterruptedException ex) {
             Thread.currentThread().interrupt();
           }
@@ -392,6 +463,5 @@ Configurable {
         }
       }
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c146e510/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
index fd5a60b..957ec7f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.source;
 
-import java.nio.charset.Charset;
 
 public class ExecSourceConfigurationConstants {
 
@@ -47,6 +46,13 @@ public class ExecSourceConfigurationConstants {
   public static final int DEFAULT_BATCH_SIZE = 20;
 
   /**
+   * Amount of time to wait, if the buffer size was not reached, before 
+   * to data is pushed downstream: : default 3000 ms
+   */
+  public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout";
+  public static final long DEFAULT_BATCH_TIME_OUT = 3000l;
+
+  /**
    * Charset for reading input
    */
   public static final String CHARSET = "charset";

http://git-wip-us.apache.org/repos/asf/flume/blob/c146e510/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
index 77e9a44..54f71a1 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
@@ -227,6 +227,48 @@ public class TestExecSource {
     }
   }
 
+  @Test
+  public void testBatchTimeout() throws InterruptedException, LifecycleException,
+  EventDeliveryException, IOException {
+
+    String filePath = "/tmp/flume-execsource." + Thread.currentThread().getId();
+    String eventBody = "TestMessage";
+    FileOutputStream outputStream = new FileOutputStream(filePath);
+
+    context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, "50000");
+    context.put(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, "750");
+    context.put("shell", "/bin/bash -c");
+    context.put("command", "tail -f " + filePath);
+
+    Configurables.configure(source, context);
+    source.start();
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    for (int lineNumber = 0; lineNumber < 3; lineNumber++) {
+        outputStream.write((eventBody).getBytes());
+        outputStream.write(String.valueOf(lineNumber).getBytes());
+        outputStream.write('\n');
+        outputStream.flush();
+    }
+    outputStream.close();
+    Thread.sleep(1500);
+
+    for(int i = 0; i < 3; i++) {
+      Event event = channel.take();
+      assertNotNull(event);
+      assertNotNull(event.getBody());
+      assertEquals(eventBody + String.valueOf(i), new String(event.getBody()));
+    }
+
+    transaction.commit();
+    transaction.close();
+    source.stop();
+    File file = new File(filePath);
+    FileUtils.forceDelete(file);
+  }
+
     private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput)
              throws InterruptedException, LifecycleException, EventDeliveryException, IOException
{
       context.put("shell", shell);


Mime
View raw message