kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lid...@apache.org
Subject kylin git commit: KYLIN-1317 Kill underlying running hadoop job while discard a job
Date Thu, 14 Jan 2016 04:55:41 GMT
Repository: kylin
Updated Branches:
  refs/heads/1.x-staging bd73b643d -> 413645742


KYLIN-1317 Kill underlying running hadoop job while discard a job


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/41364574
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/41364574
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/41364574

Branch: refs/heads/1.x-staging
Commit: 413645742067c3520e48711d3667a835b508a44a
Parents: bd73b64
Author: lidongsjtu <dongli@ebay.com>
Authored: Thu Jan 14 12:55:01 2016 +0800
Committer: lidongsjtu <dongli@ebay.com>
Committed: Thu Jan 14 12:55:01 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/common/MapReduceExecutable.java   | 27 ++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/41364574/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index a2797d7..cb6e76c 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -56,9 +56,9 @@ import com.google.common.base.Preconditions;
  */
 public class MapReduceExecutable extends AbstractExecutable {
 
+    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
     private static final String KEY_MR_JOB = "MR_JOB_CLASS";
     private static final String KEY_PARAMS = "MR_JOB_PARAMS";
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
 
     public MapReduceExecutable() {
         super();
@@ -143,7 +143,7 @@ public class MapReduceExecutable extends AbstractExecutable {
             while (!isDiscarded()) {
                 JobStepStatusEnum newStatus = statusChecker.checkStatus();
                 if (status == JobStepStatusEnum.KILLED) {
-                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String>emptyMap(), "killed by admin");
+                    executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String> emptyMap(), "killed by admin");
                     return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
                 }
                 if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
@@ -168,8 +168,15 @@ public class MapReduceExecutable extends AbstractExecutable {
                 }
                 Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
             }
-            //TODO kill discarded mr job using "hadoop job -kill " + mrJobId
 
+            // try to kill running map-reduce job to release resources.
+            if (job != null) {
+                try {
+                    job.killJob();
+                } catch (Exception e) {
+                    logger.warn("failed to kill hadoop job: " + job.getJobID(), e);
+                }
+            }
             return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
 
         } catch (ReflectiveOperationException e) {
@@ -189,7 +196,7 @@ public class MapReduceExecutable extends AbstractExecutable {
             logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
         }
         String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, job.getConfiguration());
-        if(HAUtil.isHAEnabled(job.getConfiguration())) {
+        if (HAUtil.isHAEnabled(job.getConfiguration())) {
             YarnConfiguration conf = new YarnConfiguration(job.getConfiguration());
             String active = RMHAUtils.findActiveRMHAId(conf);
             rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
@@ -214,20 +221,20 @@ public class MapReduceExecutable extends AbstractExecutable {
         addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
     }
 
-    public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
-        setParam(KEY_MR_JOB, clazzName.getName());
-    }
-
     public String getMapReduceJobClass() throws ExecuteException {
         return getParam(KEY_MR_JOB);
     }
 
-    public void setMapReduceParams(String param) {
-        setParam(KEY_PARAMS, param);
+    public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
+        setParam(KEY_MR_JOB, clazzName.getName());
     }
 
     public String getMapReduceParams() {
         return getParam(KEY_PARAMS);
     }
 
+    public void setMapReduceParams(String param) {
+        setParam(KEY_PARAMS, param);
+    }
+
 }


Mime
View raw message