hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From the...@apache.org
Subject hive git commit: HIVE-10959 : webhcat launcher job should reconnect to the running child job on task retry (Ivan Mitic via Thejas Nair)
Date Thu, 11 Jun 2015 07:14:53 GMT
Repository: hive
Updated Branches:
  refs/heads/master 612603f38 -> 9f263fcdc


HIVE-10959 : webhcat launcher job should reconnect to the running child job on task retry
(Ivan Mitic via Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f263fcd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f263fcd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f263fcd

Branch: refs/heads/master
Commit: 9f263fcdc3ecc825d24c0212ae5286d4f6007082
Parents: 612603f
Author: Thejas Nair <thejas@hortonworks.com>
Authored: Thu Jun 11 00:13:42 2015 -0700
Committer: Thejas Nair <thejas@hortonworks.com>
Committed: Thu Jun 11 00:14:47 2015 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/templeton/AppConfig.java      |   2 +
 .../hive/hcatalog/templeton/HiveDelegator.java  |  15 +-
 .../hive/hcatalog/templeton/JarDelegator.java   |   8 +-
 .../hcatalog/templeton/LauncherDelegator.java   |  14 ++
 .../hive/hcatalog/templeton/PigDelegator.java   |  13 +-
 .../apache/hive/hcatalog/templeton/Server.java  |  34 ++-
 .../hive/hcatalog/templeton/SqoopDelegator.java |  20 +-
 .../hcatalog/templeton/StreamingDelegator.java  |   3 +-
 .../templeton/tool/JobSubmissionConstants.java  |   2 +
 .../hcatalog/templeton/tool/LaunchMapper.java   | 214 ++++++++++++++-----
 .../apache/hadoop/mapred/WebHCatJTShim20S.java  |  10 +
 .../apache/hadoop/mapred/WebHCatJTShim23.java   |  18 +-
 .../apache/hadoop/hive/shims/HadoopShims.java   |   5 +
 13 files changed, 273 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
index 2b37e7f..8244274 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
@@ -96,6 +96,7 @@ public class AppConfig extends Configuration {
   public static final String EXEC_MAX_PROCS_NAME = "templeton.exec.max-procs";
   public static final String EXEC_TIMEOUT_NAME   = "templeton.exec.timeout";
   public static final String HADOOP_QUEUE_NAME   = "templeton.hadoop.queue.name";
+  public static final String ENABLE_JOB_RECONNECT_DEFAULT = "templeton.enable.job.reconnect.default";
   public static final String HADOOP_NAME         = "templeton.hadoop";
   public static final String HADOOP_CONF_DIR     = "templeton.hadoop.conf.dir";
   public static final String HCAT_NAME           = "templeton.hcat";
@@ -306,6 +307,7 @@ public class AppConfig extends Configuration {
 
   public String libJars()          { return get(LIB_JARS_NAME); }
   public String hadoopQueueName()  { return get(HADOOP_QUEUE_NAME); }
+  public String enableJobReconnectDefault() { return get(ENABLE_JOB_RECONNECT_DEFAULT); }
   public String clusterHadoop()    { return get(HADOOP_NAME); }
   public String clusterHcat()      { return get(HCAT_NAME); }
   public String clusterPython()    { return get(PYTHON_NAME); }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
index 7b09b8a..0ea964f 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
@@ -46,25 +46,28 @@ public class HiveDelegator extends LauncherDelegator {
   public EnqueueBean run(String user, Map<String, Object> userArgs,
                String execute, String srcFile, List<String> defines,
                List<String> hiveArgs, String otherFiles,
-               String statusdir, String callback, String completedUrl, boolean enablelog)
+               String statusdir, String callback, String completedUrl, boolean enablelog,
+               Boolean enableJobReconnect)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException
   {
     runAs = user;
     List<String> args = makeArgs(execute, srcFile, defines, hiveArgs, otherFiles, statusdir,
-                   completedUrl, enablelog);
+                   completedUrl, enablelog, enableJobReconnect);
 
     return enqueueController(user, userArgs, callback, args);
   }
 
   private List<String> makeArgs(String execute, String srcFile,
              List<String> defines, List<String> hiveArgs, String otherFiles,
-             String statusdir, String completedUrl, boolean enablelog)
+             String statusdir, String completedUrl, boolean enablelog,
+             Boolean enableJobReconnect)
     throws BadParam, IOException, InterruptedException
   {
     ArrayList<String> args = new ArrayList<String>();
     try {
-      args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, enablelog));
+      args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl,
+          enablelog, enableJobReconnect));
       args.add("--");
       TempletonUtils.addCmdForWindows(args);
       addHiveMetaStoreTokenArg();
