From: liyang@apache.org
To: commits@kylin.incubator.apache.org
Date: Sat, 19 Sep 2015 00:27:56 -0000
Subject: [07/12] incubator-kylin git commit: KYLIN-1010 Decompose project job

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java deleted file mode 100644 index d24cc79..0000000 --- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.engine.spark; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.TimeZone; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractKylinTestCase; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.lock.MockJobLock; -import org.apache.kylin.job.manager.ExecutableManager; -import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.collect.Lists; - -public class BuildCubeWithSparkTest { - - private CubeManager cubeManager; - private DefaultScheduler scheduler; - protected ExecutableManager jobService; - - private static final Log logger = LogFactory.getLog(BuildCubeWithSparkTest.class); - - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { - break; - } else { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - - @BeforeClass - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); - System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this - } - - @Before - public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); - - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - DeployUtil.overrideJobJarLocations(); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - for (String jobId : jobService.getAllJobIds()) { - jobService.deleteJob(jobId); - } - scheduler = DefaultScheduler.getInstance(); - scheduler.init(new 
JobEngineConfig(kylinConfig), new MockJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - cubeManager = CubeManager.getInstance(kylinConfig); - - } - - @After - public void after() { - HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - @Test - public void test() throws Exception { - final CubeSegment segment = createSegment(); - String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath(); - KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); - String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); - logger.info("confPath location:" + confPath); - logger.info("coprocessor location:" + coprocessor); - final DefaultChainedExecutable cubingJob = new SparkBatchCubingEngine(confPath, coprocessor).createBatchCubingJob(segment, "BuildCubeWithSpark"); - jobService.addJob(cubingJob); - waitForJob(cubingJob.getId()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(cubingJob.getId()).getState()); - } - - private void clearSegment(String cubeName) throws Exception { - CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); - } - - private CubeSegment createSegment() throws Exception { - String cubeName = "test_kylin_cube_with_slr_left_join_empty"; - clearSegment(cubeName); - - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart(); - long dateEnd = f.parse("2050-11-12").getTime(); - - // this cube's start date is 0, end date is 20501112000000 - List result = Lists.newArrayList(); - return cubeManager.appendSegments(cubeManager.getCube(cubeName), dateEnd); - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/engine-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml index 46b63b3..955124c 100644 --- a/engine-streaming/pom.xml +++ b/engine-streaming/pom.xml @@ -26,11 +26,6 @@ org.apache.kylin - kylin-invertedindex - ${project.parent.version} - - - org.apache.kylin kylin-core-storage ${project.parent.version} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/pom.xml ---------------------------------------------------------------------- diff --git a/invertedindex/pom.xml b/invertedindex/pom.xml index 4d1796f..9e8f92e 100644 --- a/invertedindex/pom.xml +++ b/invertedindex/pom.xml @@ -33,12 +33,12 @@ org.apache.kylin - kylin-core-metadata + kylin-engine-streaming ${project.parent.version} org.apache.kylin - kylin-core-dictionary + kylin-source-hive ${project.parent.version} @@ -56,43 +56,23 @@ ${project.parent.version} - org.apache.hadoop - hadoop-common + org.apache.hive.hcatalog + hive-hcatalog-core + ${hive-hcatalog.version} provided org.apache.hadoop - hadoop-annotations - provided - - - org.apache.hadoop - hadoop-mapreduce-client-core + hadoop-mapreduce-client-jobclient provided - org.apache.hadoop - hadoop-minicluster - test - - - org.apache.mrunit - mrunit - hadoop2 - test - - org.apache.hbase hbase-hadoop2-compat provided org.apache.hbase - hbase-common - provided - - - org.apache.hbase 
hbase-client provided @@ -100,18 +80,12 @@ org.apache.hbase hbase-server provided - - - - org.apache.hadoop - hadoop-mapreduce-client-jobclient - - - org.apache.hadoop - hadoop-mapreduce-client-jobclient - provided + org.apache.mrunit + mrunit + hadoop2 + test junit http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java new file mode 100644 index 0000000..87ee70e --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.hadoop.dict; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.dict.DistinctColumnValuesProvider; +import org.apache.kylin.engine.mr.DFSFileTable; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.ReadableTable; + +/** + */ +public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob { + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_II_NAME); + options.addOption(OPTION_INPUT_PATH); + parseOptions(options, args); + + final String iiname = getOptionValue(OPTION_II_NAME); + final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH); + final KylinConfig config = KylinConfig.getInstanceFromEnv(); + + IIManager mgr = IIManager.getInstance(config); + IIInstance ii = mgr.getII(iiname); + + mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() { + @Override + public ReadableTable getDistinctValuesFor(TblColRef col) { + return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1); + } + }); + return 0; + } catch (Exception e) { + printUsage(options); + throw e; + } + } + + public static void main(String[] args) throws Exception { + int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args); + System.exit(exitCode); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java 
---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java new file mode 100644 index 0000000..300c89b --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; + +/** + */ +public class IIBulkLoadJob extends AbstractHadoopJob { + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_HTABLE_NAME); + options.addOption(OPTION_II_NAME); + parseOptions(options, args); + + String tableName = getOptionValue(OPTION_HTABLE_NAME); + String input = getOptionValue(OPTION_INPUT_PATH); + String iiname = getOptionValue(OPTION_II_NAME); + + FileSystem fs = FileSystem.get(getConf()); + FsPermission permission = new FsPermission((short) 0777); + fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission); + + int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName }); + + IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); + IIInstance ii = mgr.getII(iiname); + IISegment seg = ii.getFirstSegment(); + seg.setStorageLocationIdentifier(tableName); + seg.setStatus(SegmentStatusEnum.READY); + mgr.updateII(ii); + + return hbaseExitCode; + + } catch (Exception e) { + printUsage(options); + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java new file mode 100644 index 
0000000..528f06f --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + * + */ +public class IICreateHFileJob extends AbstractHadoopJob { + + protected static final Logger logger = LoggerFactory.getLogger(IICreateHFileJob.class); + + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_II_NAME); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_HTABLE_NAME); + parseOptions(options, args); + + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + + setJobClasspath(job); + + addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); + FileOutputFormat.setOutputPath(job, output); + + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapperClass(IICreateHFileMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + + String tableName = getOptionValue(OPTION_HTABLE_NAME); + HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName); + HFileOutputFormat.configureIncrementalLoad(job, htable); + + this.deletePath(job.getConfiguration(), output); + + return waitForCompletion(job); + } catch (Exception e) { + printUsage(options); + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java new file mode 100644 index 0000000..1adf8d6 --- /dev/null +++ 
b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.invertedindex.model.IIDesc; + +/** + * @author yangli9 + */ +public class IICreateHFileMapper extends KylinMapper { + + long timestamp; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.bindCurrentConfiguration(context.getConfiguration()); + + timestamp = System.currentTimeMillis(); + } + + @Override + protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException { + + KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), // + IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, // + IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, // + timestamp, Type.Put, // + value.get(), value.getOffset(), value.getLength()); + + context.write(key, kv); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java new file mode 100644 index 0000000..0b7cb7a --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; +import org.apache.hadoop.hbase.security.User; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.model.IIDesc; +import org.apache.kylin.invertedindex.model.IIKeyValueCodec; +import org.apache.kylin.metadata.realization.IRealizationConstants; + +/** + * @author George Song (ysong1) + */ +public class IICreateHTableJob extends AbstractHadoopJob { + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_II_NAME); + options.addOption(OPTION_HTABLE_NAME); + parseOptions(options, args); + + String tableName = getOptionValue(OPTION_HTABLE_NAME); + String iiName = getOptionValue(OPTION_II_NAME); + + KylinConfig config = KylinConfig.getInstanceFromEnv(); + IIManager iiManager = IIManager.getInstance(config); + IIInstance ii = iiManager.getII(iiName); + int sharding = ii.getDescriptor().getSharding(); + + HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); + HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY); + cf.setMaxVersions(1); + + String hbaseDefaultCC = config.getHbaseDefaultCompressionCodec().toLowerCase(); + + switch (hbaseDefaultCC) { + case "snappy": { + logger.info("hbase will use snappy to compress data"); + cf.setCompressionType(Compression.Algorithm.SNAPPY); + break; + } + case "lzo": { + logger.info("hbase will use lzo to compress data"); + cf.setCompressionType(Compression.Algorithm.LZO); + break; + } + case "gz": + case "gzip": { + logger.info("hbase will use gzip to compress data"); + cf.setCompressionType(Compression.Algorithm.GZ); + break; + } + case "lz4": { + logger.info("hbase will use lz4 to compress data"); + cf.setCompressionType(Compression.Algorithm.LZ4); + break; + } + default: { + logger.info("hbase will not use any compression codec to compress data"); + } + } + + cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); + tableDesc.addFamily(cf); + tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); + tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); + tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); + + Configuration conf = HBaseConfiguration.create(getConf()); + if (User.isHBaseSecurityEnabled(conf)) { + // add coprocessor for bulk load + tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + } + + IIDeployCoprocessorCLI.deployCoprocessor(tableDesc); + + // drop the table first + HBaseAdmin admin = new HBaseAdmin(conf); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + + // create table + byte[][] splitKeys
= getSplits(sharding); + if (splitKeys.length == 0) + splitKeys = null; + admin.createTable(tableDesc, splitKeys); + if (splitKeys != null) { + for (int i = 0; i < splitKeys.length; i++) { + System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i])); + } + } + System.out.println("create hbase table " + tableName + " done."); + admin.close(); + + return 0; + } catch (Exception e) { + printUsage(options); + throw e; + } + } + + // one region per shard + private byte[][] getSplits(int shard) { + byte[][] result = new byte[shard - 1][]; + for (int i = 1; i < shard; ++i) { + byte[] split = new byte[IIKeyValueCodec.SHARD_LEN]; + BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN); + result[i - 1] = split; + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java new file mode 100644 index 0000000..a4c1961 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * THIS IS A TAILORED DUPLICATE OF org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI TO AVOID CYCLIC + * DEPENDENCY. INVERTED-INDEX CODE NOW SPLIT ACROSS kylin-invertedindex AND kylin-storage-hbase. + * DEFINITELY NEEDS FURTHER REFACTOR.
+ */ +public class IIDeployCoprocessorCLI { + + private static final Logger logger = LoggerFactory.getLogger(IIDeployCoprocessorCLI.class); + + public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver"; + public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService"; + public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint"; + + public static void deployCoprocessor(HTableDescriptor tableDesc) { + try { + initHTableCoprocessor(tableDesc); + logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor."); + + } catch (Exception ex) { + logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex); + logger.error("Will try creating the table without coprocessor."); + } + } + + private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + Configuration hconf = HadoopUtil.getCurrentConfiguration(); + FileSystem fileSystem = FileSystem.get(hconf); + + String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); + Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, null); + + addCoprocessorOnHTable(desc, hdfsCoprocessorJar); + } + + private static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException { + logger.info("Add coprocessor on " + desc.getNameAsString()); + desc.addCoprocessor(IIEndpointClass, hdfsCoprocessorJar, 1000, null); + desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null); + desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null); + } + + private static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set oldJarPaths) throws IOException { + Path uploadPath = null; + File localCoprocessorFile = new File(localCoprocessorJar); + + // check existing jars + if (oldJarPaths == null) { + oldJarPaths = new HashSet(); + } + Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv()); + for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { + if (isSame(localCoprocessorFile, fileStatus)) { + uploadPath = fileStatus.getPath(); + break; + } + String filename = fileStatus.getPath().toString(); + if (filename.endsWith(".jar")) { + oldJarPaths.add(filename); + } + } + + // upload if not existing + if (uploadPath == null) { + // figure out a unique new jar file name + Set oldJarNames = new HashSet(); + for (String path : oldJarPaths) { + oldJarNames.add(new Path(path).getName()); + } + String baseName = getBaseFileName(localCoprocessorJar); + String newName = null; + int i = 0; + while (newName == null) { + newName = baseName + "-" + (i++) + ".jar"; + if (oldJarNames.contains(newName)) + newName = null; + } + + // upload + uploadPath = new Path(coprocessorDir, newName); + FileInputStream in = null; + FSDataOutputStream out = null; + try { + in = new FileInputStream(localCoprocessorFile); + out = fileSystem.create(uploadPath); + IOUtils.copy(in, out); + } finally { + IOUtils.closeQuietly(in); + IOUtils.closeQuietly(out); + } + + fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1); + + } + + uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null); + return uploadPath; + } + + private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) { + return 
fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified(); + } + + private static String getBaseFileName(String localCoprocessorJar) { + File localJar = new File(localCoprocessorJar); + String baseName = localJar.getName(); + if (baseName.endsWith(".jar")) + baseName = baseName.substring(0, baseName.length() - ".jar".length()); + return baseName; + } + + private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException { + String hdfsWorkingDirectory = config.getHdfsWorkingDirectory(); + Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor"); + fileSystem.mkdirs(coprocessorDir); + return coprocessorDir; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java new file mode 100644 index 0000000..1f4611b --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import java.io.IOException; +import java.util.HashSet; + +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.engine.mr.KylinReducer; + +/** + * @author yangli9 + */ +public class IIDistinctColumnsCombiner extends KylinReducer { + + private Text outputValue = new Text(); + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + + } + + @Override + public void reduce(ShortWritable key, Iterable values, Context context) throws IOException, InterruptedException { + + HashSet set = new HashSet(); + for (Text textValue : values) { + ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); + set.add(value); + } + + for (ByteArray value : set) { + outputValue.set(value.array(), value.offset(), value.length()); + context.write(key, outputValue); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java new file mode 100644 index 0000000..042678e --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import java.io.IOException; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.IntermediateColumnDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + */ +public class IIDistinctColumnsJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_TABLE_NAME); + options.addOption(OPTION_II_NAME); + options.addOption(OPTION_OUTPUT_PATH); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase(); + String iiName = getOptionValue(OPTION_II_NAME); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + + // ---------------------------------------------------------------------------- + + logger.info("Starting: " + job.getJobName() + " on table " + tableName); + + IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); + IIInstance ii = iiMgr.getII(iiName); + job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName); + job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii)); + + setJobClasspath(job); + + setupMapper(); + setupReducer(output); + + return waitForCompletion(job); + + } catch (Exception e) { + printUsage(options); + throw e; + } + + } + + private String getColumns(IIInstance ii) { + IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor()); + StringBuilder buf = new StringBuilder(); + for (IntermediateColumnDesc col : iiflat.getColumnList()) { + if (buf.length() > 0) + buf.append(","); + buf.append(col.getColumnName()); + } + return buf.toString(); + } + + private void setupMapper() throws IOException { + + String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME); + String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName); + + logger.info("setting hcat input format, db name {} , table name {}", dbTableNames[0], dbTableNames[1]); + + HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); + + job.setInputFormatClass(HCatInputFormat.class); + + job.setMapperClass(IIDistinctColumnsMapper.class); + job.setCombinerClass(IIDistinctColumnsCombiner.class); + job.setMapOutputKeyClass(ShortWritable.class); + job.setMapOutputValueClass(Text.class); + } + + private void setupReducer(Path output) throws IOException { + job.setReducerClass(IIDistinctColumnsReducer.class); + 
job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + + job.setNumReduceTasks(1); + + deletePath(job.getConfiguration(), output); + } + + public static void main(String[] args) throws Exception { + IIDistinctColumnsJob job = new IIDistinctColumnsJob(); + int exitCode = ToolRunner.run(job, args); + System.exit(exitCode); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java new file mode 100644 index 0000000..3418a57 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import java.io.IOException; + +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.engine.mr.KylinMapper; + +/** + * @author yangli9 + */ +public class IIDistinctColumnsMapper extends KylinMapper { + + private ShortWritable outputKey = new ShortWritable(); + private Text outputValue = new Text(); + private HCatSchema schema = null; + private int columnSize = 0; + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + schema = HCatInputFormat.getTableSchema(context.getConfiguration()); + columnSize = schema.getFields().size(); + } + + @Override + public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { + + HCatFieldSchema fieldSchema = null; + for (short i = 0; i < columnSize; i++) { + outputKey.set(i); + fieldSchema = schema.get(i); + Object fieldValue = record.get(fieldSchema.getName(), schema); + if (fieldValue == null) + continue; + byte[] bytes = Bytes.toBytes(fieldValue.toString()); + outputValue.set(bytes, 0, bytes.length); + context.write(outputKey, outputValue); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java new file mode 100644 index 0000000..fcb4dd5 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import java.io.IOException; +import java.util.HashSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.engine.mr.KylinReducer; +import org.apache.kylin.engine.mr.common.BatchConstants; + +/** + * @author yangli9 + */ +public class IIDistinctColumnsReducer extends KylinReducer { + + private String[] columns; + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + + Configuration conf = context.getConfiguration(); + this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(","); + } + + @Override + public void reduce(ShortWritable key, Iterable values, Context context) throws IOException, InterruptedException { + String columnName = columns[key.get()]; + + HashSet set = new HashSet(); + for (Text textValue : values) { + ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); + set.add(value); + } + + Configuration conf = context.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + String outputPath = conf.get(BatchConstants.OUTPUT_PATH); + FSDataOutputStream out = fs.create(new Path(outputPath, columnName)); + + try { + for (ByteArray value : set) { + out.write(value.array(), value.offset(), value.length()); + out.write('\n'); + } + } finally { + out.close(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java new file mode 100644 index 0000000..c9ad448 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.hadoop.invertedindex; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + */ +public class InvertedIndexJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_II_NAME); + options.addOption(OPTION_TABLE_NAME); + options.addOption(OPTION_OUTPUT_PATH); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String iiname = getOptionValue(OPTION_II_NAME); + String intermediateTable = getOptionValue(OPTION_TABLE_NAME); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + + // ---------------------------------------------------------------------------- + + System.out.println("Starting: " + job.getJobName()); + + IIInstance ii = getII(iiname); + short sharding = ii.getDescriptor().getSharding(); + + setJobClasspath(job); + + setupMapper(intermediateTable); + setupReducer(output, sharding); + attachMetadata(ii); + + return waitForCompletion(job); + + } catch (Exception e) { + printUsage(options); + throw e; + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + + } + + private IIInstance getII(String iiName) { + IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); + IIInstance ii = mgr.getII(iiName); + if (ii == null) + throw new IllegalArgumentException("No Inverted Index found by name " + iiName); + return ii; + } + + private void attachMetadata(IIInstance ii) throws IOException { + + Configuration conf = job.getConfiguration(); + attachKylinPropsAndMetadata(ii, conf); + + IISegment seg = ii.getFirstSegment(); + conf.set(BatchConstants.CFG_II_NAME, ii.getName()); + conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName()); + } + + protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException { + MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + + // write II / model_desc / II_desc / dict / table + ArrayList dumpList = new ArrayList(); + dumpList.add(ii.getResourcePath()); + dumpList.add(ii.getDescriptor().getModel().getResourcePath()); + dumpList.add(ii.getDescriptor().getResourcePath()); + + for (String tableName : ii.getDescriptor().getModel().getAllTables()) { + TableDesc table = 
metaMgr.getTableDesc(tableName); + dumpList.add(table.getResourcePath()); + } + for (IISegment segment : ii.getSegments()) { + dumpList.addAll(segment.getDictionaryPaths()); + } + + attachKylinPropsAndMetadata(dumpList, conf); + } + + private void setupMapper(String intermediateTable) throws IOException { + + String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable); + HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); + + job.setInputFormatClass(HCatInputFormat.class); + + job.setMapperClass(InvertedIndexMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(ImmutableBytesWritable.class); + job.setPartitionerClass(InvertedIndexPartitioner.class); + } + + private void setupReducer(Path output, short sharding) throws IOException { + job.setReducerClass(InvertedIndexReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(ImmutableBytesWritable.class); + + job.setNumReduceTasks(sharding); + + FileOutputFormat.setOutputPath(job, output); + + job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + + deletePath(job.getConfiguration(), output); + } + + public static void main(String[] args) throws Exception { + InvertedIndexJob job = new InvertedIndexJob(); + int exitCode = ToolRunner.run(job, args); + System.exit(exitCode); + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java new file mode 100644 index 0000000..bc43b65 --- /dev/null +++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
new file mode 100644
index 0000000..bc43b65
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, LongWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    private LongWritable outputKey;
+    private ImmutableBytesWritable outputValue;
+    private HCatSchema schema = null;
+    private List<HCatFieldSchema> fields;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        IIManager mgr = IIManager.getInstance(config);
+        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
+        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        this.info = new TableRecordInfo(seg);
+        this.rec = this.info.createTableRecord();
+
+        outputKey = new LongWritable();
+        outputValue = new ImmutableBytesWritable(rec.getBytes());
+
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+
+        fields = schema.getFields();
+    }
+
+    @Override
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+
+        rec.reset();
+        for (int i = 0; i < fields.size(); i++) {
+            Object fieldValue = record.get(i);
+            rec.setValueString(i, fieldValue == null ? null : fieldValue.toString());
+        }
+
+        outputKey.set(rec.getTimestamp());
+        // outputValue's backing bytes array is the same as rec's, so no copy is needed here
+
+        context.write(outputKey, outputValue);
+    }
+}
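One subtlety in the mapper above: outputValue is constructed once around rec.getBytes(), so every rec.setValueString(...) writes into the very array the writable later serializes, avoiding per-record allocation. A tiny self-contained illustration of that aliasing (ZeroCopyDemo and buf are invented for the sketch; only the HBase writable is real):

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

    public class ZeroCopyDemo {
        public static void main(String[] args) {
            byte[] buf = new byte[8];                        // stands in for rec.getBytes()
            ImmutableBytesWritable val = new ImmutableBytesWritable(buf);
            buf[0] = 42;                                     // mutate the backing array in place
            System.out.println(val.get()[0]);                // prints 42 - the writable aliases buf
        }
    }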
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
new file mode 100644
index 0000000..396c221
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
+
+    private Configuration conf;
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    @Override
+    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
+        rec.setBytes(value.get(), value.getOffset(), value.getLength());
+        return rec.getShard();
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        try {
+            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+            IIManager mgr = IIManager.getInstance(config);
+            IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
+            IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+            this.info = new TableRecordInfo(seg);
+            this.rec = this.info.createTableRecord();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to load II metadata in partitioner", e);
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+}
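The partitioner can return rec.getShard() directly because setupReducer(...) in InvertedIndexJob sets the number of reduce tasks to the descriptor's sharding, making every shard id a valid partition index. A stripped-down sketch of that contract (ShardEchoPartitioner is hypothetical and fakes the shard from the key; the real class decodes it from the record bytes):

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Hypothetical illustration, not part of this commit.
    public class ShardEchoPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> {
        @Override
        public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
            // Pretend the shard id is derived from the key; numPartitions is
            // trusted to equal the shard count, exactly as in InvertedIndexPartitioner.
            return (int) (key.get() % numPartitions);
        }
    }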
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
new file mode 100644
index 0000000..5a69eec
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    private IncrementalSliceMaker builder;
+    private IIKeyValueCodec kv;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        IIManager mgr = IIManager.getInstance(config);
+        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
+        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        info = new TableRecordInfo(seg);
+        rec = info.createTableRecord();
+        builder = null;
+        kv = new IIKeyValueCodec(info.getDigest());
+    }
+
+    @Override
+    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+            throws IOException, InterruptedException {
+        for (ImmutableBytesWritable v : values) {
+            rec.setBytes(v.get(), v.getOffset(), v.getLength());
+
+            if (builder == null) {
+                builder = new IncrementalSliceMaker(info, rec.getShard());
+            }
+
+            Slice slice = builder.append(rec);
+            if (slice != null) {
+                output(slice, context);
+            }
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        // builder stays null when this reducer received no records at all
+        if (builder != null) {
+            Slice slice = builder.close();
+            if (slice != null) {
+                output(slice, context);
+            }
+        }
+    }
+
+    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+        for (IIRow pair : kv.encodeKeyValue(slice)) {
+            context.write(pair.getKey(), pair.getValue());
+        }
+    }
+
+}
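The reducer is an accumulate-and-flush loop: records for a shard arrive in timestamp order, builder.append(rec) hands back a Slice whenever one fills, and cleanup() closes out the final partial slice. The same shape in miniature, with a plain list standing in for IncrementalSliceMaker (class name, capacity, and values are arbitrary):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SliceFlushDemo {
        public static void main(String[] args) {
            final int capacity = 3;                 // stand-in for a slice filling up
            List<String> buffer = new ArrayList<String>();
            for (String rec : Arrays.asList("a", "b", "c", "d", "e")) {
                buffer.add(rec);                    // analogous to builder.append(rec)
                if (buffer.size() == capacity) {    // append(...) returned a full Slice
                    System.out.println("flush " + buffer);
                    buffer.clear();
                }
            }
            if (!buffer.isEmpty()) {                // analogous to builder.close() in cleanup()
                System.out.println("final flush " + buffer);
            }
        }
    }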
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
new file mode 100644
index 0000000..0af846b
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.invertedindex;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ */
+public class IIJob extends DefaultChainedExecutable {
+
+    public IIJob() {
+        super();
+    }
+
+    private static final String II_INSTANCE_NAME = "iiName";
+    private static final String SEGMENT_ID = "segmentId";
+
+    void setIIName(String name) {
+        setParam(II_INSTANCE_NAME, name);
+    }
+
+    public String getIIName() {
+        return getParam(II_INSTANCE_NAME);
+    }
+
+    void setSegmentId(String segmentId) {
+        setParam(SEGMENT_ID, segmentId);
+    }
+
+    public String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+}
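IIJob carries no logic of its own; it is a DefaultChainedExecutable tagged with two params that identify the build. Since the setters are package-private, only same-package code such as IIJobBuilder below can populate them. A hypothetical same-package snippet (IIJobParamDemo and both values are placeholders):

    package org.apache.kylin.job.invertedindex;

    // Hypothetical demo class; must live in this package to reach the
    // package-private setters. Values are placeholders.
    public class IIJobParamDemo {
        public static void main(String[] args) {
            IIJob job = new IIJob();
            job.setIIName("test_ii");                                 // stored under param "iiName"
            job.setSegmentId("00000000-0000-0000-0000-000000000000"); // stored under param "segmentId"
            System.out.println(job.getIIName() + " / " + job.getSegmentId());
        }
    }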
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c44caa7b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
new file mode 100644
index 0000000..4bd06c5
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.invertedindex;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.hadoop.dict.CreateInvertedIndexDictionaryJob;
+import org.apache.kylin.job.hadoop.invertedindex.IIBulkLoadJob;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHFileJob;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
+import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
+
+import com.google.common.base.Preconditions;
+
+/**
+ */
+public final class IIJobBuilder {
+
+    final JobEngineConfig engineConfig;
+
+    public IIJobBuilder(JobEngineConfig engineConfig) {
+        this.engineConfig = engineConfig;
+    }
+
+    public IIJob buildJob(IISegment seg, String submitter) {
+        checkPreconditions(seg);
+
+        IIJob result = initialJob(seg, "BUILD", submitter);
+        final String jobId = result.getId();
+        final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
+        final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
+        final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
+        final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
+        final String iiPath = iiRootPath + "*";
+
+        final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
+        result.addTask(intermediateHiveTableStep);
+
+        result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
+
+        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
+
+        result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
+
+        // create htable step
+        result.addTask(createCreateHTableStep(seg));
+
+        // generate hfiles step
+        result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
+
+        // bulk load step
+        result.addTask(createBulkLoadStep(seg, jobId));
+
+        return result;
+    }
+
+    private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
+        return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
+    }
+
+    private IIJob initialJob(IISegment seg, String type, String submitter) {
+        IIJob result = new IIJob();
+        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
+        result.setIIName(seg.getIIInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
+        result.setSubmitter(submitter);
+        return result;
+    }
+
+    private void checkPreconditions(IISegment seg) {
+        Preconditions.checkNotNull(seg, "segment cannot be null");
null"); + Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null"); + } + + private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) { + try { + String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM); + if (jobConf != null && jobConf.length() > 0) { + builder.append(" -conf ").append(jobConf); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) { + return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns"; + } + + private String getHFilePath(IISegment seg, String jobId) { + return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/"; + } + + private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); + result.setMapReduceJobClass(IIDistinctColumnsJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, engineConfig); + appendExecCmdParameters(cmd, "tablename", factTableName); + appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); + appendExecCmdParameters(cmd, "output", output); + appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step"); + + result.setMapReduceParams(cmd.toString()); + return result; + } + + private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) { + // base cuboid job + HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); + buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); + StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); + appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); + + buildDictionaryStep.setJobParams(cmd.toString()); + buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class); + return buildDictionaryStep; + } + + private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) { + // base cuboid job + MapReduceExecutable buildIIStep = new MapReduceExecutable(); + + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, engineConfig); + + buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II); + + appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); + appendExecCmdParameters(cmd, "tablename", intermediateHiveTable); + appendExecCmdParameters(cmd, "output", iiOutputTempPath); + appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II); + + buildIIStep.setMapReduceParams(cmd.toString()); + buildIIStep.setMapReduceJobClass(InvertedIndexJob.class); + return buildIIStep; + } + + private HadoopShellExecutable createCreateHTableStep(IISegment seg) { + HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); + createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); + StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName()); + appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); + + createHtableStep.setJobParams(cmd.toString()); + createHtableStep.setJobClass(IICreateHTableJob.class); + + return createHtableStep; + } + + private 
+    private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, engineConfig);
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
+
+        return createHFilesStep;
+    }
+
+    private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(IIBulkLoadJob.class);
+
+        return bulkLoadStep;
+    }
+
+    private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
+        return buf.append(" -").append(paraName).append(" ").append(paraValue);
+    }
+
+    private String getJobWorkingDir(String uuid) {
+        return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
+    }
+
+    private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
+        return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
+    }
+}
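Taken together, buildJob(...) chains seven steps: intermediate flat table, fact distinct columns, dictionary build, the inverted index MR job above, HTable creation, HFile conversion, and bulk load. A hedged sketch of driving the builder end to end; the instance name and the submission path through ExecutableManager follow patterns used elsewhere in this commit and are assumptions here:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.invertedindex.IIManager;
    import org.apache.kylin.invertedindex.IISegment;
    import org.apache.kylin.job.engine.JobEngineConfig;
    import org.apache.kylin.job.invertedindex.IIJob;
    import org.apache.kylin.job.invertedindex.IIJobBuilder;
    import org.apache.kylin.job.manager.ExecutableManager;

    public class SubmitIIBuild {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            // "test_ii" is a placeholder instance name.
            IISegment seg = IIManager.getInstance(config).getII("test_ii").getFirstSegment();
            IIJob job = new IIJobBuilder(new JobEngineConfig(config)).buildJob(seg, "ADMIN");
            // Hand the chained job to the job engine's manager for scheduling.
            ExecutableManager.getInstance(config).addJob(job);
        }
    }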