flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1590. ExecSource should kill child process when it stops.
Date Mon, 24 Sep 2012 18:52:44 GMT
Updated Branches:
  refs/heads/flume-1.3.0 80176f340 -> dcc8a0803


FLUME-1590. ExecSource should kill child process when it stops.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dcc8a080
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dcc8a080
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dcc8a080

Branch: refs/heads/flume-1.3.0
Commit: dcc8a080317e83e7207bc6f3ec958dc83b3d0c77
Parents: 80176f3
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon Sep 24 11:51:25 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon Sep 24 11:52:12 2012 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/source/ExecSource.java   |   31 ++++--
 .../org/apache/flume/source/TestExecSource.java    |   88 +++++++++++++++
 2 files changed, 109 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/dcc8a080/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 155f0e2..46f672f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -179,6 +179,7 @@ Configurable {
 
     if(runner != null) {
       runner.setRestart(false);
+      runner.kill();
     }
     if (runnerFuture != null) {
       logger.debug("Stopping exec runner");
@@ -246,13 +247,13 @@ Configurable {
     private long restartThrottle;
     private int bufferCount;
     private boolean logStderr;
+    private Process process = null;
 
     @Override
     public void run() {
       do {
         String exitCode = "unknown";
         BufferedReader reader = null;
-        Process process = null;
         try {
           String[] commandArgs = command.split("\\s+");
           process = new ProcessBuilder(commandArgs).start();
@@ -292,25 +293,35 @@ Configurable {
               logger.error("Failed to close reader for exec source", ex);
             }
           }
-          if(process != null) {
-            process.destroy();
-            try {
-              exitCode = String.valueOf(process.waitFor());
-            } catch (InterruptedException ex) {
-              Thread.currentThread().interrupt();
-            }
-          }
+          exitCode = String.valueOf(kill());
         }
         if(restart) {
-          logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
+          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
+              exitCode);
           try {
             Thread.sleep(restartThrottle);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
           }
+        } else {
+          logger.info("Command [" + command + "] exited with " + exitCode);
         }
       } while(restart);
     }
+    public int kill() {
+      if(process != null) {
+        synchronized (process) {
+          process.destroy();
+          try {
+            return process.waitFor();
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        return Integer.MIN_VALUE;
+      }
+      return Integer.MIN_VALUE / 2;
+    }
     public void setRestart(boolean restart) {
       this.restart = restart;
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/dcc8a080/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
index 615f2a3..8bcf320 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
@@ -22,9 +22,14 @@ package org.apache.flume.source;
 
 import static org.junit.Assert.*;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
@@ -143,4 +148,87 @@ public class TestExecSource {
 
     source.stop();
   }
+
+
+  /**
+   * Tests to make sure that the shutdown mechanism works. There are races
+   * in this test if the system has another sleep command running with the
+   * same sleep interval but we pick rarely used sleep times and make an
+   * effort to detect if our sleep time is already in use. Note the
+   * ps -ef command should work on both macs and linux.
+   */
+  @Test
+  public void testShutdown() throws Exception {
+    int seconds = 272; // pick a rare sleep time
+
+    // now find one that is not in use
+    boolean searchForCommand = true;
+    while(searchForCommand) {
+      searchForCommand = false;
+      String command = "sleep " + seconds;
+      Pattern pattern = Pattern.compile("\b" + command + "\b");
+      for(String line : exec("ps -ef")) {
+        if(pattern.matcher(line).find()) {
+          seconds++;
+          searchForCommand = true;
+          break;
+        }
+      }
+    }
+
+    // yes in the mean time someone could use our sleep time
+    // but this should be a fairly rare scenerio
+
+    String command = "sleep " + seconds;
+    Pattern pattern = Pattern.compile("\b" + command + "\b");
+
+    Channel channel = new MemoryChannel();
+    Context context = new Context();
+
+    context.put(ExecSourceConfigurationConstants.CONFIG_RESTART, "false");
+
+    context.put("command", command);
+    Configurables.configure(source, context);
+    Configurables.configure(channel, context);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(Lists.newArrayList(channel));
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+    source.start();
+    Thread.sleep(1000L);
+    source.stop();
+    Thread.sleep(1000L);
+    for(String line : exec("ps -ef")) {
+      if(pattern.matcher(line).find()) {
+        Assert.fail("Found [" + line + "]");
+      }
+    }
+  }
+
+  private static List<String> exec(String command) throws Exception {
+    String[] commandArgs = command.split("\\s+");
+    Process process = new ProcessBuilder(commandArgs).start();
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(
+          new InputStreamReader(process.getInputStream()));
+      List<String> result = Lists.newArrayList();
+      String line;
+      while((line = reader.readLine()) != null) {
+        result.add(line);
+      }
+      return result;
+    } finally {
+      process.destroy();
+      if(reader != null) {
+        reader.close();
+      }
+      int exit = process.waitFor();
+      if(exit != 0) {
+        throw new IllegalStateException("Command [" + command + "] exited with "
+            + exit);
+      }
+    }
+  }
 }


Mime
View raw message