@@ -117,7 +120,7 @@ public class HiveDelegator extends LauncherDelegator {
 
   private List<String> makeBasicArgs(String execute, String srcFile, String otherFiles,
                                          String statusdir, String completedUrl,
-                                         boolean enablelog)
+                                         boolean enablelog, Boolean enableJobReconnect)
     throws URISyntaxException, IOException,
     InterruptedException
   {
@@ -135,7 +138,7 @@ public class HiveDelegator extends LauncherDelegator {
     }
 
     args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles,
-                enablelog, JobType.HIVE));
+                enablelog, enableJobReconnect, JobType.HIVE));
 
     if (appConf.hiveArchive() != null && !appConf.hiveArchive().equals(""))
     {

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
index e5832a8..10ff2c0 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
@@ -44,13 +44,13 @@ public class JarDelegator extends LauncherDelegator {
                List<String> jarArgs, List<String> defines,
                String statusdir, String callback, 
                boolean usesHcatalog, String completedUrl,
-               boolean enablelog, JobType jobType)
+               boolean enablelog, Boolean enableJobReconnect, JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException {
     runAs = user;
     List<String> args = makeArgs(jar, mainClass,
       libjars, files, jarArgs, defines,
-      statusdir, usesHcatalog, completedUrl, enablelog, jobType);
+      statusdir, usesHcatalog, completedUrl, enablelog, enableJobReconnect, jobType);
 
     return enqueueController(user, userArgs, callback, args);
   }
