kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From billy...@apache.org
Subject [23/29] kylin git commit: KYLIN-2857 code review
Date Mon, 18 Sep 2017 05:44:16 GMT
KYLIN-2857 code review


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e4624779
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e4624779
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e4624779

Branch: refs/heads/security_update
Commit: e4624779bcf8e3a68b53c77700c9ff695559c76a
Parents: 21a2560
Author: Li Yang <liyang@apache.org>
Authored: Sun Sep 17 19:03:51 2017 +0800
Committer: Li Yang <liyang@apache.org>
Committed: Sun Sep 17 21:34:38 2017 +0800

----------------------------------------------------------------------
 .../engine/mr/common/MapReduceExecutable.java   | 29 +++++---------------
 .../test_case_data/sandbox/kylin_job_conf.xml   | 26 ++++++++++++++++++
 .../sandbox/kylin_job_conf_inmem.xml            | 17 ++++++++----
 3 files changed, 45 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e4624779/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 4e6458f..94874dc 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
@@ -80,10 +81,8 @@ public class MapReduceExecutable extends AbstractExecutable {
                 return;
             }
             try {
-                String params = getMapReduceParams();
-                String[] args = params.trim().split("\\s+");
-                Configuration conf = HadoopUtil.getCurrentConfiguration();
-                overwriteJobConf(conf, executableContext.getConfig(), args);
+                Configuration conf = new Configuration(HadoopUtil.getCurrentConfiguration());
+                overwriteJobConf(conf, executableContext.getConfig(), getMapReduceParams().trim().split("\\s+"));
                 Job job = new Cluster(conf).getJob(JobID.forName(mrJobId));
                 if (job == null || job.getJobState() == JobStatus.State.FAILED) {
                     //remove previous mr job info
@@ -107,15 +106,12 @@ public class MapReduceExecutable extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final String mapReduceJobClass = getMapReduceJobClass();
-        String params = getMapReduceParams();
         Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
         try {
             Job job;
             ExecutableManager mgr = getManager();
-            String[] args = params.trim().split("\\s+");
-            Configuration conf = HadoopUtil.getCurrentConfiguration();
-            String[] jobArgs = overwriteJobConf(conf, context.getConfig(), args);
+            Configuration conf = new Configuration(HadoopUtil.getCurrentConfiguration());
+            String[] jobArgs = overwriteJobConf(conf, context.getConfig(), getMapReduceParams().trim().split("\\s+"));
             final Map<String, String> extra = mgr.getOutput(getId()).getExtra();
             if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
                 job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
@@ -125,12 +121,9 @@ public class MapReduceExecutable extends AbstractExecutable {
                 final AbstractHadoopJob hadoopJob = constructor.newInstance();
                 hadoopJob.setConf(conf);
                 hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
-                logger.info("parameters of the MapReduceExecutable: {}", params);
+                logger.info("parameters of the MapReduceExecutable: {}", getMapReduceParams());
                 try {
-                    //for async mr job, ToolRunner just return 0;
 
-                    // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
-                    // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
                     hadoopJob.run(jobArgs);
 
                     if (hadoopJob.isSkipped()) {
@@ -150,14 +143,6 @@ public class MapReduceExecutable extends AbstractExecutable {
             final StringBuilder output = new StringBuilder();
             final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
 
-            //            final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
-            //            if (restStatusCheckUrl == null) {
-            //                logger.error("restStatusCheckUrl is null");
-            //                return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
-            //            }
-            //            String mrJobId = hadoopCmdOutput.getMrJobId();
-            //            boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos();
-            //            HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth);
             JobStepStatusEnum status = JobStepStatusEnum.NEW;
             while (!isDiscarded() && !isPaused()) {
 
@@ -269,7 +254,7 @@ public class MapReduceExecutable extends AbstractExecutable {
         String fileName = commandLine.getOptionValue(BatchConstants.ARG_CONF);
         String cubeName = commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME);
         Preconditions.checkArgument(cubeName != null && fileName != null, "Can't get job config");
-        conf.addResource(fileName);
+        conf.addResource(new Path(fileName));
         for (Map.Entry<String, String> entry : CubeManager.getInstance(config).getCube(cubeName).getConfig()
                 .getMRConfigOverride().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());

http://git-wip-us.apache.org/repos/asf/kylin/blob/e4624779/examples/test_case_data/sandbox/kylin_job_conf.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml
index 8f5817e..64e69a4 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf.xml
@@ -76,4 +76,30 @@
         <description>Block replication</description>
     </property>
 
+    <!-- memory configuration for sandbox -->
+    <property>
+        <name>mapreduce.map.memory.mb</name>
+        <value>384</value>
+    </property>
+
+    <property>
+        <name>mapreduce.map.java.opts</name>
+        <value>-Xmx350m</value>
+    </property>
+
+    <property>
+        <name>mapreduce.task.io.sort.mb</name>
+        <value>30</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.memory.mb</name>
+        <value>384</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.java.opts</name>
+        <value>-Xmx350m</value>
+    </property>
+
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/e4624779/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
index b05495f..5094d24 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
@@ -75,23 +75,30 @@
         <description>Block replication</description>
     </property>
 
-    <!--Additional config for in-mem cubing, giving mapper more memory -->
+    <!-- memory configuration for sandbox -->
     <property>
         <name>mapreduce.map.memory.mb</name>
         <value>768</value>
-        <description></description>
     </property>
 
     <property>
         <name>mapreduce.map.java.opts</name>
         <value>-Xmx700m</value>
-        <description></description>
     </property>
 
     <property>
         <name>mapreduce.task.io.sort.mb</name>
-        <value>10</value>
-        <description></description>
+        <value>30</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.memory.mb</name>
+        <value>512</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.java.opts</name>
+        <value>-Xmx400m</value>
     </property>
 
 </configuration>
\ No newline at end of file


Mime
View raw message