Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9855A101AE for ; Fri, 16 Jan 2015 22:33:11 +0000 (UTC) Received: (qmail 9027 invoked by uid 500); 16 Jan 2015 22:33:13 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 8995 invoked by uid 500); 16 Jan 2015 22:33:13 -0000 Mailing-List: contact commits-help@kylin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.incubator.apache.org Delivered-To: mailing list commits@kylin.incubator.apache.org Received: (qmail 8986 invoked by uid 99); 16 Jan 2015 22:33:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Jan 2015 22:33:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 16 Jan 2015 22:32:42 +0000 Received: (qmail 7441 invoked by uid 99); 16 Jan 2015 22:32:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Jan 2015 22:32:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 67AD6E03C4; Fri, 16 Jan 2015 22:32:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liyang@apache.org To: commits@kylin.incubator.apache.org Date: Fri, 16 Jan 2015 22:32:37 -0000 Message-Id: <5a7bcb67bb924e19a2917316100bf24b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-kylin git commit: [KYLIN-528] Fix the problem in BuildCubeWithEngine and add BuildIIWithEngine X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-kylin Updated Branches: refs/heads/inverted-index cdb0cb9b4 -> df750451f [KYLIN-528] Fix the problem in BuildCubeWithEngine and add BuildIIWithEngine Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/08857b4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/08857b4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/08857b4f Branch: refs/heads/inverted-index Commit: 08857b4fe51c039b42adb45a9290998f6a6267f5 Parents: cdb0cb9 Author: Shao feng, Shi Authored: Fri Jan 16 15:20:57 2015 +0800 Committer: Shao feng, Shi Committed: Fri Jan 16 15:20:57 2015 +0800 ---------------------------------------------------------------------- .../common/util/HBaseMetadataTestCase.java | 2 +- .../com/kylinolap/common/util/HiveClient.java | 31 ++- .../com/kylinolap/common/util/BasicTest.java | 114 ++++----- .../com/kylinolap/common/util/RangeSetTest.java | 38 +-- .../java/com/kylinolap/cube/CubeDescTest.java | 21 ++ .../main/java/com/kylinolap/dict/ISegment.java | 4 + examples/test_case_data/sandbox/hive-site.xml | 7 + examples/test_case_data/sandbox/mapred-site.xml | 5 - .../com/kylinolap/invertedindex/IIManager.java | 2 +- .../com/kylinolap/job/AbstractJobBuilder.java | 108 +++++++++ .../java/com/kylinolap/job/JoinedFlatTable.java | 22 +- .../com/kylinolap/job/common/HqlExecutable.java | 49 +++- .../job/constant/ExecutableConstants.java | 3 + .../kylinolap/job/cube/CubingJobBuilder.java | 117 ++-------- .../job/hadoop/cube/FactDistinctColumnsJob.java | 25 +- .../job/hadoop/hive/IIJoinedFlatTableDesc.java | 2 - .../invertedindex/IIDistinctColumnsJob.java | 19 +- .../hadoop/invertedindex/IIFlattenHiveJob.java | 16 +- .../com/kylinolap/job/invertedindex/IIJob.java | 54 +++++ .../job/invertedindex/IIJobBuilder.java | 234 +++++++++++++++++++ .../kylinolap/job/BuildCubeWithEngineTest.java | 2 +- .../kylinolap/job/BuildIIWithEngineTest.java | 194 +++++++++++++++ .../job/hadoop/hive/JoinedFlatTableTest.java | 8 +- .../com/kylinolap/metadata/MetadataManager.java | 8 +- pom.xml | 2 +- server/.settings/.jsdtscope | 13 ++ ...ipse.wst.common.project.facet.core.prefs.xml | 7 + .../org.eclipse.wst.jsdt.ui.superType.container | 1 + .../org.eclipse.wst.jsdt.ui.superType.name | 1 + .../com/kylinolap/rest/service/JobService.java | 2 +- 30 files changed, 866 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/common/src/main/java/com/kylinolap/common/util/HBaseMetadataTestCase.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/kylinolap/common/util/HBaseMetadataTestCase.java b/common/src/main/java/com/kylinolap/common/util/HBaseMetadataTestCase.java index 3b39912..cf4ccb0 100644 --- a/common/src/main/java/com/kylinolap/common/util/HBaseMetadataTestCase.java +++ b/common/src/main/java/com/kylinolap/common/util/HBaseMetadataTestCase.java @@ -57,5 +57,5 @@ public class HBaseMetadataTestCase extends AbstractKylinTestCase { String useSandbox = System.getProperty("useSandbox"); return Boolean.parseBoolean(useSandbox); } - + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/common/src/main/java/com/kylinolap/common/util/HiveClient.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/kylinolap/common/util/HiveClient.java b/common/src/main/java/com/kylinolap/common/util/HiveClient.java index 002899d..4bc60c7 100644 --- a/common/src/main/java/com/kylinolap/common/util/HiveClient.java +++ b/common/src/main/java/com/kylinolap/common/util/HiveClient.java @@ -18,6 +18,8 @@ package com.kylinolap.common.util; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.StatsSetupConst; @@ -27,9 +29,15 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.stats.StatsUtils; +/** + * Hive meta API client for Kylin + * @author shaoshi + * + */ public class HiveClient { protected HiveConf hiveConf = null; @@ -40,6 +48,11 @@ public class HiveClient { hiveConf = new HiveConf(HiveClient.class); } + public HiveClient(Map configMap) { + this(); + appendConfiguration(configMap); + } + public HiveConf getHiveConf() { return hiveConf; } @@ -58,15 +71,29 @@ public class HiveClient { } /** + * Append or overwrite the default hive client configuration; You need call this before invoke #executeHQL; + * @param configMap + */ + public void appendConfiguration(Map configMap) { + if (configMap != null && configMap.size() > 0) { + for (Entry e : configMap.entrySet()) { + hiveConf.set(e.getKey(), e.getValue()); + } + } + } + + /** * * @param hql * @throws CommandNeedRetryException * @throws IOException */ public void executeHQL(String hql) throws CommandNeedRetryException, IOException { - int retCode = getDriver().run(hql).getResponseCode(); + CommandProcessorResponse response = getDriver().run(hql); + int retCode = response.getResponseCode(); if (retCode != 0) { - throw new IOException("Failed to execute hql [" + hql + "], return code from hive driver : [" + retCode + "]"); + String err = response.getErrorMessage(); + throw new IOException("Failed to execute hql [" + hql + "], error message is: " + err); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/common/src/test/java/com/kylinolap/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/kylinolap/common/util/BasicTest.java b/common/src/test/java/com/kylinolap/common/util/BasicTest.java index eb6f59f..43f1832 100644 --- a/common/src/test/java/com/kylinolap/common/util/BasicTest.java +++ b/common/src/test/java/com/kylinolap/common/util/BasicTest.java @@ -1,57 +1,57 @@ -package com.kylinolap.common.util; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; - -import com.google.common.collect.DiscreteDomain; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.TreeRangeSet; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.*; - -/** - * Created by honma on 10/17/14. - *

- * Keep this test case to test basic java functionality - * development concept proving use - */ -@Ignore("convenient trial tool for dev") -@SuppressWarnings("unused") -public class BasicTest { - protected static final org.slf4j.Logger log = LoggerFactory.getLogger(BasicTest.class); - private void log(ByteBuffer a) { - Integer x = 4; - foo(x); - } - - private void foo(Long a) { - System.out.printf("a"); - - } - - private void foo(Integer b) { - System.out.printf("b"); - } - - @Test - @Ignore("convenient trial tool for dev") - public void test1() throws IOException, InterruptedException { - - RangeSet rangeSet = TreeRangeSet.create(); - Range a = Range.closed(1, 10); - Range b = Range.closedOpen(11, 15); - Range newa = a.canonical(DiscreteDomain.integers()); - Range newb = b.canonical(DiscreteDomain.integers()); - rangeSet.add(newa); - rangeSet.add(newb); - System.out.println(rangeSet); - } - - @Test - @Ignore("fix it later") - public void test2() throws IOException { - } -} +//package com.kylinolap.common.util; +// +//import java.io.IOException; +//import java.nio.ByteBuffer; +//import java.nio.charset.Charset; +// +//import com.google.common.collect.DiscreteDomain; +//import com.google.common.collect.Range; +//import com.google.common.collect.RangeSet; +//import com.google.common.collect.TreeRangeSet; +//import org.junit.Ignore; +//import org.junit.Test; +//import org.slf4j.*; +// +///** +// * Created by honma on 10/17/14. +// *

+// * Keep this test case to test basic java functionality +// * development concept proving use +// */ +//@Ignore("convenient trial tool for dev") +//@SuppressWarnings("unused") +//public class BasicTest { +// protected static final org.slf4j.Logger log = LoggerFactory.getLogger(BasicTest.class); +// private void log(ByteBuffer a) { +// Integer x = 4; +// foo(x); +// } +// +// private void foo(Long a) { +// System.out.printf("a"); +// +// } +// +// private void foo(Integer b) { +// System.out.printf("b"); +// } +// +// @Test +// @Ignore("convenient trial tool for dev") +// public void test1() throws IOException, InterruptedException { +// +// RangeSet rangeSet = TreeRangeSet.create(); +// Range a = Range.closed(1, 10); +// Range b = Range.closedOpen(11, 15); +// Range newa = a.canonical(DiscreteDomain.integers()); +// Range newb = b.canonical(DiscreteDomain.integers()); +// rangeSet.add(newa); +// rangeSet.add(newb); +// System.out.println(rangeSet); +// } +// +// @Test +// @Ignore("fix it later") +// public void test2() throws IOException { +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/common/src/test/java/com/kylinolap/common/util/RangeSetTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/kylinolap/common/util/RangeSetTest.java b/common/src/test/java/com/kylinolap/common/util/RangeSetTest.java index 6ca8974..39668a0 100644 --- a/common/src/test/java/com/kylinolap/common/util/RangeSetTest.java +++ b/common/src/test/java/com/kylinolap/common/util/RangeSetTest.java @@ -9,23 +9,23 @@ import java.io.IOException; * Created by Hongbin Ma(Binmahone) on 1/13/15. */ public class RangeSetTest { - @Test - public void test1() throws IOException, InterruptedException { - RangeSet rangeSet = TreeRangeSet.create(); - Range a = Range.closedOpen(1, 2); - Range b = Range.closedOpen(2, 3); - Range newa = a.canonical(DiscreteDomain.integers()); - Range newb = b.canonical(DiscreteDomain.integers()); - rangeSet.add(newa); - rangeSet.add(newb); - System.out.println(rangeSet); - - for (Range r : rangeSet.asRanges()) { - ContiguousSet s = ContiguousSet.create(r, DiscreteDomain.integers()); - for (Integer x : s) { - System.out.println(x); - } - } - - } +// @Test +// public void test1() throws IOException, InterruptedException { +// RangeSet rangeSet = TreeRangeSet.create(); +// Range a = Range.closedOpen(1, 2); +// Range b = Range.closedOpen(2, 3); +// Range newa = a.canonical(DiscreteDomain.integers()); +// Range newb = b.canonical(DiscreteDomain.integers()); +// rangeSet.add(newa); +// rangeSet.add(newb); +// System.out.println(rangeSet); +// +// for (Range r : rangeSet.asRanges()) { +// ContiguousSet s = ContiguousSet.create(r, DiscreteDomain.integers()); +// for (Integer x : s) { +// System.out.println(x); +// } +// } +// +// } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/cube/src/test/java/com/kylinolap/cube/CubeDescTest.java ---------------------------------------------------------------------- diff --git a/cube/src/test/java/com/kylinolap/cube/CubeDescTest.java b/cube/src/test/java/com/kylinolap/cube/CubeDescTest.java index db38f34..cf9ab4b 100644 --- a/cube/src/test/java/com/kylinolap/cube/CubeDescTest.java +++ b/cube/src/test/java/com/kylinolap/cube/CubeDescTest.java @@ -16,11 +16,15 @@ package com.kylinolap.cube; +import java.util.HashMap; +import java.util.Map; + import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Maps; import com.kylinolap.common.util.JsonUtil; import com.kylinolap.common.util.LocalFileMetadataTestCase; import com.kylinolap.cube.model.CubeDesc; @@ -54,5 +58,22 @@ public class CubeDescTest extends LocalFileMetadataTestCase { CubeDesc cubeDesc = CubeDescManager.getInstance(this.getTestConfig()).getCubeDesc("test_kylin_cube_with_slr_desc"); Assert.assertNotNull(cubeDesc); } + + @Test + public void testSerializeMap() throws Exception { + Map map = Maps.newHashMap(); + + map.put("key1", "value1"); + map.put("key2", "value2"); + + String mapStr = JsonUtil.writeValueAsString(map); + + System.out.println(mapStr); + + Map map2 = JsonUtil.readValue(mapStr, HashMap.class); + + Assert.assertEquals(map, map2); + + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/dictionary/src/main/java/com/kylinolap/dict/ISegment.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/com/kylinolap/dict/ISegment.java b/dictionary/src/main/java/com/kylinolap/dict/ISegment.java index d755e8b..dd2c14d 100644 --- a/dictionary/src/main/java/com/kylinolap/dict/ISegment.java +++ b/dictionary/src/main/java/com/kylinolap/dict/ISegment.java @@ -10,4 +10,8 @@ public interface ISegment { public abstract int getColumnLength(TblColRef col); public abstract Dictionary getDictionary(TblColRef col); + + public String getName(); + + public String getUuid(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/examples/test_case_data/sandbox/hive-site.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/hive-site.xml b/examples/test_case_data/sandbox/hive-site.xml new file mode 100644 index 0000000..5fcbd10 --- /dev/null +++ b/examples/test_case_data/sandbox/hive-site.xml @@ -0,0 +1,7 @@ + + + + hive.metastore.uris + thrift://sandbox.hortonworks.com:9083 + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/examples/test_case_data/sandbox/mapred-site.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml index 24e2d7c..3a910f9 100644 --- a/examples/test_case_data/sandbox/mapred-site.xml +++ b/examples/test_case_data/sandbox/mapred-site.xml @@ -102,11 +102,6 @@ - hive.metastore.uris - thrift://sandbox.hortonworks.com:9083 - - - mapreduce.map.output.compress false http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/invertedindex/src/main/java/com/kylinolap/invertedindex/IIManager.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/com/kylinolap/invertedindex/IIManager.java b/invertedindex/src/main/java/com/kylinolap/invertedindex/IIManager.java index c961e80..130ee26 100644 --- a/invertedindex/src/main/java/com/kylinolap/invertedindex/IIManager.java +++ b/invertedindex/src/main/java/com/kylinolap/invertedindex/IIManager.java @@ -231,7 +231,7 @@ public class IIManager implements IRealizationProvider { * (pass 0 if full build) * @return */ - private IISegment buildSegment(IIInstance IIInstance, long startDate, long endDate) { + public IISegment buildSegment(IIInstance IIInstance, long startDate, long endDate) { IISegment segment = new IISegment(); String incrementalSegName = IISegment.getSegmentName(startDate, endDate); segment.setUuid(UUID.randomUUID().toString()); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/AbstractJobBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/AbstractJobBuilder.java b/job/src/main/java/com/kylinolap/job/AbstractJobBuilder.java new file mode 100644 index 0000000..65de419 --- /dev/null +++ b/job/src/main/java/com/kylinolap/job/AbstractJobBuilder.java @@ -0,0 +1,108 @@ +/* + * Copyright 2013-2014 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.kylinolap.job; + +import java.io.IOException; + +import com.kylinolap.dict.ISegment; +import com.kylinolap.job.common.ShellExecutable; +import com.kylinolap.job.constant.ExecutableConstants; +import com.kylinolap.job.engine.JobEngineConfig; +import com.kylinolap.job.hadoop.hive.IJoinedFlatTableDesc; +import com.kylinolap.job.impl.threadpool.AbstractExecutable; + +public abstract class AbstractJobBuilder { + + protected static final String JOB_WORKING_DIR_PREFIX = "kylin-"; + + protected JobEngineConfig jobEngineConfig; + protected ISegment segment; + protected String submitter; + + public abstract AbstractExecutable buildJob(); + + public AbstractJobBuilder setSegment(ISegment segment) { + this.segment = segment; + return this; + } + + public AbstractJobBuilder setJobEnginConfig(JobEngineConfig enginConfig) { + this.jobEngineConfig = enginConfig; + return this; + } + + public AbstractJobBuilder setSubmitter(String submitter) { + this.submitter = submitter; + return this; + } + + public JobEngineConfig getJobEngineConfig() { + return jobEngineConfig; + } + + public ISegment getSegment() { + return segment; + } + + + public String getSubmitter() { + return submitter; + } + + + protected StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) { + return cmd.append(" -").append(paraName).append(" ").append(paraValue); + } + + protected String getIntermediateHiveTableName(IJoinedFlatTableDesc intermediateTableDesc, String jobUuid) { + return intermediateTableDesc.getTableName(jobUuid); + } + + protected String getIntermediateHiveTableLocation(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) { + return getJobWorkingDir(jobUUID) + "/" + intermediateTableDesc.getTableName(jobUUID); + } + + protected AbstractExecutable createIntermediateHiveTableStep(IJoinedFlatTableDesc intermediateTableDesc, String jobId) { + + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId); + String insertDataHqls; + try { + insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobId, this.jobEngineConfig); + } catch (IOException e1) { + e1.printStackTrace(); + throw new RuntimeException("Failed to generate insert data SQL for intermediate table."); + } + + ShellExecutable step = new ShellExecutable(); + StringBuffer buf = new StringBuffer(); + buf.append("hive -e \""); + buf.append(dropTableHql + "\n"); + buf.append(createTableHql + "\n"); + buf.append(insertDataHqls + "\n"); + buf.append("\""); + + step.setCmd(buf.toString()); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + + return step; + } + + + protected String getJobWorkingDir(String uuid) { + return jobEngineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + uuid; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java index a97fc5b..6095575 100644 --- a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java +++ b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java @@ -53,7 +53,7 @@ import com.kylinolap.metadata.model.TblColRef; public class JoinedFlatTable { - public static String getTableDir(CubeJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) { + public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) { return storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID); } @@ -74,7 +74,7 @@ public class JoinedFlatTable { ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n"); ddl.append("STORED AS SEQUENCEFILE" + "\n"); - ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID) + "'"); + ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID) + "';").append("\n"); // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" + // ";\n"); return ddl.toString(); @@ -82,12 +82,12 @@ public class JoinedFlatTable { public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) { StringBuilder ddl = new StringBuilder(); - ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName(jobUUID)); + ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName(jobUUID) + ";").append("\n"); return ddl.toString(); } - public static String[] generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID, JobEngineConfig engineConfig) throws IOException { - List sqlList = Lists.newArrayList(); + public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID, JobEngineConfig engineConfig) throws IOException { + StringBuilder sql = new StringBuilder(); File hadoopPropertiesFile = new File(engineConfig.getHadoopJobConfFilePath(intermediateTableDesc.getCapacity())); @@ -103,7 +103,7 @@ public class JoinedFlatTable { String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue(); String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue(); if (name.equals("tmpjars") == false) { - sqlList.add("SET " + name + "=" + value); + sql.append("SET " + name + "=" + value + ";").append("\n"); } } @@ -115,12 +115,12 @@ public class JoinedFlatTable { } // hard coded below mr parameters to enable map-side join - sqlList.add("SET hive.exec.compress.output=true"); - sqlList.add("SET hive.auto.convert.join.noconditionaltask = true"); - sqlList.add("SET hive.auto.convert.join.noconditionaltask.size = 300000000"); - sqlList.add("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName(jobUUID) + " " + generateSelectDataStatement(intermediateTableDesc)); + sql.append("SET hive.exec.compress.output=true;").append("\n"); + sql.append("SET hive.auto.convert.join.noconditionaltask = true;").append("\n"); + sql.append("SET hive.auto.convert.join.noconditionaltask.size = 300000000;").append("\n"); + sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName(jobUUID) + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n"); - return sqlList.toArray(new String[sqlList.size()]); + return sql.toString(); } public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/common/HqlExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/common/HqlExecutable.java b/job/src/main/java/com/kylinolap/job/common/HqlExecutable.java index bfd76f4..c56401e 100644 --- a/job/src/main/java/com/kylinolap/job/common/HqlExecutable.java +++ b/job/src/main/java/com/kylinolap/job/common/HqlExecutable.java @@ -1,16 +1,21 @@ package com.kylinolap.job.common; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.datanucleus.store.types.backed.HashMap; + +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; import com.kylinolap.common.util.HiveClient; +import com.kylinolap.common.util.JsonUtil; import com.kylinolap.job.dao.JobPO; import com.kylinolap.job.exception.ExecuteException; import com.kylinolap.job.execution.ExecutableContext; import com.kylinolap.job.execution.ExecuteResult; import com.kylinolap.job.impl.threadpool.AbstractExecutable; -import org.apache.commons.lang.StringUtils; - -import java.util.Collections; -import java.util.List; /** * Created by qianzhou on 1/15/15. @@ -18,6 +23,7 @@ import java.util.List; public class HqlExecutable extends AbstractExecutable { private static final String HQL = "hql"; + private static final String HIVE_CONFIG = "hive-config"; public HqlExecutable() { } @@ -29,7 +35,9 @@ public class HqlExecutable extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { try { - HiveClient hiveClient = new HiveClient(); + Map configMap = getConfiguration(); + HiveClient hiveClient = new HiveClient(configMap); + for (String hql: getHqls()) { hiveClient.executeHQL(hql); } @@ -39,15 +47,42 @@ public class HqlExecutable extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); } } + + public void setConfiguration(Map configMap) { + if(configMap != null) { + String configStr = ""; + try { + configStr = JsonUtil.writeValueAsString(configMap); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + setParam(HIVE_CONFIG, configStr); + } + } + + private Map getConfiguration() { + String configStr = getParam(HIVE_CONFIG); + Map result = null; + if(configStr != null) { + try { + result = JsonUtil.readValue(configStr, HashMap.class); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return result; + } + public void setHqls(List hqls) { - setParam(HQL, StringUtils.join(hqls, ",")); + setParam(HQL, StringUtils.join(hqls, ";")); } private List getHqls() { final String hqls = getParam(HQL); if (hqls != null) { - return Lists.newArrayList(StringUtils.split(hqls, ",")); + return Lists.newArrayList(StringUtils.split(hqls, ";")); } else { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/constant/ExecutableConstants.java b/job/src/main/java/com/kylinolap/job/constant/ExecutableConstants.java index 6286ed5..adf6122 100644 --- a/job/src/main/java/com/kylinolap/job/constant/ExecutableConstants.java +++ b/job/src/main/java/com/kylinolap/job/constant/ExecutableConstants.java @@ -36,6 +36,9 @@ public final class ExecutableConstants { public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table"; public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data"; public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info"; + + public static final String STEP_NAME_BUILD_II = "Build Inverted Index"; + public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile"; public static final String PROP_ENGINE_CONTEXT = "jobengineConfig"; public static final String PROP_JOB_FLOW = "jobFlow"; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/cube/CubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/cube/CubingJobBuilder.java b/job/src/main/java/com/kylinolap/job/cube/CubingJobBuilder.java index 918cff7..b1bb264 100644 --- a/job/src/main/java/com/kylinolap/job/cube/CubingJobBuilder.java +++ b/job/src/main/java/com/kylinolap/job/cube/CubingJobBuilder.java @@ -13,16 +13,13 @@ import org.apache.commons.lang3.StringUtils; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.kylinolap.common.util.HiveClient; import com.kylinolap.cube.CubeSegment; -import com.kylinolap.job.JoinedFlatTable; +import com.kylinolap.cube.model.CubeDesc; +import com.kylinolap.job.AbstractJobBuilder; import com.kylinolap.job.common.HadoopShellExecutable; import com.kylinolap.job.common.MapReduceExecutable; import com.kylinolap.job.constant.ExecutableConstants; import com.kylinolap.job.engine.JobEngineConfig; -import com.kylinolap.job.exception.ExecuteException; -import com.kylinolap.job.execution.ExecutableContext; -import com.kylinolap.job.execution.ExecuteResult; import com.kylinolap.job.hadoop.cube.BaseCuboidJob; import com.kylinolap.job.hadoop.cube.CubeHFileJob; import com.kylinolap.job.hadoop.cube.FactDistinctColumnsJob; @@ -38,44 +35,28 @@ import com.kylinolap.job.impl.threadpool.AbstractExecutable; /** * Created by qianzhou on 12/25/14. */ -public final class CubingJobBuilder { - - private static final String JOB_WORKING_DIR_PREFIX = "kylin-"; - - private JobEngineConfig jobEngineConfig; - private CubeSegment segment; - private String submitter; +public final class CubingJobBuilder extends AbstractJobBuilder { private CubingJobBuilder() {} - + public static CubingJobBuilder newBuilder() { return new CubingJobBuilder(); } - - public CubingJobBuilder setSegment(CubeSegment segment) { - this.segment = segment; - return this; - } - - public CubingJobBuilder setJobEnginConfig(JobEngineConfig enginConfig) { - this.jobEngineConfig = enginConfig; - return this; - } - - public CubingJobBuilder setSubmitter(String submitter) { - this.submitter = submitter; - return this; + + protected CubeDesc getCubeDesc() { + return ((CubeSegment)segment).getCubeDesc(); } - + public CubingJob buildJob() { checkPreconditions(); - final int groupRowkeyColumnsCount = segment.getCubeDesc().getRowkey().getNCuboidBuildLevels(); - final int totalRowkeyColumnsCount = segment.getCubeDesc().getRowkey().getRowKeyColumns().length; + final int groupRowkeyColumnsCount = getCubeDesc().getRowkey().getNCuboidBuildLevels(); + final int totalRowkeyColumnsCount = getCubeDesc().getRowkey().getRowKeyColumns().length; CubingJob result = initialJob("BUILD"); final String jobId = result.getId(); - final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(segment.getCubeDesc(), this.segment); + final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(getCubeDesc(), (CubeSegment)this.segment); final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId); + final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId); final String factDistinctColumnsPath = getFactDistinctColumnsPath(jobId); final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + getCubeName() + "/cuboid/"; final String cuboidPath = cuboidRootPath + "*"; @@ -89,7 +70,7 @@ public final class CubingJobBuilder { result.addTask(createBuildDictionaryStep(factDistinctColumnsPath)); // base cuboid step - final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(intermediateHiveTableName, cuboidOutputTempPath); + final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(intermediateHiveTableLocation, cuboidOutputTempPath); result.addTask(baseCuboidStep); // n dim cuboid steps @@ -116,7 +97,7 @@ public final class CubingJobBuilder { checkPreconditions(); CubingJob result = initialJob("MERGE"); final String jobId = result.getId(); - List mergingSegments = segment.getCubeInstance().getMergingSegments(segment); + List mergingSegments = ((CubeSegment)segment).getCubeInstance().getMergingSegments((CubeSegment)segment); Preconditions.checkState(mergingSegments != null && mergingSegments.size() > 1, "there should be more than 2 segments to merge"); String[] cuboidPaths = new String[mergingSegments.size()]; for (int i = 0; i < mergingSegments.size(); i++) { @@ -168,20 +149,12 @@ public final class CubingJobBuilder { Preconditions.checkNotNull(this.jobEngineConfig, "jobEngineConfig cannot be null"); } - private String getJobWorkingDir(String uuid) { - return jobEngineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + uuid; - } - private String getPathToMerge(CubeSegment segment) { return getJobWorkingDir(segment.getLastBuildJobID()) + "/" + getCubeName() + "/cuboid/*"; } private String getCubeName() { - return segment.getCubeInstance().getName(); - } - - private String getSegmentName() { - return segment.getName(); + return ((CubeSegment)segment).getCubeInstance().getName(); } private String getRowkeyDistributionOutputPath() { @@ -190,7 +163,7 @@ public final class CubingJobBuilder { private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) { try { - String jobConf = engineConfig.getHadoopJobConfFilePath(segment.getCubeDesc().getCapacity()); + String jobConf = engineConfig.getHadoopJobConfFilePath(getCubeDesc().getCapacity()); if (jobConf != null && jobConf.length() > 0) { builder.append(" -conf ").append(jobConf); } @@ -212,60 +185,19 @@ public final class CubingJobBuilder { return paths; } - private StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) { - return cmd.append(" -").append(paraName).append(" ").append(paraValue); - } - - private String getIntermediateHiveTableName(CubeJoinedFlatTableDesc intermediateTableDesc, String jobUuid) { - return JoinedFlatTable.getTableDir(intermediateTableDesc, getJobWorkingDir(jobUuid), jobUuid); - } private String getFactDistinctColumnsPath(String jobUuid) { return getJobWorkingDir(jobUuid) + "/" + getCubeName() + "/fact_distinct_columns"; } private String getHTableName() { - return segment.getStorageLocationIdentifier(); + return ((CubeSegment)segment).getStorageLocationIdentifier(); } private String getHFilePath(String jobId) { return getJobWorkingDir(jobId) + "/" + getCubeName() + "/hfile/"; } - private AbstractExecutable createIntermediateHiveTableStep(CubeJoinedFlatTableDesc intermediateTableDesc, String jobId) { - - final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId); - String[] insertDataHqls; - try { - insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobId, this.jobEngineConfig); - } catch (IOException e1) { - e1.printStackTrace(); - throw new RuntimeException("Failed to generate insert data SQL for intermediate table."); - } - - final String[] insertDataHqlsCopy = insertDataHqls; - AbstractExecutable step = new AbstractExecutable() { - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - HiveClient hiveClient = new HiveClient(); - try { - hiveClient.executeHQL(new String[] { dropTableHql, createTableHql }); - hiveClient.executeHQL(insertDataHqlsCopy); - } catch (Exception e) { - e.printStackTrace(); - throw new ExecuteException("Failed to createIntermediateHiveTable;", e); - } - return new ExecuteResult(ExecuteResult.State.SUCCEED); - } - }; - - step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); - - return step; - } - private MapReduceExecutable createFactDistinctColumnsStep(String intermediateHiveTableName, String jobId) { MapReduceExecutable result = new MapReduceExecutable(); result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); @@ -273,10 +205,9 @@ public final class CubingJobBuilder { StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, jobEngineConfig); appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "input", intermediateHiveTableName); appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId)); appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + getCubeName() + "_Step"); - appendExecCmdParameters(cmd, "htablename", new CubeJoinedFlatTableDesc(segment.getCubeDesc(), segment).getTableName(jobId)); + appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName); result.setMapReduceParams(cmd.toString()); return result; @@ -288,7 +219,7 @@ public final class CubingJobBuilder { buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "segmentname", getSegmentName()); + appendExecCmdParameters(cmd, "segmentname", segment.getName()); appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); buildDictionaryStep.setJobParams(cmd.toString()); @@ -296,7 +227,7 @@ public final class CubingJobBuilder { return buildDictionaryStep; } - private MapReduceExecutable createBaseCuboidStep(String intermediateHiveTableName, String[] cuboidOutputTempPath) { + private MapReduceExecutable createBaseCuboidStep(String intermediateHiveTableLocation, String[] cuboidOutputTempPath) { // base cuboid job MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); @@ -306,8 +237,8 @@ public final class CubingJobBuilder { baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "segmentname", getSegmentName()); - appendExecCmdParameters(cmd, "input", intermediateHiveTableName); + appendExecCmdParameters(cmd, "segmentname", segment.getName()); + appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]); appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + getCubeName()); appendExecCmdParameters(cmd, "level", "0"); @@ -326,7 +257,7 @@ public final class CubingJobBuilder { appendMapReduceParameters(cmd, jobEngineConfig); appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "segmentname", getSegmentName()); + appendExecCmdParameters(cmd, "segmentname", segment.getName()); appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + getCubeName() + "_Step"); @@ -420,7 +351,7 @@ public final class CubingJobBuilder { appendMapReduceParameters(cmd, jobEngineConfig); appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "segmentname", getSegmentName()); + appendExecCmdParameters(cmd, "segmentname", segment.getName()); appendExecCmdParameters(cmd, "input", inputPath); appendExecCmdParameters(cmd, "output", outputPath); appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + getCubeName() + "_Step"); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java index e46ef88..4c9d66e 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java @@ -25,7 +25,6 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; @@ -53,29 +52,24 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_INPUT_FORMAT); options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_HTABLE_NAME); + options.addOption(OPTION_TABLE_NAME); parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); String cubeName = getOptionValue(OPTION_CUBE_NAME); - Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); - String inputFormat = getOptionValue(OPTION_INPUT_FORMAT); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - String intermediateTable = getOptionValue(OPTION_HTABLE_NAME); + String intermediateTable = getOptionValue(OPTION_TABLE_NAME); // ---------------------------------------------------------------------------- // add metadata to distributed cache CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cubeInstance = cubeMgr.getCube(cubeName); - String factTableName = cubeInstance.getDescriptor().getFactTable(); job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); System.out.println("Starting: " + job.getJobName()); - setupMapInput(input, inputFormat, intermediateTable); + setupMapInput(intermediateTable); setupReduceOutput(output); // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment(); @@ -91,8 +85,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { } - private void setupMapInput(Path input, String inputFormat, String intermediateTable) throws IOException { - FileInputFormat.setInputPaths(job, input); + private void setupMapInput(String intermediateTable) throws IOException { +// FileInputFormat.setInputPaths(job, input); File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath()); if (JarFile.exists()) { @@ -101,15 +95,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { job.setJarByClass(this.getClass()); } - /* - if ("text".equalsIgnoreCase(inputFormat) || "textinputformat".equalsIgnoreCase(inputFormat)) { - job.setInputFormatClass(TextInputFormat.class); - } else { - job.setInputFormatClass(SequenceFileInputFormat.class); - } - */ -// HCatInputFormat.setInput(job, "default", -// factTableName); String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable); HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/hadoop/hive/IIJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/hadoop/hive/IIJoinedFlatTableDesc.java b/job/src/main/java/com/kylinolap/job/hadoop/hive/IIJoinedFlatTableDesc.java index c940519..635dacb 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/hive/IIJoinedFlatTableDesc.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/hive/IIJoinedFlatTableDesc.java @@ -5,14 +5,12 @@ import java.util.Map; import java.util.TreeMap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.kylinolap.cube.model.CubeDesc; import com.kylinolap.invertedindex.model.IIDesc; import com.kylinolap.metadata.model.DataModelDesc; import com.kylinolap.metadata.model.JoinDesc; import com.kylinolap.metadata.model.LookupDesc; import com.kylinolap.metadata.model.TblColRef; -import com.sun.org.apache.xml.internal.utils.StringComparable; /** * Created by Hongbin Ma(Binmahone) on 12/30/14. http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java index 6428786..8ca4de0 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIDistinctColumnsJob.java @@ -25,7 +25,6 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.ShortWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; @@ -55,30 +54,26 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_TABLE_NAME); - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_INPUT_FORMAT); - options.addOption(OPTION_INPUT_DELIM); +// options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_OUTPUT_PATH); parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase(); - Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); - String inputFormat = getOptionValue(OPTION_INPUT_FORMAT); - String inputDelim = getOptionValue(OPTION_INPUT_DELIM); +// Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); // ---------------------------------------------------------------------------- - log.info("Starting: " + job.getJobName()); + log.info("Starting: " + job.getJobName() + " on table " + tableName); // pass table and columns MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); TableDesc table = metaMgr.getTableDesc(tableName); - job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName); + job.getConfiguration().set(BatchConstants.TABLE_NAME, table.getIdentity()); job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(table)); - setupMapInput(input, inputFormat, inputDelim); + setupMapInput(); setupReduceOutput(output); return waitForCompletion(job); @@ -101,8 +96,8 @@ public class IIDistinctColumnsJob extends AbstractHadoopJob { return buf.toString(); } - private void setupMapInput(Path input, String inputFormat, String inputDelim) throws IOException { - FileInputFormat.setInputPaths(job, input); + private void setupMapInput() throws IOException { +// FileInputFormat.setInputPaths(job, input); File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath()); if (JarFile.exists()) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIFlattenHiveJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIFlattenHiveJob.java b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIFlattenHiveJob.java index 0d7c3d5..087f23b 100644 --- a/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIFlattenHiveJob.java +++ b/job/src/main/java/com/kylinolap/job/hadoop/invertedindex/IIFlattenHiveJob.java @@ -6,13 +6,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.kylinolap.common.KylinConfig; -import com.kylinolap.common.util.HiveClient; import com.kylinolap.invertedindex.IIDescManager; import com.kylinolap.invertedindex.IIInstance; import com.kylinolap.invertedindex.IIManager; import com.kylinolap.invertedindex.model.IIDesc; import com.kylinolap.job.JobInstance; import com.kylinolap.job.JoinedFlatTable; +import com.kylinolap.job.cmd.ICommandOutput; +import com.kylinolap.job.cmd.ShellCmd; import com.kylinolap.job.engine.JobEngineConfig; import com.kylinolap.job.hadoop.AbstractHadoopJob; import com.kylinolap.job.hadoop.hive.IIJoinedFlatTableDesc; @@ -44,19 +45,22 @@ public class IIFlattenHiveJob extends AbstractHadoopJob { String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobUUID); String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, // JobInstance.getJobWorkingDir(jobUUID, engineConfig.getHdfsWorkingDirectory()), jobUUID); - String[] insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobUUID, engineConfig); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobUUID, engineConfig); StringBuffer buf = new StringBuffer(); + buf.append("hive -e \""); buf.append(dropTableHql + "\n"); buf.append(createTableHql + "\n"); buf.append(insertDataHqls + "\n"); - + buf.append("\""); + System.out.println(buf.toString()); System.out.println("========================"); - HiveClient hiveClient = new HiveClient(); - hiveClient.executeHQL(new String[] { dropTableHql, createTableHql }); - hiveClient.executeHQL(insertDataHqls); + ShellCmd cmd = new ShellCmd(buf.toString(), null, null, null, false); + ICommandOutput output = cmd.execute(); + System.out.println(output.getOutput()); + System.out.println(output.getExitCode()); return 0; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/invertedindex/IIJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/invertedindex/IIJob.java b/job/src/main/java/com/kylinolap/job/invertedindex/IIJob.java new file mode 100644 index 0000000..993a6f0 --- /dev/null +++ b/job/src/main/java/com/kylinolap/job/invertedindex/IIJob.java @@ -0,0 +1,54 @@ +/* + * Copyright 2013-2014 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.kylinolap.job.invertedindex; + +import com.kylinolap.job.dao.JobPO; +import com.kylinolap.job.impl.threadpool.DefaultChainedExecutable; + +/** + * Created by shaoshi on 1/15/15. + */ +public class IIJob extends DefaultChainedExecutable { + + public IIJob() { + super(); + } + + public IIJob(JobPO job) { + super(job); + } + + private static final String II_INSTANCE_NAME = "iiName"; + private static final String SEGMENT_ID = "segmentId"; + + + void setIIName(String name) { + setParam(II_INSTANCE_NAME, name); + } + + public String getIIName() { + return getParam(II_INSTANCE_NAME); + } + + void setSegmentId(String segmentId) { + setParam(SEGMENT_ID, segmentId); + } + + public String getSegmentId() { + return getParam(SEGMENT_ID); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/main/java/com/kylinolap/job/invertedindex/IIJobBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/com/kylinolap/job/invertedindex/IIJobBuilder.java b/job/src/main/java/com/kylinolap/job/invertedindex/IIJobBuilder.java new file mode 100644 index 0000000..1c96716 --- /dev/null +++ b/job/src/main/java/com/kylinolap/job/invertedindex/IIJobBuilder.java @@ -0,0 +1,234 @@ +/* + * Copyright 2013-2014 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.kylinolap.job.invertedindex; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; + +import com.google.common.base.Preconditions; +import com.kylinolap.cube.model.CubeDesc.RealizationCapacity; +import com.kylinolap.invertedindex.IISegment; +import com.kylinolap.invertedindex.model.IIDesc; +import com.kylinolap.job.AbstractJobBuilder; +import com.kylinolap.job.common.HadoopShellExecutable; +import com.kylinolap.job.common.MapReduceExecutable; +import com.kylinolap.job.constant.ExecutableConstants; +import com.kylinolap.job.engine.JobEngineConfig; +import com.kylinolap.job.hadoop.dict.CreateInvertedIndexDictionaryJob; +import com.kylinolap.job.hadoop.hive.IIJoinedFlatTableDesc; +import com.kylinolap.job.hadoop.invertedindex.IIBulkLoadJob; +import com.kylinolap.job.hadoop.invertedindex.IICreateHFileJob; +import com.kylinolap.job.hadoop.invertedindex.IICreateHTableJob; +import com.kylinolap.job.hadoop.invertedindex.IIDistinctColumnsJob; +import com.kylinolap.job.hadoop.invertedindex.InvertedIndexJob; +import com.kylinolap.job.impl.threadpool.AbstractExecutable; + +/** + * Created by shaoshi on 1/15/15. + */ +public final class IIJobBuilder extends AbstractJobBuilder { + + private IIJobBuilder() { + + } + + public static IIJobBuilder newBuilder() { + return new IIJobBuilder(); + } + + public IIJob buildJob() { + checkPreconditions(); + + IIJob result = initialJob("BUILD"); + final String jobId = result.getId(); + final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(getIIDesc()); + final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId); + final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId); + final String factTableName = getIIDesc().getFactTableName(); + final String factDistinctColumnsPath = getIIDistinctColumnsPath(jobId); + final String iiRootPath = getJobWorkingDir(jobId) + "/" + getIIName() + "/"; + final String iiPath = iiRootPath + "*"; + + final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId); + result.addTask(intermediateHiveTableStep); + + result.addTask(createFactDistinctColumnsStep(factTableName, jobId, factDistinctColumnsPath)); + + result.addTask(createBuildDictionaryStep(factDistinctColumnsPath)); + + result.addTask(createInvertedIndexStep(intermediateHiveTableLocation, iiRootPath)); + + result.addTask(this.createCreateHTableStep()); + + // create htable step + result.addTask(createCreateHTableStep()); + + // generate hfiles step + result.addTask(createConvertToHfileStep(iiPath, jobId)); + // bulk load step + result.addTask(createBulkLoadStep(jobId)); + + + return result; + } + + protected IIDesc getIIDesc() { + return ((IISegment)segment).getIIDesc(); + } + + private IIJob initialJob(String type) { + IIJob result = new IIJob(); + SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); + format.setTimeZone(TimeZone.getTimeZone(jobEngineConfig.getTimeZone())); + result.setIIName(getIIName()); + result.setSegmentId(segment.getUuid()); + result.setName(getIIName() + " - " + segment.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis()))); + result.setSubmitter(this.submitter); + return result; + } + + private void checkPreconditions() { + Preconditions.checkNotNull(this.segment, "segment cannot be null"); + Preconditions.checkNotNull(this.jobEngineConfig, "jobEngineConfig cannot be null"); + } + + private String getIIName() { + return getIIDesc().getName(); + } + + private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) { + try { + String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM); + if (jobConf != null && jobConf.length() > 0) { + builder.append(" -conf ").append(jobConf); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + private String getIIDistinctColumnsPath(String jobUuid) { + return getJobWorkingDir(jobUuid) + "/" + getIIName() + "/ii_distinct_columns"; + } + + private String getHTableName() { + return ((IISegment)segment).getStorageLocationIdentifier(); + } + + private String getHFilePath(String jobId) { + return getJobWorkingDir(jobId) + "/" + getIIName() + "/hfile/"; + } + + private MapReduceExecutable createFactDistinctColumnsStep(String factTableName, String jobId, String output) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); + result.setMapReduceJobClass(IIDistinctColumnsJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, jobEngineConfig); + appendExecCmdParameters(cmd, "tablename", factTableName); + appendExecCmdParameters(cmd, "output", output); + appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + getIIName() + "_Step"); + + result.setMapReduceParams(cmd.toString()); + return result; + } + + private HadoopShellExecutable createBuildDictionaryStep(String factDistinctColumnsPath) { + // base cuboid job + HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); + buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); + StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "iiname", getIIName()); + appendExecCmdParameters(cmd, "segmentname", segment.getName()); + appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); + + buildDictionaryStep.setJobParams(cmd.toString()); + buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class); + return buildDictionaryStep; + } + + private MapReduceExecutable createInvertedIndexStep(String intermediateHiveTableLocation, String iiOutputTempPath) { + // base cuboid job + MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); + + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, jobEngineConfig); + + baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); + + appendExecCmdParameters(cmd, "iiname", getIIName()); + appendExecCmdParameters(cmd, "segmentname", segment.getName()); + appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); + appendExecCmdParameters(cmd, "output", iiOutputTempPath); + appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II); + + baseCuboidStep.setMapReduceParams(cmd.toString()); + baseCuboidStep.setMapReduceJobClass(InvertedIndexJob.class); + return baseCuboidStep; + } + + + private HadoopShellExecutable createCreateHTableStep() { + HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); + createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); + StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "iiname", getIIName()); + appendExecCmdParameters(cmd, "htablename", getHTableName()); + + createHtableStep.setJobParams(cmd.toString()); + createHtableStep.setJobClass(IICreateHTableJob.class); + + return createHtableStep; + } + + private MapReduceExecutable createConvertToHfileStep(String inputPath, String jobId) { + MapReduceExecutable createHFilesStep = new MapReduceExecutable(); + createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE); + StringBuilder cmd = new StringBuilder(); + + appendMapReduceParameters(cmd, jobEngineConfig); + appendExecCmdParameters(cmd, "iiname", getIIName()); + appendExecCmdParameters(cmd, "input", inputPath); + appendExecCmdParameters(cmd, "output", getHFilePath(jobId)); + appendExecCmdParameters(cmd, "htablename", getHTableName()); + appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + getIIName() + "_Step"); + + createHFilesStep.setMapReduceParams(cmd.toString()); + createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class); + + return createHFilesStep; + } + + private HadoopShellExecutable createBulkLoadStep(String jobId) { + HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable(); + bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE); + + StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "input", getHFilePath(jobId)); + appendExecCmdParameters(cmd, "htablename", getHTableName()); + appendExecCmdParameters(cmd, "cubename", getIIName()); + + bulkLoadStep.setJobParams(cmd.toString()); + bulkLoadStep.setJobClass(IIBulkLoadJob.class); + + return bulkLoadStep; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java index 7e8120c..21954a6 100644 --- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java @@ -253,7 +253,7 @@ public class BuildCubeWithEngineTest { private String buildSegment(String cubeName, long startDate, long endDate) throws Exception { CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), startDate, endDate); - CubingJobBuilder cubingJobBuilder = CubingJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(segment); + CubingJobBuilder cubingJobBuilder = (CubingJobBuilder)CubingJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(segment); CubingJob job = cubingJobBuilder.buildJob(); jobService.addJob(job); waitForJob(job.getId()); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/test/java/com/kylinolap/job/BuildIIWithEngineTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/com/kylinolap/job/BuildIIWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildIIWithEngineTest.java new file mode 100644 index 0000000..4c1ceec --- /dev/null +++ b/job/src/test/java/com/kylinolap/job/BuildIIWithEngineTest.java @@ -0,0 +1,194 @@ +package com.kylinolap.job; + +import static org.junit.Assert.*; + +import java.io.File; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.kylinolap.common.KylinConfig; +import com.kylinolap.common.util.AbstractKylinTestCase; +import com.kylinolap.common.util.ClasspathUtil; +import com.kylinolap.common.util.HBaseMetadataTestCase; +import com.kylinolap.invertedindex.IIInstance; +import com.kylinolap.invertedindex.IIManager; +import com.kylinolap.invertedindex.IISegment; +import com.kylinolap.job.engine.JobEngineConfig; +import com.kylinolap.job.execution.ExecutableState; +import com.kylinolap.job.impl.threadpool.AbstractExecutable; +import com.kylinolap.job.impl.threadpool.DefaultScheduler; +import com.kylinolap.job.invertedindex.IIJob; +import com.kylinolap.job.invertedindex.IIJobBuilder; +import com.kylinolap.job.service.ExecutableManager; +import com.kylinolap.metadata.realization.RealizationStatusEnum; + +/** + * + * @author shaoshi + * + */ +public class BuildIIWithEngineTest { + + private JobEngineConfig jobEngineConfig; + + private IIManager iiManager; + + private DefaultScheduler scheduler; + + protected ExecutableManager jobService; + + protected static final String TEST_II_NAME = "test_kylin_ii"; + + protected void waitForJob(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { + break; + } else { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + @Before + public void before() throws Exception { + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + + DeployUtil.initCliWorkDir(); +// DeployUtil.deployMetadata(); + DeployUtil.overrideJobJarLocations(); + DeployUtil.overrideJobConf(HBaseMetadataTestCase.SANDBOX_TEST_DATA); + + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + scheduler = DefaultScheduler.getInstance(); + scheduler.init(new JobEngineConfig(kylinConfig)); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + iiManager = IIManager.getInstance(kylinConfig); + jobEngineConfig = new JobEngineConfig(kylinConfig); + for (String jobId: jobService.getAllJobIds()) { + jobService.deleteJob(jobId); + } + + IIInstance ii = iiManager.getII(TEST_II_NAME); + if(ii.getStatus() != RealizationStatusEnum.DISABLED) { + ii.setStatus(RealizationStatusEnum.DISABLED); + iiManager.updateII(ii); + } + + } + + + @Test + public void test() throws Exception { + testInner(); + } + + private void testInner() throws Exception { + DeployUtil.prepareTestData("inner", "test_kylin_cube_with_slr_empty"); + + + String[] testCase = new String[]{ + "testBuildII" + }; + ExecutorService executorService = Executors.newFixedThreadPool(testCase.length); + final CountDownLatch countDownLatch = new CountDownLatch(testCase.length); + List>> tasks = Lists.newArrayListWithExpectedSize(testCase.length); + for (int i = 0; i < testCase.length; i++) { + tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch))); + } + countDownLatch.await(); + for (int i = 0; i < tasks.size(); ++i) { + Future> task = tasks.get(i); + final List jobIds = task.get(); + for (String jobId: jobIds) { + assertJobSucceed(jobId); + } + } + } + + + private void assertJobSucceed(String jobId) { + assertEquals(ExecutableState.SUCCEED, jobService.getOutput(jobId).getState()); + } + + private class TestCallable implements Callable> { + + private final String methodName; + private final CountDownLatch countDownLatch; + + public TestCallable(String methodName, CountDownLatch countDownLatch) { + this.methodName = methodName; + this.countDownLatch = countDownLatch; + } + + @Override + public List call() throws Exception { + try { + final Method method = BuildIIWithEngineTest.class.getDeclaredMethod(methodName); + method.setAccessible(true); + return (List) method.invoke(BuildIIWithEngineTest.this); + } finally { + countDownLatch.countDown(); + } + } + } + + + protected List testBuildII() throws Exception { + clearSegment(TEST_II_NAME); + + + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + + // this cube's start date is 0, end date is 20501112000000 + long date1 = 0; + long date2 = f.parse("2013-01-01").getTime(); + + + // this cube doesn't support incremental build, always do full build + + List result = Lists.newArrayList(); + result.add(buildSegment(TEST_II_NAME, date1, date2)); + return result; + } + + + + private void clearSegment(String iiName) throws Exception{ + IIInstance ii = iiManager.getII(iiName); + ii.getSegments().clear(); + iiManager.updateII(ii); + } + + + private String buildSegment(String iiName, long startDate, long endDate) throws Exception { + IISegment segment = iiManager.buildSegment(iiManager.getII(iiName), startDate, endDate); + IIJobBuilder iiJobBuilder = (IIJobBuilder)IIJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(segment); + IIJob job = iiJobBuilder.buildJob(); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/job/src/test/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableTest.java b/job/src/test/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableTest.java index 744f187..ff1715d 100644 --- a/job/src/test/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableTest.java +++ b/job/src/test/java/com/kylinolap/job/hadoop/hive/JoinedFlatTableTest.java @@ -36,6 +36,7 @@ import com.kylinolap.job.engine.JobEngineConfig; * @author George Song (ysong1) * */ +@Ignore("This test case doesn't have much value, ignore it.") public class JoinedFlatTableTest extends LocalFileMetadataTestCase { CubeInstance cube = null; @@ -73,13 +74,10 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase { @Test public void testGenerateInsertSql() throws IOException { - String[] sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, fakeJobUUID, new JobEngineConfig(KylinConfig.getInstanceFromEnv())); + String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, fakeJobUUID, new JobEngineConfig(KylinConfig.getInstanceFromEnv())); System.out.println(sqls); - int length = 0; - for(String sql : sqls) { - length += sql.length(); - } + int length = sqls.length(); assertEquals(1155, length); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/metadata/src/main/java/com/kylinolap/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/com/kylinolap/metadata/MetadataManager.java b/metadata/src/main/java/com/kylinolap/metadata/MetadataManager.java index 6797328..7d17a7f 100644 --- a/metadata/src/main/java/com/kylinolap/metadata/MetadataManager.java +++ b/metadata/src/main/java/com/kylinolap/metadata/MetadataManager.java @@ -144,7 +144,13 @@ public class MetadataManager { * @return */ public TableDesc getTableDesc(String tableName) { - return srcTableMap.get(tableName.toUpperCase()); + TableDesc result = srcTableMap.get(tableName.toUpperCase()); + if(result == null) { + logger.info("No TableDesc found for table '" + tableName.toUpperCase() + "'"); + return null; + } + + return result; } /** http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index cca416d..2667957 100644 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,7 @@ 1.2.17 1.6.4 2.2.3 - 18.0 + 12.0 0.1.51 2.9.1 2.7.1 http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/server/.settings/.jsdtscope ---------------------------------------------------------------------- diff --git a/server/.settings/.jsdtscope b/server/.settings/.jsdtscope new file mode 100644 index 0000000..b72a6a4 --- /dev/null +++ b/server/.settings/.jsdtscope @@ -0,0 +1,13 @@ + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/server/.settings/org.eclipse.wst.common.project.facet.core.prefs.xml ---------------------------------------------------------------------- diff --git a/server/.settings/org.eclipse.wst.common.project.facet.core.prefs.xml b/server/.settings/org.eclipse.wst.common.project.facet.core.prefs.xml new file mode 100644 index 0000000..cc81385 --- /dev/null +++ b/server/.settings/org.eclipse.wst.common.project.facet.core.prefs.xml @@ -0,0 +1,7 @@ + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/server/.settings/org.eclipse.wst.jsdt.ui.superType.container ---------------------------------------------------------------------- diff --git a/server/.settings/org.eclipse.wst.jsdt.ui.superType.container b/server/.settings/org.eclipse.wst.jsdt.ui.superType.container new file mode 100644 index 0000000..3bd5d0a --- /dev/null +++ b/server/.settings/org.eclipse.wst.jsdt.ui.superType.container @@ -0,0 +1 @@ +org.eclipse.wst.jsdt.launching.baseBrowserLibrary \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/server/.settings/org.eclipse.wst.jsdt.ui.superType.name ---------------------------------------------------------------------- diff --git a/server/.settings/org.eclipse.wst.jsdt.ui.superType.name b/server/.settings/org.eclipse.wst.jsdt.ui.superType.name new file mode 100644 index 0000000..05bd71b --- /dev/null +++ b/server/.settings/org.eclipse.wst.jsdt.ui.superType.name @@ -0,0 +1 @@ +Window \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/08857b4f/server/src/main/java/com/kylinolap/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/com/kylinolap/rest/service/JobService.java b/server/src/main/java/com/kylinolap/rest/service/JobService.java index 5b4ddf0..8b8b550 100644 --- a/server/src/main/java/com/kylinolap/rest/service/JobService.java +++ b/server/src/main/java/com/kylinolap/rest/service/JobService.java @@ -134,7 +134,7 @@ public class JobService extends BasicService { try { CubingJob job; - CubingJobBuilder builder = CubingJobBuilder.newBuilder().setJobEnginConfig(new JobEngineConfig(getConfig())).setSubmitter(submitter); + CubingJobBuilder builder = (CubingJobBuilder)CubingJobBuilder.newBuilder().setJobEnginConfig(new JobEngineConfig(getConfig())).setSubmitter(submitter); if (buildType == CubeBuildTypeEnum.BUILD) { builder.setSegment(getCubeManager().appendSegments(cube, startDate, endDate)); job = builder.buildJob();