@@ -59,7 +59,7 @@ public class JarDelegator extends LauncherDelegator {
                   String libjars, String files,
                   List<String> jarArgs, List<String> defines,
                   String statusdir, boolean usesHcatalog, String completedUrl,
-                  boolean enablelog, JobType jobType)
+                  boolean enablelog, Boolean enableJobReconnect, JobType jobType)
     throws BadParam, IOException, InterruptedException {
     ArrayList<String> args = new ArrayList<String>();
     try {
@@ -67,7 +67,7 @@ public class JarDelegator extends LauncherDelegator {
       allFiles.add(TempletonUtils.hadoopFsFilename(jar, appConf, runAs));
 
       args.addAll(makeLauncherArgs(appConf, statusdir,
-        completedUrl, allFiles, enablelog, jobType));
+        completedUrl, allFiles, enablelog, enableJobReconnect, jobType));
       args.add("--");
       TempletonUtils.addCmdForWindows(args);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
index 71a3107..82e5cb8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
@@ -118,6 +118,7 @@ public class LauncherDelegator extends TempletonDelegator {
                      String completedUrl,
                      List<String> copyFiles,
                      boolean enablelog,
+                     Boolean enableJobReconnect,
                      JobType jobType) {
     ArrayList<String> args = new ArrayList<String>();
 
@@ -150,6 +151,19 @@ public class LauncherDelegator extends TempletonDelegator {
     addDef(args, TempletonControllerJob.TEMPLETON_JOB_LAUNCH_TIME_NAME,
       Long.toString(System.currentTimeMillis()));
 
+    if (enableJobReconnect == null) {
+      // If enablejobreconnect param was not passed by a user, use a cluster
+      // wide default
+      if (appConf.enableJobReconnectDefault() != null) {
+        enableJobReconnect = Boolean.parseBoolean(appConf.enableJobReconnectDefault());
+      } else {
+        // default is false
+        enableJobReconnect = false;
+      }
+    }
+    addDef(args, TempletonControllerJob.ENABLE_JOB_RECONNECT,
+        Boolean.toString(enableJobReconnect));
+
     // Hadoop queue information
     addDef(args, "mapred.job.queue.name", appConf.hadoopQueueName());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index a07f66a..2679a97 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -48,13 +48,14 @@ public class PigDelegator extends LauncherDelegator {
                String execute, String srcFile,
                List<String> pigArgs, String otherFiles,
                String statusdir, String callback, 
-               boolean usesHcatalog, String completedUrl, boolean enablelog)
+               boolean usesHcatalog, String completedUrl, boolean enablelog,
+               Boolean enableJobReconnect)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException {
     runAs = user;
     List<String> args = makeArgs(execute,
       srcFile, pigArgs,
-      otherFiles, statusdir, usesHcatalog, completedUrl, enablelog);
+      otherFiles, statusdir, usesHcatalog, completedUrl, enablelog, enableJobReconnect);
 
     return enqueueController(user, userArgs, callback, args);
   }
@@ -68,6 +69,8 @@ public class PigDelegator extends LauncherDelegator {
    * @param usesHcatalog whether the command uses hcatalog/needs to connect
    *         to hive metastore server
    * @param completedUrl call back url
+   * @param enablelog
+   * @param enableJobReconnect
    * @return list of arguments
    * @throws BadParam
    * @throws IOException
@@ -76,7 +79,8 @@ public class PigDelegator extends LauncherDelegator {
   private List<String> makeArgs(String execute, String srcFile,
                   List<String> pigArgs, String otherFiles,
                   String statusdir, boolean usesHcatalog,
-                  String completedUrl, boolean enablelog)
+                  String completedUrl, boolean enablelog,
+                  Boolean enableJobReconnect)
     throws BadParam, IOException, InterruptedException {
     ArrayList<String> args = new ArrayList<String>();
     //check if the REST command specified explicitly to use hcatalog
@@ -93,7 +97,8 @@ public class PigDelegator extends LauncherDelegator {
         allFiles.addAll(Arrays.asList(ofs));
       }
 
-      args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, JobType.PIG));
+      args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog,
+          enableJobReconnect, JobType.PIG));
       boolean shipPigTar = appConf.pigArchive() != null && !appConf.pigArchive().equals("");
       boolean shipHiveTar = needsMetastoreAccess && appConf.hiveArchive() != null
               && !appConf.hiveArchive().equals("");

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index 80d9d2c..27b8e38 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -647,7 +647,8 @@ public class Server {
                       @FormParam("arg") List<String> args,
                       @FormParam("statusdir") String statusdir,
                       @FormParam("callback") String callback,
-                      @FormParam("enablelog") boolean enablelog)
+                      @FormParam("enablelog") boolean enablelog,
+                      @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     ExecuteException, IOException, InterruptedException {
     verifyUser();
@@ -671,12 +672,13 @@ public class Server {
     userArgs.put("statusdir", statusdir);
     userArgs.put("callback", callback);
     userArgs.put("enablelog", Boolean.toString(enablelog));
+    userArgs.put("enablejobreconnect", enablejobreconnect);
     checkEnableLogPrerequisite(enablelog, statusdir);
 
     StreamingDelegator d = new StreamingDelegator(appConf);
     return d.run(getDoAsUser(), userArgs, inputs, inputreader, output, mapper, reducer, combiner,
       fileList, files, defines, cmdenvs, args,
-      statusdir, callback, getCompletedUrl(), enablelog, JobType.STREAMING);
+      statusdir, callback, getCompletedUrl(), enablelog, enablejobreconnect, JobType.STREAMING);
   }
 
   /**
@@ -699,7 +701,8 @@ public class Server {
                   @FormParam("statusdir") String statusdir,
                   @FormParam("callback") String callback,
                   @FormParam("usehcatalog") boolean usesHcatalog,
-                  @FormParam("enablelog") boolean enablelog)
+                  @FormParam("enablelog") boolean enablelog,
+                  @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     ExecuteException, IOException, InterruptedException {
     verifyUser();
@@ -717,6 +720,7 @@ public class Server {
     userArgs.put("statusdir", statusdir);
     userArgs.put("callback", callback);
     userArgs.put("enablelog", Boolean.toString(enablelog));
+    userArgs.put("enablejobreconnect", enablejobreconnect);
 
     checkEnableLogPrerequisite(enablelog, statusdir);
 
@@ -724,7 +728,7 @@ public class Server {
     return d.run(getDoAsUser(), userArgs,
       jar, mainClass,
       libjars, files, args, defines,
-      statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, JobType.JAR);
+      statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, enablejobreconnect, JobType.JAR);
   }
 
   /**
@@ -747,7 +751,8 @@ public class Server {
                @FormParam("statusdir") String statusdir,
                @FormParam("callback") String callback,
                @FormParam("usehcatalog") boolean usesHcatalog,
-               @FormParam("enablelog") boolean enablelog)
+               @FormParam("enablelog") boolean enablelog,
+               @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     ExecuteException, IOException, InterruptedException {
     verifyUser();
@@ -765,6 +770,7 @@ public class Server {
     userArgs.put("statusdir", statusdir);
     userArgs.put("callback", callback);
     userArgs.put("enablelog", Boolean.toString(enablelog));
+    userArgs.put("enablejobreconnect", enablejobreconnect);
 
     checkEnableLogPrerequisite(enablelog, statusdir);
 
@@ -772,7 +778,7 @@ public class Server {
     return d.run(getDoAsUser(), userArgs,
       execute, srcFile,
       pigArgs, otherFiles,
-      statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog);
+      statusdir, callback, usesHcatalog, getCompletedUrl(), enablelog, enablejobreconnect);
   }
 
    /**
@@ -784,6 +790,8 @@ public class Server {
    * @param statusdir    where the stderr/stdout of templeton controller job goes
    * @param callback     URL which WebHCat will call when the sqoop job finishes
    * @param enablelog    whether to collect mapreduce log into statusdir/logs
+   * @param enablejobreconnect    whether to reconnect to a running child job on templeton
+   *                              controller job retry
    */
   @POST
   @Path("sqoop")
