kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [7/9] incubator-kylin git commit: KYLIN-1010 Decompose project job
Date Thu, 17 Sep 2015 09:45:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
deleted file mode 100644
index d24cc79..0000000
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.spark;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractKylinTestCase;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.lock.MockJobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class BuildCubeWithSparkTest {
-
-    private CubeManager cubeManager;
-    private DefaultScheduler scheduler;
-    protected ExecutableManager jobService;
-
-    private static final Log logger = LogFactory.getLog(BuildCubeWithSparkTest.class);
-
-    protected void waitForJob(String jobId) {
-        while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
-            if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
-        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
-        System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this
-    }
-
-    @Before
-    public void before() throws Exception {
-        HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA);
-
-        DeployUtil.initCliWorkDir();
-        DeployUtil.deployMetadata();
-        DeployUtil.overrideJobJarLocations();
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig);
-        for (String jobId : jobService.getAllJobIds()) {
-            jobService.deleteJob(jobId);
-        }
-        scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
-        if (!scheduler.hasStarted()) {
-            throw new RuntimeException("scheduler has not been started");
-        }
-        cubeManager = CubeManager.getInstance(kylinConfig);
-
-    }
-
-    @After
-    public void after() {
-        HBaseMetadataTestCase.staticCleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final CubeSegment segment = createSegment();
-        String confPath = new File(AbstractKylinTestCase.SANDBOX_TEST_DATA).getAbsolutePath();
-        KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
-        String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar();
-        logger.info("confPath location:" + confPath);
-        logger.info("coprocessor location:" + coprocessor);
-        final DefaultChainedExecutable cubingJob = new SparkBatchCubingEngine(confPath, coprocessor).createBatchCubingJob(segment, "BuildCubeWithSpark");
-        jobService.addJob(cubingJob);
-        waitForJob(cubingJob.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(cubingJob.getId()).getState());
-    }
-
-    private void clearSegment(String cubeName) throws Exception {
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        // remove all existing segments
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
-        cubeManager.updateCube(cubeBuilder);
-    }
-
-    private CubeSegment createSegment() throws Exception {
-        String cubeName = "test_kylin_cube_with_slr_left_join_empty";
-        clearSegment(cubeName);
-
-        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
-        f.setTimeZone(TimeZone.getTimeZone("GMT"));
-        long dateStart = cubeManager.getCube(cubeName).getDescriptor().getModel().getPartitionDesc().getPartitionDateStart();
-        long dateEnd = f.parse("2050-11-12").getTime();
-
-        // this cube's start date is 0, end date is 20501112000000
-        List<String> result = Lists.newArrayList();
-        return cubeManager.appendSegments(cubeManager.getCube(cubeName), dateEnd);
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
index 46b63b3..955124c 100644
--- a/engine-streaming/pom.xml
+++ b/engine-streaming/pom.xml
@@ -26,11 +26,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-invertedindex</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-storage</artifactId>
             <version>${project.parent.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/pom.xml
----------------------------------------------------------------------
diff --git a/invertedindex/pom.xml b/invertedindex/pom.xml
index 4d1796f..9e8f92e 100644
--- a/invertedindex/pom.xml
+++ b/invertedindex/pom.xml
@@ -33,12 +33,12 @@
         <!--Kylin Jar -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-metadata</artifactId>
+            <artifactId>kylin-engine-streaming</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-dictionary</artifactId>
+            <artifactId>kylin-source-hive</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
 
@@ -56,43 +56,23 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-hcatalog-core</artifactId>
+            <version>${hive-hcatalog.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-annotations</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mrunit</groupId>
-            <artifactId>mrunit</artifactId>
-            <classifier>hadoop2</classifier>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-hadoop2-compat</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <scope>provided</scope>
         </dependency>
@@ -100,18 +80,12 @@
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-server</artifactId>
             <scope>provided</scope>
-            <!-- version conflict with hadoop2.2 -->
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-            <scope>provided</scope>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
new file mode 100644
index 0000000..87ee70e
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.dict;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.engine.mr.DFSFileTable;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ */
+public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            parseOptions(options, args);
+
+            final String iiname = getOptionValue(OPTION_II_NAME);
+            final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+            final KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+            IIManager mgr = IIManager.getInstance(config);
+            IIInstance ii = mgr.getII(iiname);
+
+            mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), new DistinctColumnValuesProvider() {
+                @Override
+                public ReadableTable getDistinctValuesFor(TblColRef col) {
+                    return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
+                }
+            });
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
new file mode 100644
index 0000000..300c89b
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ */
+public class IIBulkLoadJob extends AbstractHadoopJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            options.addOption(OPTION_II_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            String iiname = getOptionValue(OPTION_II_NAME);
+
+            FileSystem fs = FileSystem.get(getConf());
+            FsPermission permission = new FsPermission((short) 0777);
+            fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission);
+
+            int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName });
+
+            IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+            IIInstance ii = mgr.getII(iiname);
+            IISegment seg = ii.getFirstSegment();
+            seg.setStorageLocationIdentifier(tableName);
+            seg.setStatus(SegmentStatusEnum.READY);
+            mgr.updateII(ii);
+
+            return hbaseExitCode;
+
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
new file mode 100644
index 0000000..528f06f
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class IICreateHFileJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(IICreateHFileJob.class);
+
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+
+            setJobClasspath(job);
+
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+            FileOutputFormat.setOutputPath(job, output);
+
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(IICreateHFileMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(KeyValue.class);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java
new file mode 100644
index 0000000..1adf8d6
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+/**
+ * @author yangli9
+ */
+public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
+
+    long timestamp;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
+
+        KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
+                IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
+                IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
+                timestamp, Type.Put, //
+                value.get(), value.getOffset(), value.getLength());
+
+        context.write(key, kv);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
new file mode 100644
index 0000000..0b7cb7a
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class IICreateHTableJob extends AbstractHadoopJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String iiName = getOptionValue(OPTION_II_NAME);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            IIManager iiManager = IIManager.getInstance(config);
+            IIInstance ii = iiManager.getII(iiName);
+            int sharding = ii.getDescriptor().getSharding();
+
+            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+            HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
+            cf.setMaxVersions(1);
+
+            String hbaseDefaultCC = config.getHbaseDefaultCompressionCodec().toLowerCase();
+
+            switch (hbaseDefaultCC) {
+            case "snappy": {
+                logger.info("hbase will use snappy to compress data");
+                cf.setCompressionType(Compression.Algorithm.SNAPPY);
+                break;
+            }
+            case "lzo": {
+                logger.info("hbase will use lzo to compress data");
+                cf.setCompressionType(Compression.Algorithm.LZO);
+                break;
+            }
+            case "gz":
+            case "gzip": {
+                logger.info("hbase will use gzip to compress data");
+                cf.setCompressionType(Compression.Algorithm.GZ);
+                break;
+            }
+            case "lz4": {
+                logger.info("hbase will use lz4 to compress data");
+                cf.setCompressionType(Compression.Algorithm.LZ4);
+                break;
+            }
+            default: {
+                logger.info("hbase will not user any compression codec to compress data");
+            }
+            }
+
+            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+            tableDesc.addFamily(cf);
+            tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
+            tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+            if (User.isHBaseSecurityEnabled(conf)) {
+                // add coprocessor for bulk load
+                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+            }
+
+            IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
+
+            // drop the table first
+            HBaseAdmin admin = new HBaseAdmin(conf);
+            if (admin.tableExists(tableName)) {
+                admin.disableTable(tableName);
+                admin.deleteTable(tableName);
+            }
+
+            // create table
+            byte[][] splitKeys = getSplits(sharding);
+            if (splitKeys.length == 0)
+                splitKeys = null;
+            admin.createTable(tableDesc, splitKeys);
+            if (splitKeys != null) {
+                for (int i = 0; i < splitKeys.length; i++) {
+                    System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
+                }
+            }
+            System.out.println("create hbase table " + tableName + " done.");
+            admin.close();
+
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    //one region for one shard
+    private byte[][] getSplits(int shard) {
+        byte[][] result = new byte[shard - 1][];
+        for (int i = 1; i < shard; ++i) {
+            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java
new file mode 100644
index 0000000..a4c1961
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDeployCoprocessorCLI.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * THIS IS A TAILORED DUPLICATE OF org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI TO AVOID CYCLIC
+ * DEPENDENCY. INVERTED-INDEX CODE IS NOW SPLIT ACROSS kylin-invertedindex AND kylin-storage-hbase.
+ * DEFINITELY NEEDS FURTHER REFACTORING.
+ */
+public class IIDeployCoprocessorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(IIDeployCoprocessorCLI.class);
+
+    public static final String CubeObserverClass = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
+    public static final String CubeEndpointClass = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
+    public static final String IIEndpointClass = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
+
+    public static void deployCoprocessor(HTableDescriptor tableDesc) {
+        try {
+            initHTableCoprocessor(tableDesc);
+            logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+
+        } catch (Exception ex) {
+            logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+            logger.error("Will try creating the table without coprocessor.");
+        }
+    }
+
+    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        FileSystem fileSystem = FileSystem.get(hconf);
+
+        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+    }
+
+    private static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Add coprocessor on " + desc.getNameAsString());
+        desc.addCoprocessor(IIEndpointClass, hdfsCoprocessorJar, 1000, null);
+        desc.addCoprocessor(CubeEndpointClass, hdfsCoprocessorJar, 1001, null);
+        desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null);
+    }
+
+    private static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+        Path uploadPath = null;
+        File localCoprocessorFile = new File(localCoprocessorJar);
+
+        // check existing jars
+        if (oldJarPaths == null) {
+            oldJarPaths = new HashSet<String>();
+        }
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (isSame(localCoprocessorFile, fileStatus)) {
+                uploadPath = fileStatus.getPath();
+                break;
+            }
+            String filename = fileStatus.getPath().toString();
+            if (filename.endsWith(".jar")) {
+                oldJarPaths.add(filename);
+            }
+        }
+
+        // upload if not existing
+        if (uploadPath == null) {
+            // figure out a unique new jar file name
+            Set<String> oldJarNames = new HashSet<String>();
+            for (String path : oldJarPaths) {
+                oldJarNames.add(new Path(path).getName());
+            }
+            String baseName = getBaseFileName(localCoprocessorJar);
+            String newName = null;
+            int i = 0;
+            while (newName == null) {
+                newName = baseName + "-" + (i++) + ".jar";
+                if (oldJarNames.contains(newName))
+                    newName = null;
+            }
+
+            // upload
+            uploadPath = new Path(coprocessorDir, newName);
+            FileInputStream in = null;
+            FSDataOutputStream out = null;
+            try {
+                in = new FileInputStream(localCoprocessorFile);
+                out = fileSystem.create(uploadPath);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
+            }
+
+            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+        }
+
+        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+        return uploadPath;
+    }
+
+    private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
+        return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
+    }
+
+    private static String getBaseFileName(String localCoprocessorJar) {
+        File localJar = new File(localCoprocessorJar);
+        String baseName = localJar.getName();
+        if (baseName.endsWith(".jar"))
+            baseName = baseName.substring(0, baseName.length() - ".jar".length());
+        return baseName;
+    }
+
+    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+        fileSystem.mkdirs(coprocessorDir);
+        return coprocessorDir;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
new file mode 100644
index 0000000..1f4611b
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
+
+    private Text outputValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+    }
+
+    @Override
+    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+        HashSet<ByteArray> set = new HashSet<ByteArray>();
+        for (Text textValue : values) {
+            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+            set.add(value);
+        }
+
+        for (ByteArray value : set) {
+            outputValue.set(value.array(), value.offset(), value.length());
+            context.write(key, outputValue);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
new file mode 100644
index 0000000..042678e
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
+            String iiName = getOptionValue(OPTION_II_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            // ----------------------------------------------------------------------------
+
+            logger.info("Starting: " + job.getJobName() + " on table " + tableName);
+
+            IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+            IIInstance ii = iiMgr.getII(iiName);
+            job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
+            job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
+
+            setJobClasspath(job);
+
+            setupMapper();
+            setupReducer(output);
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+
+    }
+
+    private String getColumns(IIInstance ii) {
+        IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor());
+        StringBuilder buf = new StringBuilder();
+        for (IntermediateColumnDesc col : iiflat.getColumnList()) {
+            if (buf.length() > 0)
+                buf.append(",");
+            buf.append(col.getColumnName());
+        }
+        return buf.toString();
+    }
+
+    private void setupMapper() throws IOException {
+
+        String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
+        String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName);
+
+        logger.info("setting hcat input format, db name {} , table name {}", dbTableNames[0], dbTableNames[1]);
+
+        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
+
+        job.setInputFormatClass(HCatInputFormat.class);
+
+        job.setMapperClass(IIDistinctColumnsMapper.class);
+        job.setCombinerClass(IIDistinctColumnsCombiner.class);
+        job.setMapOutputKeyClass(ShortWritable.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+
+    private void setupReducer(Path output) throws IOException {
+        job.setReducerClass(IIDistinctColumnsReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        job.setNumReduceTasks(1);
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    public static void main(String[] args) throws Exception {
+        IIDistinctColumnsJob job = new IIDistinctColumnsJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
new file mode 100644
index 0000000..3418a57
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinMapper;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
+
+    private ShortWritable outputKey = new ShortWritable();
+    private Text outputValue = new Text();
+    private HCatSchema schema = null;
+    private int columnSize = 0;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+        columnSize = schema.getFields().size();
+    }
+
+    @Override
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+
+        HCatFieldSchema fieldSchema = null;
+        for (short i = 0; i < columnSize; i++) {
+            outputKey.set(i);
+            fieldSchema = schema.get(i);
+            Object fieldValue = record.get(fieldSchema.getName(), schema);
+            if (fieldValue == null)
+                continue;
+            byte[] bytes = Bytes.toBytes(fieldValue.toString());
+            outputValue.set(bytes, 0, bytes.length);
+            context.write(outputKey, outputValue);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
new file mode 100644
index 0000000..fcb4dd5
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author yangli9
+ */
+public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
+
+    private String[] columns;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
+    }
+
+    @Override
+    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        String columnName = columns[key.get()];
+
+        HashSet<ByteArray> set = new HashSet<ByteArray>();
+        for (Text textValue : values) {
+            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
+            set.add(value);
+        }
+
+        Configuration conf = context.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
+        FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
+
+        try {
+            for (ByteArray value : set) {
+                out.write(value.array(), value.offset(), value.length());
+                out.write('\n');
+            }
+        } finally {
+            out.close();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
new file mode 100644
index 0000000..c9ad448
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String iiname = getOptionValue(OPTION_II_NAME);
+            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            // ----------------------------------------------------------------------------
+
+            System.out.println("Starting: " + job.getJobName());
+
+            IIInstance ii = getII(iiname);
+            short sharding = ii.getDescriptor().getSharding();
+
+            setJobClasspath(job);
+
+            setupMapper(intermediateTable);
+            setupReducer(output, sharding);
+            attachMetadata(ii);
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+
+    }
+
+    private IIInstance getII(String iiName) {
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIInstance ii = mgr.getII(iiName);
+        if (ii == null)
+            throw new IllegalArgumentException("No Inverted Index found by name " + iiName);
+        return ii;
+    }
+
+    private void attachMetadata(IIInstance ii) throws IOException {
+
+        Configuration conf = job.getConfiguration();
+        attachKylinPropsAndMetadata(ii, conf);
+
+        IISegment seg = ii.getFirstSegment();
+        conf.set(BatchConstants.CFG_II_NAME, ii.getName());
+        conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
+    }
+
+    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        // write II / model_desc / II_desc / dict / table
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(ii.getResourcePath());
+        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
+        dumpList.add(ii.getDescriptor().getResourcePath());
+
+        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = metaMgr.getTableDesc(tableName);
+            dumpList.add(table.getResourcePath());
+        }
+        for (IISegment segment : ii.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
+    private void setupMapper(String intermediateTable) throws IOException {
+
+        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
+        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
+
+        job.setInputFormatClass(HCatInputFormat.class);
+
+        job.setMapperClass(InvertedIndexMapper.class);
+        job.setMapOutputKeyClass(LongWritable.class);
+        job.setMapOutputValueClass(ImmutableBytesWritable.class);
+        job.setPartitionerClass(InvertedIndexPartitioner.class);
+    }
+
+    private void setupReducer(Path output, short sharding) throws IOException {
+        job.setReducerClass(InvertedIndexReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(ImmutableBytesWritable.class);
+
+        job.setNumReduceTasks(sharding);
+
+        FileOutputFormat.setOutputPath(job, output);
+
+        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    public static void main(String[] args) throws Exception {
+        InvertedIndexJob job = new InvertedIndexJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
new file mode 100644
index 0000000..bc43b65
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, LongWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    private LongWritable outputKey;
+    private ImmutableBytesWritable outputValue;
+    private HCatSchema schema = null;
+    private List<HCatFieldSchema> fields;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        IIManager mgr = IIManager.getInstance(config);
+        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
+        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        this.info = new TableRecordInfo(seg);
+        this.rec = this.info.createTableRecord();
+
+        outputKey = new LongWritable();
+        outputValue = new ImmutableBytesWritable(rec.getBytes());
+
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+
+        fields = schema.getFields();
+    }
+
+    @Override
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+
+        rec.reset();
+        for (int i = 0; i < fields.size(); i++) {
+            Object fieldValue = record.get(i);
+            rec.setValueString(i, fieldValue == null ? null : fieldValue.toString());
+        }
+
+        outputKey.set(rec.getTimestamp());
+        // outputValue's backing byte array is the same as rec's
+
+        context.write(outputKey, outputValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
new file mode 100644
index 0000000..396c221
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * Routes each serialized table record to the partition given by the record's shard.
+ * <p>
+ * The partitioner is {@link Configurable}: when the configuration is injected via
+ * {@link #setConf(Configuration)} it resolves the inverted index segment named in the
+ * configuration and creates a reusable {@link TableRecord} for decoding values.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
+
+    private Configuration conf;
+    private TableRecordInfo info;
+    private TableRecord rec;
+
+    /**
+     * Returns the shard of the record encoded in {@code value}.
+     *
+     * @param key           the map output key; not used
+     * @param value         the serialized table record
+     * @param numPartitions the number of partitions; not consulted here
+     * @return the record's shard
+     */
+    @Override
+    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
+        // NOTE(review): the shard is returned as-is; this presumably relies on the job
+        // being configured with at least as many reducers as shards - confirm.
+        rec.setBytes(value.get(), value.getOffset(), value.getLength());
+        return rec.getShard();
+    }
+
+    /**
+     * Stores the configuration and initializes the record decoder from the segment
+     * it names.
+     *
+     * @param conf the job configuration carrying the II name and segment name
+     * @throws RuntimeException wrapping the {@link IOException} if initialization fails
+     */
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        try {
+            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+            IIManager mgr = IIManager.getInstance(config);
+            IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
+            IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+            this.info = new TableRecordInfo(seg);
+            this.rec = this.info.createTableRecord();
+        } catch (IOException e) {
+            // Give the failure a message so it is diagnosable from the task log.
+            throw new RuntimeException("Failed to initialize InvertedIndexPartitioner from job configuration", e);
+        }
+    }
+
+    /**
+     * @return the configuration last passed to {@link #setConf(Configuration)}
+     */
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
new file mode 100644
index 0000000..5a69eec
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+
+/**
+ * Reducer that feeds serialized table records into an {@link IncrementalSliceMaker}
+ * and writes every completed {@link Slice} as encoded key/value rows.
+ *
+ * @author yangli9
+ */
+public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private TableRecordInfo info;
+    private TableRecord rec;
+    private IncrementalSliceMaker builder;
+    private IIKeyValueCodec kv;
+
+    /**
+     * Binds the job configuration, resolves the inverted index segment named in it,
+     * and creates the reusable record and the key/value codec. The slice maker is
+     * created lazily once the first record arrives.
+     *
+     * @param context the task context supplying the Hadoop configuration
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        IIManager mgr = IIManager.getInstance(config);
+        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
+        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        info = new TableRecordInfo(seg);
+        rec = info.createTableRecord();
+        builder = null;
+        kv = new IIKeyValueCodec(info.getDigest());
+    }
+
+    /**
+     * Appends each value to the slice maker and emits any slice the append completes.
+     *
+     * @param key     the reduce key; not used beyond grouping
+     * @param values  the serialized table records for this key
+     * @param context the task context receiving encoded slice rows
+     */
+    @Override
+    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
+            throws IOException, InterruptedException {
+        for (ImmutableBytesWritable v : values) {
+            rec.setBytes(v.get(), v.getOffset(), v.getLength());
+
+            if (builder == null) {
+                // The slice maker is bound to the shard of the first record seen.
+                // NOTE(review): presumably every record reaching this reducer has
+                // the same shard - confirm against the partitioner.
+                builder = new IncrementalSliceMaker(info, rec.getShard());
+            }
+
+            Slice slice = builder.append(rec);
+            if (slice != null) {
+                output(slice, context);
+            }
+        }
+    }
+
+    /**
+     * Flushes the last, possibly partial, slice. Does nothing when this reducer
+     * never received a record, since no slice maker was created in that case.
+     *
+     * @param context the task context receiving encoded slice rows
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (builder == null) {
+            return;
+        }
+        Slice slice = builder.close();
+        if (slice != null) {
+            output(slice, context);
+        }
+    }
+
+    /** Encodes the slice into rows and writes each row's key and value. */
+    private void output(Slice slice, Context context) throws IOException, InterruptedException {
+        for (IIRow pair : kv.encodeKeyValue(slice)) {
+            context.write(pair.getKey(), pair.getValue());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
new file mode 100644
index 0000000..0af846b
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.invertedindex;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ * Chained executable for an inverted index build. It records the II name and the
+ * segment id as job parameters under the keys {@code iiName} and {@code segmentId}.
+ */
+public class IIJob extends DefaultChainedExecutable {
+
+    private static final String II_INSTANCE_NAME = "iiName";
+    private static final String SEGMENT_ID = "segmentId";
+
+    public IIJob() {
+        super();
+    }
+
+    /** Stores the inverted index name as a job parameter. */
+    void setIIName(String name) {
+        setParam(II_INSTANCE_NAME, name);
+    }
+
+    /** @return the inverted index name stored on this job */
+    public String getIIName() {
+        return getParam(II_INSTANCE_NAME);
+    }
+
+    /** Stores the segment id as a job parameter. */
+    void setSegmentId(String segmentId) {
+        setParam(SEGMENT_ID, segmentId);
+    }
+
+    /** @return the segment id stored on this job */
+    public String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c59e107/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
new file mode 100644
index 0000000..4bd06c5
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.invertedindex;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.hadoop.dict.CreateInvertedIndexDictionaryJob;
+import org.apache.kylin.job.hadoop.invertedindex.IIBulkLoadJob;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHFileJob;
+import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
+import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
+import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Assembles the {@link IIJob} that builds one inverted index (II) segment.
+ * <p>
+ * The job consists of these steps, in order: create the flat Hive table, collect
+ * distinct column values, build dictionaries, build the inverted index, create the
+ * HTable, convert the index to HFiles, and bulk load the HFiles.
+ */
+public final class IIJobBuilder {
+
+    final JobEngineConfig engineConfig;
+
+    public IIJobBuilder(JobEngineConfig engineConfig) {
+        this.engineConfig = engineConfig;
+    }
+
+    /**
+     * Creates the build job for the given segment.
+     *
+     * @param seg       the segment to build; must not be null
+     * @param submitter the submitter recorded on the job
+     * @return the job with all build steps added
+     * @throws NullPointerException if {@code seg} or the engine config is null
+     */
+    public IIJob buildJob(IISegment seg, String submitter) {
+        checkPreconditions(seg);
+
+        IIJob result = initialJob(seg, "BUILD", submitter);
+        final String jobId = result.getId();
+        final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
+        final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
+        final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
+        final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
+        final String iiPath = iiRootPath + "*";
+
+        // create flat hive table step
+        final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
+        result.addTask(intermediateHiveTableStep);
+
+        // fact distinct columns step
+        result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
+
+        // build dictionary step
+        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
+
+        // build inverted index step
+        result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
+
+        // create htable step
+        result.addTask(createCreateHTableStep(seg));
+
+        // generate hfiles step
+        result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
+
+        // bulk load step
+        result.addTask(createBulkLoadStep(seg, jobId));
+
+        return result;
+    }
+
+    /** Delegates creation of the flat Hive table step to the Hive input side. */
+    private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
+        return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
+    }
+
+    /**
+     * Creates an empty job carrying the II name, segment id, submitter and a display
+     * name of the form {@code <ii> - <segment> - <type> - <timestamp>}, where the
+     * timestamp is formatted in the engine's configured time zone.
+     */
+    private IIJob initialJob(IISegment seg, String type, String submitter) {
+        IIJob result = new IIJob();
+        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
+        final String iiName = seg.getIIInstance().getName();
+        result.setIIName(iiName);
+        result.setSegmentId(seg.getUuid());
+        result.setName(iiName + " - " + seg.getName() + " - " + type + " - " + format.format(new Date()));
+        result.setSubmitter(submitter);
+        return result;
+    }
+
+    /** Fails fast when the segment or the engine config is missing. */
+    private void checkPreconditions(IISegment seg) {
+        Preconditions.checkNotNull(seg, "segment cannot be null");
+        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
+    }
+
+    /**
+     * Appends {@code -conf <file>} when a Hadoop job configuration file is available.
+     * The file is always looked up for {@link RealizationCapacity#MEDIUM}.
+     */
+    private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
+        try {
+            String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
+            if (jobConf != null && jobConf.length() > 0) {
+                builder.append(" -conf ").append(jobConf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Path under the job working directory where distinct column values are written. */
+    private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
+        return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
+    }
+
+    /** Path under the job working directory where HFiles are written. */
+    private String getHFilePath(IISegment seg, String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
+    }
+
+    /**
+     * Creates the MapReduce step that collects distinct column values from the flat
+     * table. The {@code jobId} parameter is currently unused.
+     */
+    private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, engineConfig);
+        appendExecCmdParameters(cmd, "tablename", factTableName);
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+        appendExecCmdParameters(cmd, "output", output);
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
+    /** Creates the shell step that builds dictionaries from the distinct column values. */
+    private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
+        // build dictionary job
+        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
+
+        buildDictionaryStep.setJobParams(cmd.toString());
+        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
+        return buildDictionaryStep;
+    }
+
+    /** Creates the MapReduce step that builds the inverted index from the flat table. */
+    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
+        // build inverted index job
+        MapReduceExecutable buildIIStep = new MapReduceExecutable();
+
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd, engineConfig);
+
+        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
+
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
+        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
+        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
+
+        buildIIStep.setMapReduceParams(cmd.toString());
+        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
+        return buildIIStep;
+    }
+
+    /** Creates the shell step that creates the HTable for the segment. */
+    private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
+        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
+        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+        createHtableStep.setJobParams(cmd.toString());
+        createHtableStep.setJobClass(IICreateHTableJob.class);
+
+        return createHtableStep;
+    }
+
+    /** Creates the MapReduce step that converts the built index into HFiles. */
+    private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, engineConfig);
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
+
+        return createHFilesStep;
+    }
+
+    /** Creates the shell step that bulk loads the generated HFiles into the HTable. */
+    private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(IIBulkLoadJob.class);
+
+        return bulkLoadStep;
+
+    }
+
+    /** Appends {@code " -<paraName> <paraValue>"} to the buffer and returns the buffer. */
+    private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
+        return buf.append(" -").append(paraName).append(" ").append(paraValue);
+    }
+
+    /**
+     * Working directory of the job with the given id.
+     * NOTE(review): no separator is inserted before "kylin-", so the configured HDFS
+     * working directory presumably ends with "/" - confirm.
+     */
+    private String getJobWorkingDir(String uuid) {
+        return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
+    }
+
+    /** Fully qualified name ({@code <database>.<table>}) of the intermediate flat table. */
+    private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
+        return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
+    }
+}


Mime
View raw message