@@ -794,7 +802,8 @@ public class Server {
               @FormParam("files") String otherFiles,
               @FormParam("statusdir") String statusdir,
               @FormParam("callback") String callback,
-              @FormParam("enablelog") boolean enablelog)
+              @FormParam("enablelog") boolean enablelog,
+              @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     IOException, InterruptedException {
     verifyUser();
@@ -814,9 +823,10 @@ public class Server {
     userArgs.put("statusdir", statusdir);
     userArgs.put("callback", callback);
     userArgs.put("enablelog", Boolean.toString(enablelog));
+    userArgs.put("enablejobreconnect", enablejobreconnect);
     SqoopDelegator d = new SqoopDelegator(appConf);
     return d.run(getDoAsUser(), userArgs, command, optionsFile, otherFiles,
-      statusdir, callback, getCompletedUrl(), enablelog, libdir);
+      statusdir, callback, getCompletedUrl(), enablelog, enablejobreconnect, libdir);
   }
 
   /**
@@ -833,6 +843,8 @@ public class Server {
    * @param statusdir  where the stderr/stdout of templeton controller job goes
    * @param callback   URL which WebHCat will call when the hive job finishes
    * @param enablelog  whether to collect mapreduce log into statusdir/logs
+   * @param enablejobreconnect    whether to reconnect to a running child job on templeton
+   *                              controller job retry
    */
   @POST
   @Path("hive")
@@ -844,7 +856,8 @@ public class Server {
               @FormParam("define") List<String> defines,
               @FormParam("statusdir") String statusdir,
               @FormParam("callback") String callback,
-              @FormParam("enablelog") boolean enablelog)
+              @FormParam("enablelog") boolean enablelog,
+              @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
     ExecuteException, IOException, InterruptedException {
     verifyUser();
@@ -862,12 +875,13 @@ public class Server {
     userArgs.put("statusdir", statusdir);
     userArgs.put("callback", callback);
     userArgs.put("enablelog", Boolean.toString(enablelog));
+    userArgs.put("enablejobreconnect", enablejobreconnect);
 
     checkEnableLogPrerequisite(enablelog, statusdir);
 
     HiveDelegator d = new HiveDelegator(appConf);
     return d.run(getDoAsUser(), userArgs, execute, srcFile, defines, hiveArgs, otherFiles,
-      statusdir, callback, getCompletedUrl(), enablelog);
+      statusdir, callback, getCompletedUrl(), enablelog, enablejobreconnect);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
index b205bda..9002482 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
@@ -45,9 +45,10 @@ public class SqoopDelegator extends LauncherDelegator {
   }
 
   public EnqueueBean run(String user,
-               Map<String, Object> userArgs, String command, 
-               String optionsFile, String otherFiles, String statusdir, 
-               String callback, String completedUrl, boolean enablelog, String libdir)
+               Map<String, Object> userArgs, String command,
+               String optionsFile, String otherFiles, String statusdir,
+               String callback, String completedUrl, boolean enablelog,
+               Boolean enableJobReconnect, String libdir)
   throws NotAuthorizedException, BadParam, BusyException, QueueException,
   IOException, InterruptedException
   {
@@ -59,17 +60,19 @@ public class SqoopDelegator extends LauncherDelegator {
     }
     runAs = user;
     List<String> args = makeArgs(command, optionsFile, otherFiles, statusdir,
-                   completedUrl, enablelog, libdir);
+                   completedUrl, enablelog, enableJobReconnect, libdir);
 
     return enqueueController(user, userArgs, callback, args);
   }
   private List<String> makeArgs(String command, String optionsFile, String otherFiles,
-            String statusdir, String completedUrl, boolean enablelog, String libdir)
+            String statusdir, String completedUrl, boolean enablelog,
+            Boolean enableJobReconnect, String libdir)
     throws BadParam, IOException, InterruptedException
   {
     ArrayList<String> args = new ArrayList<String>();
     try {
-      args.addAll(makeBasicArgs(optionsFile, otherFiles, statusdir, completedUrl, enablelog, libdir));
+      args.addAll(makeBasicArgs(optionsFile, otherFiles, statusdir, completedUrl, enablelog,
+          enableJobReconnect, libdir));
       args.add("--");
       TempletonUtils.addCmdForWindows(args);
       args.add(appConf.sqoopPath());
@@ -112,7 +115,8 @@ public class SqoopDelegator extends LauncherDelegator {
   }
 
   private List<String> makeBasicArgs(String optionsFile, String otherFiles,
-            String statusdir, String completedUrl, boolean enablelog, String libdir)
+            String statusdir, String completedUrl, boolean enablelog,
+            Boolean enableJobReconnect, String libdir)
     throws URISyntaxException, FileNotFoundException, IOException,
                           InterruptedException
   {
@@ -152,7 +156,7 @@ public class SqoopDelegator extends LauncherDelegator {
       }
     }
     args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles,
-                enablelog, JobType.SQOOP));
+                enablelog, enableJobReconnect, JobType.SQOOP));
     if(TempletonUtils.isset(appConf.sqoopArchive())) {
       args.add("-archives");
       args.add(appConf.sqoopArchive());

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
index 622440b..f487d51 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
@@ -48,6 +48,7 @@ public class StreamingDelegator extends LauncherDelegator {
                String callback,
                String completedUrl,
                boolean enableLog,
+               Boolean enableJobReconnect,
                JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
     ExecuteException, IOException, InterruptedException {
@@ -58,7 +59,7 @@ public class StreamingDelegator extends LauncherDelegator {
     return d.run(user, userArgs,
       appConf.streamingJar(), null,
       null, files, args, defines,
-      statusdir, callback, false, completedUrl, enableLog, jobType);
+      statusdir, callback, false, completedUrl, enableLog, enableJobReconnect, jobType);
   }
 
   private List<String> makeArgs(List<String> inputs,

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
index d3dc3f7..4e4fcd4 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
@@ -22,6 +22,7 @@ public interface JobSubmissionConstants {
   public static final String COPY_NAME = "templeton.copy";
   public static final String STATUSDIR_NAME = "templeton.statusdir";
   public static final String ENABLE_LOG = "templeton.enablelog";
+  public static final String ENABLE_JOB_RECONNECT = "templeton.enablejobreconnect";
   public static final String JOB_TYPE = "templeton.jobtype";
   public static final String JAR_ARGS_NAME = "templeton.args";
   public static final String TEMPLETON_JOB_LAUNCH_TIME_NAME = "templeton.job.launch.time";
@@ -31,6 +32,7 @@ public interface JobSubmissionConstants {
   public static final String EXIT_FNAME = "exit";
   public static final int WATCHER_TIMEOUT_SECS = 10;
   public static final int KEEP_ALIVE_MSEC = 60 * 1000;
+  public static final int POLL_JOBPROGRESS_MSEC = 30 * 1000;
   /**
    * A comma-separated list of files to be added to HADOOP_CLASSPATH in 
    * {@link org.apache.hive.hcatalog.templeton.tool.LaunchMapper}.  Used to localize additional

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
index 91fe247..422e75e 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
@@ -51,6 +51,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -150,13 +151,8 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text>
       env.put(pathVarName, paths);
     }
   }
-  protected Process startJob(Context context, String user, String overrideClasspath)
+  protected Process startJob(Configuration conf, String jobId, String user, String overrideClasspath)
     throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-
-    // Kill previously launched child MR jobs started by this launcher to prevent having
-    // same jobs running side-by-side
-    killLauncherChildJobs(conf, context.getJobID().toString());
 
     copyLocal(COPY_NAME, conf);
     String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
@@ -175,7 +171,7 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text>
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER, "mapreduce.job.credentials.binary");
     handleTokenFile(jarArgsList, JobSubmissionConstants.TOKEN_FILE_ARG_PLACEHOLDER_TEZ, "tez.credentials.path");
     handleMapReduceJobTag(jarArgsList, JobSubmissionConstants.MAPREDUCE_JOB_TAGS_ARG_PLACEHOLDER,
-        JobSubmissionConstants.MAPREDUCE_JOB_TAGS, context.getJobID().toString());
+        JobSubmissionConstants.MAPREDUCE_JOB_TAGS, jobId);
     return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env);
   }
 
@@ -289,17 +285,105 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text>
     }
   }
 
+  /**
+   * Checks if reconnection to an already running job is enabled and supported for a given
+   * job type.
+   */
+  private boolean reconnectToRunningJobEnabledAndSupported(Configuration conf,
+      LauncherDelegator.JobType jobType) {
+    if (conf.get(ENABLE_JOB_RECONNECT) == null) {
+      return false;
+    }
+
+    Boolean enableJobReconnect = Boolean.parseBoolean(conf.get(ENABLE_JOB_RECONNECT));
+    if (!enableJobReconnect) {
+      return false;
+    }
+
+    // Reconnect is only supported for MR and Streaming jobs at this time
+    return jobType.equals(LauncherDelegator.JobType.JAR)
+        || jobType.equals(LauncherDelegator.JobType.STREAMING);
+  }
+
+  /**
+   * Attempts to reconnect to an already running child job of the templeton launcher. This
+   * is used in cases where the templeton launcher task has failed and is retried by the
+   * MR framework. If reconnect to the child job is possible, the method will continue
+   * tracking its progress until completion.
+   * @return Returns true if reconnect was successful, false if not supported or
+   *         no child jobs were found.
+   */
+  private boolean tryReconnectToRunningJob(Configuration conf, Context context,
+      LauncherDelegator.JobType jobType, String statusdir) throws IOException, InterruptedException {
+    if (!reconnectToRunningJobEnabledAndSupported(conf, jobType)) {
+      return false;
+    }
+
+    long startTime = getTempletonLaunchTime(conf);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    WebHCatJTShim tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi);
+    try {
+      Set<String> childJobs = tracker.getJobs(context.getJobID().toString(), startTime);
+      if (childJobs.size() == 0) {
+        LOG.info("No child jobs found to reconnect with");
+        return false;
+      }
+
+      if (childJobs.size() > 1) {
+        LOG.warn(String.format(
+            "Found more than one child job to reconnect with: %s, skipping reconnect",
+            Arrays.toString(childJobs.toArray())));
+        return false;
+      }
+
+      String childJobIdString = childJobs.iterator().next();
+      org.apache.hadoop.mapred.JobID childJobId =
+          org.apache.hadoop.mapred.JobID.forName(childJobIdString);
+      LOG.info(String.format("Reconnecting to an existing job %s", childJobIdString));
+
+      // Update job state with the childJob id
+      updateJobStatePercentAndChildId(conf, context.getJobID().toString(), null, childJobIdString);
+
+      do {
+        org.apache.hadoop.mapred.JobStatus jobStatus = tracker.getJobStatus(childJobId);
+        if (jobStatus.isJobComplete()) {
+          LOG.info(String.format("Child job %s completed", childJobIdString));
+          int exitCode = 0;
+          if (jobStatus.getRunState() != org.apache.hadoop.mapred.JobStatus.SUCCEEDED) {
+            exitCode = 1;
+          }
+          updateJobStateToDoneAndWriteExitValue(conf, statusdir, context.getJobID().toString(),
+              exitCode);
+          break;
+        }
+
+        String percent = String.format("map %s%%, reduce %s%%",
+            jobStatus.mapProgress()*100, jobStatus.reduceProgress()*100);
+        updateJobStatePercentAndChildId(conf, context.getJobID().toString(), percent, null);
+
+        LOG.info("KeepAlive Heart beat");
+
+        context.progress();
+        Thread.sleep(POLL_JOBPROGRESS_MSEC);
+      } while (true);
+
+      // Reconnect was successful
+      return true;
+    }
+    catch (IOException ex) {
+      LOG.error("Exception encountered in tryReconnectToRunningJob", ex);
+      throw ex;
+    } finally {
+      tracker.close();
+    }
+  }
+
   @Override
   public void run(Context context) throws IOException, InterruptedException {
 
     Configuration conf = context.getConfiguration();
-
-    Process proc = startJob(context,
-            conf.get("user.name"),
-            conf.get(OVERRIDE_CLASSPATH));
-
+    LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
     String statusdir = conf.get(STATUSDIR_NAME);
-
     if (statusdir != null) {
       try {
         statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir,
@@ -311,8 +395,20 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text>
       }
     }
 
-    Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
-    LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
+    // Try to reconnect to a child job if one is found
+    if (tryReconnectToRunningJob(conf, context, jobType, statusdir)) {
+      return;
+    }
+
+    // Kill previously launched child MR jobs started by this launcher to prevent having
+    // same jobs running side-by-side
+    killLauncherChildJobs(conf, context.getJobID().toString());
+
+    // Start the job
+    Process proc = startJob(conf,
+            context.getJobID().toString(),
+            conf.get("user.name"),
+            conf.get(OVERRIDE_CLASSPATH));
 
     ExecutorService pool = Executors.newCachedThreadPool();
     executeWatcher(pool, conf, context.getJobID(),
@@ -328,27 +424,67 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text>
       pool.shutdownNow();
     }
 
-    writeExitValue(conf, proc.exitValue(), statusdir);
-    JobState state = new JobState(context.getJobID().toString(), conf);
-    state.setExitValue(proc.exitValue());
-    state.setCompleteStatus("done");
-    state.close();
+    updateJobStateToDoneAndWriteExitValue(conf, statusdir, context.getJobID().toString(),
+        proc.exitValue());
 
+    Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
     if (enablelog && TempletonUtils.isset(statusdir)) {
       LOG.info("templeton: collecting logs for " + context.getJobID().toString()
-              + " to " + statusdir + "/logs");
+               + " to " + statusdir + "/logs");
       LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
       logRetriever.run();
     }
+  }
 
-    if (proc.exitValue() != 0) {
+  private void updateJobStateToDoneAndWriteExitValue(Configuration conf,
+      String statusdir, String jobId, int exitCode) throws IOException {
+    writeExitValue(conf, exitCode, statusdir);
+    JobState state = new JobState(jobId, conf);
+    state.setExitValue(exitCode);
+    state.setCompleteStatus("done");
+    state.close();
+
+    if (exitCode != 0) {
       LOG.info("templeton: job failed with exit code "
-              + proc.exitValue());
+              + exitCode);
     } else {
       LOG.info("templeton: job completed with exit code 0");
     }
   }
 
+  /**
+   * Updates the job state percent and childid in the templeton storage. Update only
+   * takes place for non-null values.
+   */
+  private static void updateJobStatePercentAndChildId(Configuration conf,
+      String jobId, String percent, String childid) {
+    JobState state = null;
+    try {
+      if (percent != null || childid != null) {
+        state = new JobState(jobId, conf);
+        if (percent != null) {
+          state.setPercentComplete(percent);
+        }
+        if (childid != null) {
+          JobState childState = new JobState(childid, conf);
+          childState.setParent(jobId);
+          state.addChild(childid);
+          state.close();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("templeton: state error: ", e);
+    } finally {
+      if (state != null) {
+        try {
+          state.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+  }
+
   private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in,
                               String statusdir, String name) throws IOException {
     Watcher w = new Watcher(conf, jobid, in, statusdir, name);
@@ -371,6 +507,7 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text>
       PrintWriter writer = new PrintWriter(out);
       writer.println(exitValue);
       writer.close();
+      LOG.info("templeton: Exit value successfully written");
     }
   }
 
@@ -414,34 +551,9 @@ public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text>
         String line;
         while ((line = reader.readLine()) != null) {
           writer.println(line);
-          JobState state = null;
-          try {
-            String percent = TempletonUtils.extractPercentComplete(line);
-            String childid = TempletonUtils.extractChildJobId(line);
-
-            if (percent != null || childid != null) {
-              state = new JobState(jobid.toString(), conf);
-              if (percent != null) {
-                state.setPercentComplete(percent);
-              }
-              if (childid != null) {
-                JobState childState = new JobState(childid, conf);
-                childState.setParent(jobid.toString());
-                state.addChild(childid);
-                state.close();
-              }
-            }
-          } catch (IOException e) {
-            LOG.error("templeton: state error: ", e);
-          } finally {
-            if (state != null) {
-              try {
-                state.close();
-              } catch (IOException e) {
-                LOG.warn(e);
-              }
-            }
-          }
+          String percent = TempletonUtils.extractPercentComplete(line);
+          String childid = TempletonUtils.extractChildJobId(line);
+          updateJobStatePercentAndChildId(conf, jobid.toString(), percent, childid);
         }
         writer.flush();
         if(out != System.err && out != System.out) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
----------------------------------------------------------------------
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java b/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
index 8b165f3..367ea60 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * This is in org.apache.hadoop.mapred package because it relies on
@@ -109,5 +111,13 @@ public class WebHCatJTShim20S implements WebHCatJTShim {
   public void killJobs(String tag, long timestamp) {
     return;
   }
+  /**
+   * Get jobs is only supported on hadoop 2.0+.
+   */
+  @Override
+  public Set<String> getJobs(String tag, long timestamp)
+  {
+    return new HashSet<String>();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java b/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
index dd27cce..c85a739 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
@@ -21,9 +21,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
-
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -180,6 +180,22 @@ public class WebHCatJTShim23 implements WebHCatJTShim {
   }
 
   /**
+   * Returns all jobs tagged with the given tag that have been started after the
+   * given timestamp. Returned jobIds are MapReduce JobIds.
+   */
+  @Override
+  public Set<String> getJobs(String tag, long timestamp) {
+    Set<ApplicationId> childYarnJobs = getYarnChildJobs(tag, timestamp);
+    Set<String> childJobs = new HashSet<String>();
+    for(ApplicationId id : childYarnJobs) {
+      // Convert to a MapReduce job id
+      String childJobId = TypeConverter.fromYarn(id).toString();
+      childJobs.add(childJobId);
+    }
+    return childJobs;
+  }
+
+  /**
    * Queries RM for the list of applications with the given tag that have started
    * after the given timestamp.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/9f263fcd/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 5b7e7f6..74785e5 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -393,6 +393,11 @@ public interface HadoopShims {
      * given timestamp.
      */
     public void killJobs(String tag, long timestamp);
+    /**
+     * Returns all jobs tagged with the given tag that have been started after the
+     * given timestamp. Returned jobIds are MapReduce JobIds.
+     */
+    public Set<String> getJobs(String tag, long timestamp);
   }
 
   /**


Mime
View raw message