kylin-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] shaofengshi closed pull request #139: Kylin 3221 Some improvements for lookup table
Date Fri, 01 Jun 2018 05:42:59 GMT
shaofengshi closed pull request #139: Kylin 3221 Some improvements for lookup table
URL: https://github.com/apache/kylin/pull/139
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index a971f14136..c6476e2773 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -414,6 +414,19 @@ public int getTableSnapshotMaxMB() {
         return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
     }
 
+    public int getExtTableSnapshotShardingMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500"));
+    }
+
+    public String getExtTableSnapshotLocalCachePath() {
+        return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
+    }
+
+    public double getExtTableSnapshotLocalCacheMaxSizeGB() {
+        return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
+    }
+
+
     // ============================================================================
     // CUBE
     // ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index bda6cd00f2..a71db459bd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -77,6 +77,7 @@
     public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query";
     public static final String DRAFT_RESOURCE_ROOT = "/draft";
     public static final String USER_ROOT = "/user";
+    public static final String EXT_SNAPSHOT_RESOURCE_ROOT = "/ext_table_snapshot";
 
     public static final String METASTORE_UUID_TAG = "/UUID";
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 93f5e19adb..11284f6401 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -303,6 +303,27 @@ public void clearCacheForCubeMigration(String cube, String project, String model
         }
     }
 
+    public void buildLookupSnapshotCache(String project, String lookupTableName, String snapshotID) throws IOException {
+        String url = baseUrl + "/tables/" + project + "/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache";
+        HttpPut put = new HttpPut(url);
+        HttpResponse response = client.execute(put);
+        getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+    }
+
+    public String getLookupSnapshotCacheState(String lookupTableName, String snapshotID) throws IOException {
+        String url = baseUrl + "/tables/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache/state";
+        HttpGet get = new HttpGet(url);
+        HttpResponse response = client.execute(get);
+        String content = getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+        return content;
+    }
+
     private HashMap dealResponse(HttpResponse response) throws IOException {
         if (response.getStatusLine().getStatusCode() != 200) {
             throw new IOException("Invalid response " + response.getStatusLine().getStatusCode());
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 5dffd96458..e12b689d86 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -75,7 +75,7 @@ public static CapabilityResult check(CubeInstance cube, SQLDigest digest) {
             }
         } else {
             //for non query-on-facttable 
-            if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable)) {
+            if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable) || cube.getSnapshots().containsKey(digest.factTable)) {
 
                 Set<TblColRef> dimCols = Sets.newHashSet(cube.getModel().findFirstTable(digest.factTable).getColumns());
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 035cf7bcbc..bc6083e17e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -28,6 +28,7 @@
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -124,6 +125,9 @@ public static CubeInstance create(String cubeName, CubeDesc cubeDesc) {
     @JsonProperty("cuboid_last_optimized")
     private long cuboidLastOptimized;
 
+    @JsonProperty("snapshots")
+    private Map<String, String> snapshots = Maps.newHashMap();
+
     // cuboid scheduler lazy built
     transient private CuboidScheduler cuboidScheduler;
 
@@ -652,6 +656,20 @@ public int getEngineType() {
         return getDescriptor().getEngineType();
     }
 
+    public Map<String, String> getSnapshots() {
+        if (snapshots == null)
+            snapshots = Maps.newHashMap();
+        return snapshots;
+    }
+
+    public String getSnapshotResPath(String tableName) {
+        return getSnapshots().get(tableName);
+    }
+
+    public void putSnapshotResPath(String table, String snapshotResPath) {
+        getSnapshots().put(table, snapshotResPath);
+    }
+
     public static CubeInstance getCopyOf(CubeInstance cubeInstance) {
         CubeInstance newCube = new CubeInstance();
         newCube.setName(cubeInstance.getName());
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fc2ad3de29..a6022aa826 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -44,9 +45,13 @@
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.TableMetadataManager;
@@ -290,6 +295,18 @@ public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum statu
         }
     }
 
+    public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            cube = cube.latestCopyForWrite();
+
+            CubeUpdate update = new CubeUpdate(cube);
+            Map<String, String> map = Maps.newHashMap();
+            map.put(lookupTableName, newSnapshotResPath);
+            update.setUpdateTableSnapshotPath(map);
+            return updateCube(update);
+        }
+    }
+
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
         if (update == null || update.getCubeInstance() == null)
             throw new IllegalStateException();
@@ -353,6 +370,12 @@ private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IO
             cube.setCuboidsRecommend(update.getCuboidsRecommend());
         }
 
+        if (update.getUpdateTableSnapshotPath() != null) {
+            for(Map.Entry<String, String> lookupSnapshotPathEntry : update.getUpdateTableSnapshotPath().entrySet()) {
+                cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue());
+            }
+        }
+
         try {
             cube = crud.save(cube);
         } catch (WriteConflictException ise) {
@@ -435,6 +458,55 @@ public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOExcep
         }
     }
 
+    public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        String tableName = join.getPKSide().getTableIdentity();
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName);
+        if (snapshotTableDesc == null || !snapshotTableDesc.isExtSnapshotTable()) {
+            return getInMemLookupTable(cubeSegment, join, snapshotTableDesc);
+        } else {
+            return getExtLookupTable(cubeSegment, tableName, snapshotTableDesc);
+        }
+    }
+
+    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, SnapshotTableDesc snapshotTableDesc) {
+        String tableName = join.getPKSide().getTableIdentity();
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+        String[] pkCols = join.getPrimaryKey();
+
+        try {
+            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+        }
+    }
+
+    private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+
+        ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
+                snapshotResPath);
+        TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+        return LookupProviderFactory.getExtLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        String snapshotResPath;
+        if (snapshotTableDesc == null || !snapshotTableDesc.isGlobal()) {
+            snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
+        } else {
+            snapshotResPath = cubeSegment.getCubeInstance().getSnapshotResPath(tableName);
+        }
+        if (snapshotResPath == null) {
+            throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
+                    + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+        }
+        return snapshotResPath;
+    }
+
     @VisibleForTesting
     /*private*/ String generateStorageLocation() {
         String namePrefix = config.getHBaseTableNamePrefix();
@@ -972,8 +1044,8 @@ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable)
         return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
     }
 
-    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        return dictAssist.getLookupTable(cubeSegment, join);
+    private TableMetadataManager getMetadataManager() {
+        return TableMetadataManager.getInstance(config);
     }
 
     private class DictionaryAssist {
@@ -1055,31 +1127,25 @@ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable)
             IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
             SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
-            segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-            CubeUpdate update = new CubeUpdate(cubeCopy);
-            update.setToUpdateSegs(segCopy);
-            updateCube(update);
-
-            return snapshot;
-        }
-
-        public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-
-            String tableName = join.getPKSide().getTableIdentity();
-            String[] pkCols = join.getPrimaryKey();
-            String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
-            if (snapshotResPath == null)
-                throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
-                        + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
+                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+                CubeUpdate update = new CubeUpdate(cubeCopy);
+                update.setToUpdateSegs(segCopy);
+                updateCube(update);
+
+                // Update the input cubeSeg after the resource store updated
+                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
+            } else {
+                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+                Map<String, String> map = Maps.newHashMap();
+                map.put(lookupTable, snapshot.getResourcePath());
+                cubeUpdate.setUpdateTableSnapshotPath(map);
+                updateCube(cubeUpdate);
 
-            try {
-                SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-                TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
-                return new LookupStringTable(tableDesc, pkCols, snapshot);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                        "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
             }
+            return snapshot;
         }
     }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index 378d082274..62b46a9d22 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -36,6 +36,7 @@
     private int cost = -1;
     private Map<Long, Long> cuboids = null;
     private Set<Long> cuboidsRecommend = null;
+    private Map<String, String> updateTableSnapshotPath = null;
 
     public CubeUpdate(CubeInstance cubeInstance) {
         setCubeInstance(cubeInstance);
@@ -124,4 +125,12 @@ public CubeUpdate setCuboidsRecommend(Set<Long> cuboidsRecommend) {
         this.cuboidsRecommend = cuboidsRecommend;
         return this;
     }
+
+    public Map<String, String> getUpdateTableSnapshotPath() {
+        return updateTableSnapshotPath;
+    }
+
+    public void setUpdateTableSnapshotPath(Map<String, String> updateTableSnapshotPath) {
+        this.updateTableSnapshotPath = updateTableSnapshotPath;
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 36c06b7922..2a243702cb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -77,8 +77,11 @@ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, Dist
         for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
             TableRef table = dim.getTableRef();
             if (cubeSeg.getModel().isLookupTable(table)) {
-                toSnapshot.add(table.getTableIdentity());
-                toCheckLookup.add(table);
+                // only the snapshot desc is not ext type, need to take snapshot
+                if (!cubeSeg.getCubeDesc().isExtSnapshotTable(table.getTableIdentity())) {
+                    toSnapshot.add(table.getTableIdentity());
+                    toCheckLookup.add(table);
+                }
             }
         }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 77b808bfde..5b4a13464a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -203,6 +203,9 @@ public String toString() {
     // Error messages during resolving json metadata
     private List<String> errors = new ArrayList<String>();
 
+    @JsonProperty("snapshot_table_desc_list")
+    private List<SnapshotTableDesc> snapshotTableDescList = Collections.emptyList();
+
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<>();
     private LinkedHashSet<ColumnDesc> allColumnDescs = new LinkedHashSet<>();
     private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<>();
@@ -1371,6 +1374,49 @@ public TblColRef getDistributedByColumn() {
         return null;
     }
 
+    public List<SnapshotTableDesc> getSnapshotTableDescList() {
+        return snapshotTableDescList;
+    }
+
+    public void setSnapshotTableDescList(List<SnapshotTableDesc> snapshotTableDescList) {
+        this.snapshotTableDescList = snapshotTableDescList;
+    }
+
+    public SnapshotTableDesc getSnapshotTableDesc(String tableName) {
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) {
+            if (snapshotTableDesc.getTableName().equalsIgnoreCase(tableName)) {
+                return snapshotTableDesc;
+            }
+        }
+        return null;
+    }
+
+    public boolean isGlobalSnapshotTable(String tableName) {
+        SnapshotTableDesc desc = getSnapshotTableDesc(tableName);
+        if (desc == null) {
+            return false;
+        }
+        return desc.isGlobal();
+    }
+
+    public boolean isExtSnapshotTable(String tableName) {
+        SnapshotTableDesc desc = getSnapshotTableDesc(tableName);
+        if (desc == null) {
+            return false;
+        }
+        return desc.isExtSnapshotTable();
+    }
+    
+    public List<String> getAllExtLookupSnapshotTypes() {
+        List<String> result = Lists.newArrayList();
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) {
+            if (snapshotTableDesc.isExtSnapshotTable()) {
+                result.add(snapshotTableDesc.getStorageType());
+            }
+        }
+        return result;
+    }
+
     /** Get a column which can be used to cluster the source table.
      * To reduce memory footprint in base cuboid for global dict */
     // TODO handle more than one ultra high cardinality columns use global dict in one cube
@@ -1465,6 +1511,7 @@ public static CubeDesc getCopyOf(CubeDesc cubeDesc) {
         newCubeDesc.setPartitionOffsetStart(cubeDesc.getPartitionOffsetStart());
         newCubeDesc.setVersion(cubeDesc.getVersion());
         newCubeDesc.setParentForward(cubeDesc.getParentForward());
+        newCubeDesc.setSnapshotTableDescList(cubeDesc.getSnapshotTableDescList());
         newCubeDesc.updateRandomUuid();
         return newCubeDesc;
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
new file mode 100644
index 0000000000..e61240b786
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class SnapshotTableDesc implements java.io.Serializable{
+    @JsonProperty("table_name")
+    private String tableName;
+
+    @JsonProperty("storage_type")
+    private String storageType = SnapshotTable.STORAGE_TYPE_METASTORE;
+
+    @JsonProperty("local_cache_enable")
+    private boolean enableLocalCache = true;
+
+    @JsonProperty("global")
+    private boolean global = false;
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public String getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(String storageType) {
+        this.storageType = storageType;
+    }
+
+    public boolean isGlobal() {
+        return global;
+    }
+
+    public void setGlobal(boolean global) {
+        this.global = global;
+    }
+
+    public boolean isExtSnapshotTable() {
+        return !SnapshotTable.STORAGE_TYPE_METASTORE.equals(storageType);
+    }
+
+    public boolean isEnableLocalCache() {
+        return enableLocalCache;
+    }
+
+    public void setEnableLocalCache(boolean enableLocalCache) {
+        this.enableLocalCache = enableLocalCache;
+    }
+}
diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml
index 7e8bee18a1..d82d204508 100644
--- a/core-dictionary/pom.xml
+++ b/core-dictionary/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>kylin-core-metadata</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java
new file mode 100644
index 0000000000..5efe129feb
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Abstract encoder/decoder
+ * 
+ */
+abstract public class AbstractLookupRowEncoder<R> {
+    protected ByteBuffer keyByteBuffer = ByteBuffer.allocate(1024 * 1024);
+
+    protected int columnsNum;
+    protected int[] keyIndexes;
+    protected int[] valueIndexes;
+
+    public AbstractLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
+        this.columnsNum = tableDesc.getColumns().length;
+        this.keyIndexes = new int[keyColumns.length];
+        this.valueIndexes = new int[columnsNum - keyColumns.length];
+        int keyIdx = 0;
+        int valIdx = 0;
+        for (int i = 0; i < columnsNum; i++) {
+            boolean isKeyColumn = false;
+            for (String keyColumn : keyColumns) {
+                if (keyColumn.equals(tableDesc.getColumns()[i].getName())) {
+                    isKeyColumn = true;
+                    break;
+                }
+            }
+            if (isKeyColumn) {
+                keyIndexes[keyIdx] = i;
+                keyIdx++;
+            } else {
+                valueIndexes[valIdx] = i;
+                valIdx++;
+            }
+        }
+    }
+
+    abstract public R encode(String[] row);
+
+    abstract public String[] decode(R result);
+
+    public String[] getKeyData(String[] row) {
+        return extractColValues(row, keyIndexes);
+    }
+
+    public String[] getValueData(String[] row) {
+        return extractColValues(row, valueIndexes);
+    }
+
+    public byte[] encodeStringsWithLenPfx(String[] keys, boolean allowNull) {
+        keyByteBuffer.clear();
+        for (String key : keys) {
+            if (key == null && !allowNull) {
+                throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys));
+            }
+            byte[] byteKey = toBytes(key);
+            keyByteBuffer.putShort((short) byteKey.length);
+            keyByteBuffer.put(byteKey);
+        }
+        byte[] result = new byte[keyByteBuffer.position()];
+        System.arraycopy(keyByteBuffer.array(), 0, result, 0, keyByteBuffer.position());
+        return result;
+    }
+
+    protected void decodeFromLenPfxBytes(byte[] rowKey, int[] valueIdx, String[] result) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey);
+        for (int i = 0; i < valueIdx.length; i++) {
+            short keyLen = byteBuffer.getShort();
+            byte[] keyBytes = new byte[keyLen];
+            byteBuffer.get(keyBytes);
+            result[valueIdx[i]] = fromBytes(keyBytes);
+        }
+    }
+
+    protected String[] extractColValues(String[] row, int[] indexes) {
+        String[] result = new String[indexes.length];
+        int i = 0;
+        for (int idx : indexes) {
+            result[i++] = row[idx];
+        }
+        return result;
+    }
+
+    protected byte[] toBytes(String str) {
+        if (str == null) {
+            return new byte[] { DimensionEncoding.NULL };
+        }
+        return Bytes.toBytes(str);
+    }
+
+    protected String fromBytes(byte[] bytes) {
+        if (DimensionEncoding.isNull(bytes, 0, bytes.length)) {
+            return null;
+        }
+        return Bytes.toString(bytes);
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java
new file mode 100644
index 0000000000..80a2b776fa
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@SuppressWarnings("serial")
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class ExtTableSnapshotInfo extends RootPersistentEntity {
+    public static final String STORAGE_TYPE_HBASE = "hbase";
+
+    @JsonProperty("tableName")
+    private String tableName;
+
+    @JsonProperty("signature")
+    private TableSignature signature;
+
+    @JsonProperty("key_columns")
+    private String[] keyColumns;
+
+    @JsonProperty("storage_type")
+    private String storageType;
+
+    @JsonProperty("storage_location_identifier")
+    private String storageLocationIdentifier;
+
+    @JsonProperty("shard_num")
+    private int shardNum;
+
+    @JsonProperty("row_cnt")
+    private long rowCnt;
+
+    @JsonProperty("last_build_time")
+    private long lastBuildTime;
+
+    // default constructor for JSON serialization
+    public ExtTableSnapshotInfo() {
+    }
+
+    public ExtTableSnapshotInfo(TableSignature signature, String tableName) throws IOException {
+        this.signature = signature;
+        this.tableName = tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public String getResourcePath() {
+        return getResourcePath(tableName, uuid);
+    }
+
+    public String getResourceDir() {
+        return getResourceDir(tableName);
+    }
+
+    public static String getResourcePath(String tableName, String uuid) {
+        return getResourceDir(tableName) + "/" + uuid + ".snapshot";
+    }
+
+    public static String getResourceDir(String tableName) {
+        return ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
+    }
+
+    public TableSignature getSignature() {
+        return signature;
+    }
+
+    public String getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(String storageType) {
+        this.storageType = storageType;
+    }
+
+    public String getStorageLocationIdentifier() {
+        return storageLocationIdentifier;
+    }
+
+    public void setStorageLocationIdentifier(String storageLocationIdentifier) {
+        this.storageLocationIdentifier = storageLocationIdentifier;
+    }
+
+    public String[] getKeyColumns() {
+        return keyColumns;
+    }
+
+    public void setKeyColumns(String[] keyColumns) {
+        this.keyColumns = keyColumns;
+    }
+
+    public int getShardNum() {
+        return shardNum;
+    }
+
+    public void setShardNum(int shardNum) {
+        this.shardNum = shardNum;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setSignature(TableSignature signature) {
+        this.signature = signature;
+    }
+
+    public long getRowCnt() {
+        return rowCnt;
+    }
+
+    public void setRowCnt(long rowCnt) {
+        this.rowCnt = rowCnt;
+    }
+
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java
new file mode 100644
index 0000000000..e0e2a26523
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class ExtTableSnapshotInfoManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExtTableSnapshotInfoManager.class);
+    public static Serializer<ExtTableSnapshotInfo> SNAPSHOT_SERIALIZER = new JsonSerializer<>(ExtTableSnapshotInfo.class);
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, ExtTableSnapshotInfoManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, ExtTableSnapshotInfoManager>();
+
+    public static ExtTableSnapshotInfoManager getInstance(KylinConfig config) {
+        ExtTableSnapshotInfoManager r = SERVICE_CACHE.get(config);
+        if (r == null) {
+            synchronized (ExtTableSnapshotInfoManager.class) {
+                r = SERVICE_CACHE.get(config);
+                if (r == null) {
+                    r = new ExtTableSnapshotInfoManager(config);
+                    SERVICE_CACHE.put(config, r);
+                    if (SERVICE_CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    public static void clearCache() {
+        synchronized (SERVICE_CACHE) {
+            SERVICE_CACHE.clear();
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+    private LoadingCache<String, ExtTableSnapshotInfo> snapshotCache; // resource
+
+    private ExtTableSnapshotInfoManager(KylinConfig config) {
+        this.config = config;
+        this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, ExtTableSnapshotInfo>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, ExtTableSnapshotInfo> notification) {
+                ExtTableSnapshotInfoManager.logger.info("Snapshot with resource path " + notification.getKey()
+                        + " is removed due to " + notification.getCause());
+            }
+        }).maximumSize(1000)//
+                .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, ExtTableSnapshotInfo>() {
+                    @Override
+                    public ExtTableSnapshotInfo load(String key) throws Exception {
+                        ExtTableSnapshotInfo snapshot = ExtTableSnapshotInfoManager.this.load(key);
+                        return snapshot;
+                    }
+                });
+    }
+
+    /**
+     *
+     * @param signature source table signature
+     * @param tableName
+     * @return latest snapshot info
+     * @throws IOException
+     */
+    public ExtTableSnapshotInfo getLatestSnapshot(TableSignature signature, String tableName) throws IOException {
+        ExtTableSnapshotInfo snapshot = new ExtTableSnapshotInfo(signature, tableName);
+        snapshot.updateRandomUuid();
+        ExtTableSnapshotInfo dupSnapshot = checkDupByInfo(snapshot);
+        return dupSnapshot;
+    }
+
+    public ExtTableSnapshotInfo getSnapshot(String snapshotResPath) {
+        try {
+            return snapshotCache.get(snapshotResPath);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ExtTableSnapshotInfo getSnapshot(String tableName, String snapshotID) {
+         return getSnapshot(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID));
+    }
+
+    public List<ExtTableSnapshotInfo> getSnapshots(String tableName) throws IOException {
+        String tableSnapshotsPath = ExtTableSnapshotInfo.getResourceDir(tableName);
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        return store.getAllResources(tableSnapshotsPath, ExtTableSnapshotInfo.class, SNAPSHOT_SERIALIZER);
+    }
+
+    public Set<String> getAllExtSnapshotResPaths() throws IOException {
+        Set<String> result = Sets.newHashSet();
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        Set<String> snapshotTablePaths = store.listResources(ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT);
+        if (snapshotTablePaths == null) {
+            return result;
+        }
+        for (String snapshotTablePath : snapshotTablePaths) {
+            Set<String> snapshotPaths = store.listResources(snapshotTablePath);
+            if (snapshotPaths != null) {
+                result.addAll(snapshotPaths);
+            }
+        }
+        return result;
+    }
+
+    public void removeSnapshot(String tableName, String snapshotID) throws IOException {
+        String snapshotResPath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID);
+        snapshotCache.invalidate(snapshotResPath);
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        store.deleteResource(snapshotResPath);
+    }
+
+    /**
+     * create ext table snapshot
+     * @param signature
+     * @param tableName
+     * @param keyColumns
+     *@param storageType
+     * @param storageLocation   @return created snapshot
+     * @throws IOException
+     */
+    public ExtTableSnapshotInfo createSnapshot(TableSignature signature, String tableName, String snapshotID, String[] keyColumns,
+                                               int shardNum, String storageType, String storageLocation) throws IOException {
+        ExtTableSnapshotInfo snapshot = new ExtTableSnapshotInfo();
+        snapshot.setUuid(snapshotID);
+        snapshot.setSignature(signature);
+        snapshot.setTableName(tableName);
+        snapshot.setKeyColumns(keyColumns);
+        snapshot.setStorageType(storageType);
+        snapshot.setStorageLocationIdentifier(storageLocation);
+        snapshot.setShardNum(shardNum);
+        save(snapshot);
+        return snapshot;
+    }
+
+    public void updateSnapshot(ExtTableSnapshotInfo extTableSnapshot) throws IOException {
+        save(extTableSnapshot);
+        snapshotCache.invalidate(extTableSnapshot.getResourcePath());
+    }
+
+    private ExtTableSnapshotInfo checkDupByInfo(ExtTableSnapshotInfo snapshot) throws IOException {
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        String resourceDir = snapshot.getResourceDir();
+        NavigableSet<String> existings = store.listResources(resourceDir);
+        if (existings == null)
+            return null;
+
+        TableSignature sig = snapshot.getSignature();
+        for (String existing : existings) {
+            ExtTableSnapshotInfo existingSnapshot = load(existing);
+            // direct load from store
+            if (existingSnapshot != null && sig.equals(existingSnapshot.getSignature()))
+                return existingSnapshot;
+        }
+        return null;
+    }
+
+    private ExtTableSnapshotInfo load(String resourcePath) throws IOException {
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        ExtTableSnapshotInfo snapshot = store.getResource(resourcePath, ExtTableSnapshotInfo.class, SNAPSHOT_SERIALIZER);
+
+        return snapshot;
+    }
+
+    public void save(ExtTableSnapshotInfo snapshot) throws IOException {
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        String path = snapshot.getResourcePath();
+        store.putResource(path, snapshot, SNAPSHOT_SERIALIZER);
+    }
+
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
new file mode 100644
index 0000000000..a09a439bf8
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupProvider {
+    ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot);
+
+    /**
+     * @return the local cache if the provider has, return null if no local cache exist
+     */
+    IExtLookupTableCache getLocalCache();
+
+    /**
+     * Return an adaptor that implements specified interface as requested by the build engine.
+     * The ILookupMaterializer in particular, is required by the MR build engine.
+     */
+    <I> I adaptToBuildEngine(Class<I> engineInterface);
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
new file mode 100644
index 0000000000..f473059002
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupTableCache {
+    enum CacheState {NONE, IN_BUILDING, AVAILABLE}
+
+    /**
+     * @param tableDesc
+     * @param extTableSnapshotInfo
+     * @param buildIfNotExist if true, when the cached lookup table not exist, build it.
+     * @return null if no cached lookup table exist
+     */
+    ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist);
+
+    void buildSnapshotCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, ILookupTable sourceTable);
+
+    void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo);
+
+    CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo);
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
new file mode 100644
index 0000000000..dccb7c4ff0
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import org.apache.kylin.common.util.Array;
+
+import java.io.Closeable;
+
+public interface ILookupTable extends Iterable<String[]>, Closeable {
+    /**
+     * get row according the key
+     * @param key
+     * @return
+     */
+    String[] getRow(Array<String> key);
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
new file mode 100644
index 0000000000..64ccef557c
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class LookupProviderFactory {
+    private static final Logger logger = LoggerFactory.getLogger(LookupProviderFactory.class);
+    private static Map<String, String> lookupProviderImplClassMap = Maps.newConcurrentMap();
+
+    static {
+        registerLookupProvider(ExtTableSnapshotInfo.STORAGE_TYPE_HBASE,
+                "org.apache.kylin.storage.hbase.lookup.HBaseLookupProvider");
+    }
+
+    public static void registerLookupProvider(String storageType, String implClassName) {
+        lookupProviderImplClassMap.put(storageType, implClassName);
+    }
+
+    public static IExtLookupProvider getExtLookupProvider(String storageType) {
+        String className = lookupProviderImplClassMap.get(storageType);
+        if (className == null) {
+            throw new IllegalStateException("no implementation class found for storage type:" + storageType);
+        }
+        try {
+            Class clazz = Class.forName(className);
+            Constructor constructor = clazz.getConstructor();
+            return (IExtLookupProvider) constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("the lookup implementation class is invalid for storage type:"
+                    + storageType, e);
+        }
+    }
+
+    public static ILookupTable getInMemLookupTable(TableDesc tableDesc, String[] pkCols, IReadableTable readableTable)
+            throws IOException {
+        return new LookupStringTable(tableDesc, pkCols, readableTable);
+    }
+
+    public static ILookupTable getExtLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupTableCache extLookupTableCache = getExtLookupProvider(extTableSnapshot.getStorageType()).getLocalCache();
+        if (extLookupTableCache == null) {
+            return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+        }
+        ILookupTable cachedLookupTable = extLookupTableCache.getCachedLookupTable(tableDesc, extTableSnapshot, true);
+        if (cachedLookupTable != null) {
+            logger.info("try to use cached lookup table:{}", extTableSnapshot.getResourcePath());
+            return cachedLookupTable;
+        }
+        logger.info("use ext lookup table:{}", extTableSnapshot.getResourcePath());
+        return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+    }
+
+    public static ILookupTable getExtLookupTableWithoutCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupProvider provider = getExtLookupProvider(extTableSnapshot.getStorageType());
+        return provider.getLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    public static <T> T createEngineAdapter(String lookupStorageType, Class<T> engineInterface) {
+        IExtLookupProvider provider = getExtLookupProvider(lookupStorageType);
+        return provider.adaptToBuildEngine(engineInterface);
+    }
+
+    public static void rebuildLocalCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            tablesCache.buildSnapshotCache(tableDesc, extTableSnapshotInfo, getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo));
+        }
+    }
+
+    public static void removeLocalCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            tablesCache.removeSnapshotCache(extTableSnapshotInfo);
+        }
+    }
+
+    public static CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            return tablesCache.getCacheState(extTableSnapshotInfo);
+        }
+        return CacheState.NONE;
+    }
+
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index 886de226b1..1d0348a37d 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.Iterator;
 
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.datatype.DataType;
@@ -31,7 +32,7 @@
  * @author yangli9
  * 
  */
-public class LookupStringTable extends LookupTable<String> {
+public class LookupStringTable extends LookupTable<String> implements ILookupTable{
 
     private static final Comparator<String> dateStrComparator = new Comparator<String>() {
         @Override
@@ -109,4 +110,13 @@ protected String toString(String cell) {
         return String.class;
     }
 
+    @Override
+    public Iterator<String[]> iterator() {
+        return data.values().iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
 }
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index a99ef29b2c..e1123faaf7 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -22,10 +22,10 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.Array;
@@ -53,7 +53,7 @@ public LookupTable(TableDesc tableDesc, String[] keyColumns, IReadableTable tabl
         this.tableDesc = tableDesc;
         this.keyColumns = keyColumns;
         this.table = table;
-        this.data = new ConcurrentHashMap<Array<T>, T[]>();
+        this.data = new HashMap<Array<T>, T[]>();
         init();
     }
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 519280593c..dd90b3303d 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -19,12 +19,15 @@
 package org.apache.kylin.dict.lookup;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -94,6 +97,18 @@ public SnapshotTable getSnapshotTable(String resourcePath) throws IOException {
         }
     }
 
+    public List<SnapshotTable> getSnapshots(String tableName, TableSignature sourceTableSignature) throws IOException {
+        List<SnapshotTable> result = Lists.newArrayList();
+        String tableSnapshotsPath = SnapshotTable.getResourceDir(tableName);
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        result.addAll(store.getAllResources(tableSnapshotsPath, SnapshotTable.class, SnapshotTableSerializer.INFO_SERIALIZER));
+        if (sourceTableSignature != null) {
+            String oldTableSnapshotsPath = SnapshotTable.getOldResourceDir(sourceTableSignature);
+            result.addAll(store.getAllResources(oldTableSnapshotsPath, SnapshotTable.class, SnapshotTableSerializer.INFO_SERIALIZER));
+        }
+        return result;
+    }
+
     public void removeSnapshot(String resourcePath) throws IOException {
         ResourceStore store = getStore();
         store.deleteResource(resourcePath);
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 7bf32b1c8f..6cae22ba10 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -50,6 +50,7 @@
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class SnapshotTable extends RootPersistentEntity implements IReadableTable {
+    public static final String STORAGE_TYPE_METASTORE = "metaStore";
 
     @JsonProperty("tableName")
     private String tableName;
@@ -57,6 +58,8 @@
     private TableSignature signature;
     @JsonProperty("useDictionary")
     private boolean useDictionary;
+    @JsonProperty("last_build_time")
+    private long lastBuildTime;
 
     private ArrayList<int[]> rowIndices;
     private Dictionary<String> dict;
@@ -71,6 +74,18 @@ public SnapshotTable() {
         this.useDictionary = true;
     }
 
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
     public void takeSnapshot(IReadableTable table, TableDesc tableDesc) throws IOException {
         this.signature = table.getSignature();
 
@@ -121,13 +136,17 @@ public String getResourcePath() {
 
     public String getResourceDir() {
         if (Strings.isNullOrEmpty(tableName)) {
-            return getOldResourceDir();
+            return getOldResourceDir(signature);
         } else {
-            return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
+            return getResourceDir(tableName);
         }
     }
 
-    private String getOldResourceDir() {
+    public static String getResourceDir(String tableName) {
+        return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
+    }
+
+    public static String getOldResourceDir(TableSignature signature) {
         return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new File(signature.getPath()).getName();
     }
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
new file mode 100644
index 0000000000..7408e2172e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocksDBLookupBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupBuilder.class);
+
+    static {
+        RocksDB.loadLibrary();
+    }
+    private Options options;
+    private String dbPath;
+    private TableDesc tableDesc;
+    private RocksDBLookupRowEncoder encoder;
+    private int writeBatchSize;
+
+    public RocksDBLookupBuilder(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        this.encoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        this.dbPath = dbPath;
+        this.writeBatchSize = 500;
+        this.options = new Options();
+        options.setCreateIfMissing(true).setWriteBufferSize(8 * SizeUnit.KB).setMaxWriteBufferNumber(3)
+                .setMaxBackgroundCompactions(5).setCompressionType(CompressionType.SNAPPY_COMPRESSION)
+                .setCompactionStyle(CompactionStyle.UNIVERSAL);
+
+    }
+
+    public void build(ILookupTable srcLookupTable) {
+        File dbFolder = new File(dbPath);
+        if (dbFolder.exists()) {
+            logger.info("remove rocksdb folder:{} to rebuild table cache:{}", dbPath, tableDesc.getIdentity());
+            FileUtils.deleteQuietly(dbFolder);
+        } else {
+            logger.info("create new rocksdb folder:{} for table cache:{}", dbPath, tableDesc.getIdentity());
+            dbFolder.mkdirs();
+        }
+        logger.info("start to build lookup table:{} to rocks db:{}", tableDesc.getIdentity(), dbPath);
+        try (RocksDB rocksDB = RocksDB.open(options, dbPath)) {
+            // todo use batch may improve write performance
+            for (String[] row : srcLookupTable) {
+                KV kv = encoder.encode(row);
+                rocksDB.put(kv.getKey(), kv.getValue());
+            }
+        } catch (RocksDBException e) {
+            logger.error("error when put data to rocksDB", e);
+            throw new RuntimeException("error when write data to rocks db", e);
+        }
+
+        logger.info("source table:{} has been written to rocks db:{}", tableDesc.getIdentity(), dbPath);
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
new file mode 100644
index 0000000000..7455a7b7ab
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * encode/decode original table row to rocksDB KV
+ * 
+ */
+public class RocksDBLookupRowEncoder extends AbstractLookupRowEncoder<KV>{
+
+    public RocksDBLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
+        super(tableDesc, keyColumns);
+    }
+
+    public KV encode(String[] row) {
+        String[] keys = getKeyData(row);
+        String[] values = getValueData(row);
+        byte[] encodeKey = encodeStringsWithLenPfx(keys, false);
+        byte[] encodeValue = encodeStringsWithLenPfx(values, true);
+
+        return new KV(encodeKey, encodeValue);
+    }
+
+    public String[] decode(KV kv) {
+        String[] result = new String[columnsNum];
+
+        decodeFromLenPfxBytes(kv.key, keyIndexes, result);
+        decodeFromLenPfxBytes(kv.value, valueIndexes, result);
+
+        return result;
+    }
+
+    public static class KV {
+        private byte[] key;
+        private byte[] value;
+
+        public KV(byte[] key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public byte[] getKey() {
+            return key;
+        }
+
+        public byte[] getValue() {
+            return value;
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
new file mode 100644
index 0000000000..4f0c8166da
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocksDBLookupTable implements ILookupTable {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTable.class);
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    private TableDesc tableDesc;
+    private RocksDB rocksDB;
+    private Options options;
+
+    private RocksDBLookupRowEncoder rowEncoder;
+
+    public RocksDBLookupTable(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        this.options = new Options();
+        this.rowEncoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        try {
+            this.rocksDB = RocksDB.openReadOnly(options, dbPath);
+        } catch (RocksDBException e) {
+            throw new RuntimeException("cannot open rocks db in path:" + dbPath, e);
+        }
+    }
+
+    @Override
+    public String[] getRow(Array<String> key) {
+        byte[] encodeKey = rowEncoder.encodeStringsWithLenPfx(key.data, false);
+        try {
+            byte[] value = rocksDB.get(encodeKey);
+            if (value == null) {
+                return null;
+            }
+            return rowEncoder.decode(new KV(encodeKey, value));
+        } catch (RocksDBException e) {
+            throw new RuntimeException("error when get key from rocksdb", e);
+        }
+    }
+
+    @Override
+    public Iterator<String[]> iterator() {
+        final RocksIterator rocksIterator = rocksDB.newIterator();
+        rocksIterator.seekToFirst();
+
+        return new Iterator<String[]>() {
+            int counter;
+            @Override
+            public boolean hasNext() {
+                boolean valid = rocksIterator.isValid();
+                if (!valid) {
+                    rocksIterator.close();
+                }
+                return valid;
+            }
+
+            @Override
+            public String[] next() {
+                counter ++;
+                if (counter % 100000 == 0) {
+                    logger.info("scanned {} rows from rocksDB", counter);
+                }
+                String[] result = rowEncoder.decode(new KV(rocksIterator.key(), rocksIterator.value()));
+                rocksIterator.next();
+                return result;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("not support operation");
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        options.close();
+        if (rocksDB != null) {
+            rocksDB.close();
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
new file mode 100644
index 0000000000..559c43541c
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Predicate;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
+public class RocksDBLookupTableCache implements IExtLookupTableCache {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTableCache.class);
+
+    private static final String CACHE_TYPE_ROCKSDB = "rocksdb";
+    private static final String STATE_FILE = "STATE";
+    private static final String DB_FILE = "db";
+
+    private String basePath;
+    private long maxCacheSizeInKB;
+    private Cache<String, CachedTableInfo> tablesCache;
+
+    private ConcurrentMap<String, Boolean> inBuildingTables = Maps.newConcurrentMap();
+
+    private ExecutorService cacheBuildExecutor;
+    private ScheduledExecutorService cacheStateCheckExecutor;
+    private CacheStateChecker cacheStateChecker;
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, RocksDBLookupTableCache> SERVICE_CACHE = new ConcurrentHashMap<>();
+
+    public static RocksDBLookupTableCache getInstance(KylinConfig config) {
+        RocksDBLookupTableCache r = SERVICE_CACHE.get(config);
+        if (r == null) {
+            synchronized (RocksDBLookupTableCache.class) {
+                r = SERVICE_CACHE.get(config);
+                if (r == null) {
+                    r = new RocksDBLookupTableCache(config);
+                    SERVICE_CACHE.put(config, r);
+                    if (SERVICE_CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    public static void clearCache() {
+        synchronized (SERVICE_CACHE) {
+            SERVICE_CACHE.clear();
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+
+    private RocksDBLookupTableCache(KylinConfig config) {
+        this.config = config;
+        init();
+    }
+
+    private void init() {
+        this.basePath = getCacheBasePath(config);
+
+        this.maxCacheSizeInKB = (long) (config.getExtTableSnapshotLocalCacheMaxSizeGB() * 1024 * 1024);
+        this.tablesCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, CachedTableInfo>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, CachedTableInfo> notification) {
+                logger.warn(notification.getValue() + " is removed " + "because of " + notification.getCause());
+                notification.getValue().cleanStorage();
+            }
+        }).maximumWeight(maxCacheSizeInKB).weigher(new Weigher<String, CachedTableInfo>() {
+            @Override
+            public int weigh(String key, CachedTableInfo value) {
+                return value.getSizeInKB();
+            }
+        }).build();
+        restoreCacheState();
+        cacheStateChecker = new CacheStateChecker();
+        initExecutors();
+    }
+
+    protected static String getCacheBasePath(KylinConfig config) {
+        String basePath = config.getExtTableSnapshotLocalCachePath();
+        if ((!basePath.startsWith("/")) && (KylinConfig.getKylinHome() != null)) {
+            basePath = KylinConfig.getKylinHome() + File.separator + basePath;
+        }
+        return basePath + File.separator + CACHE_TYPE_ROCKSDB;
+    }
+
+    private void restoreCacheState() {
+        File dbBaseFolder = new File(basePath);
+        if (!dbBaseFolder.exists()) {
+            dbBaseFolder.mkdirs();
+        }
+        Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(dbBaseFolder);
+        for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+            for (File snapshotFolder : tableSnapshotsEntry.getValue()) {
+                initSnapshotState(tableSnapshotsEntry.getKey(), snapshotFolder);
+            }
+        }
+    }
+
+    private Map<String, File[]> getCachedTableSnapshotsFolders(File dbBaseFolder) {
+        Map<String, File[]> result = Maps.newHashMap();
+        File[] tableFolders = dbBaseFolder.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        if (tableFolders == null) {
+            return result;
+        }
+
+        for (File tableFolder : tableFolders) {
+            String tableName = tableFolder.getName();
+            File[] snapshotFolders = tableFolder.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File snapshotFile) {
+                    return snapshotFile.isDirectory();
+                }
+            });
+            result.put(tableName, snapshotFolders);
+        }
+        return result;
+    }
+
+    private void initSnapshotState(String tableName, File snapshotCacheFolder) {
+        String snapshotID = snapshotCacheFolder.getName();
+        File stateFile = getCacheStateFile(snapshotCacheFolder.getAbsolutePath());
+        if (stateFile.exists()) {
+            try {
+                String stateStr = Files.toString(stateFile, Charsets.UTF_8);
+                String resourcePath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID);
+                if (CacheState.AVAILABLE.name().equals(stateStr)) {
+                    tablesCache.put(resourcePath, new CachedTableInfo(snapshotCacheFolder.getAbsolutePath()));
+                }
+            } catch (IOException e) {
+                logger.error("error to read state file:" + stateFile.getAbsolutePath());
+            }
+        }
+    }
+
+    private void initExecutors() {
+        this.cacheBuildExecutor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("lookup-cache-build-thread"));
+        this.cacheStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(
+                "lookup-cache-state-checker"));
+        cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60, TimeUnit.SECONDS); // check every 10 minutes
+    }
+
+    @Override
+    public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            logger.info("cache is in building for snapshot:" + resourcePath);
+            return null;
+        }
+        CachedTableInfo cachedTableInfo = tablesCache.getIfPresent(resourcePath);
+        if (cachedTableInfo == null) {
+            if (buildIfNotExist) {
+                buildSnapshotCache(tableDesc, extTableSnapshotInfo, getSourceLookupTable(tableDesc, extTableSnapshotInfo));
+            }
+            logger.info("no available cache ready for the table snapshot:" + extTableSnapshotInfo.getResourcePath());
+            return null;
+        }
+
+        String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        return new RocksDBLookupTable(tableDesc, keyColumns, dbPath);
+    }
+
+    private ILookupTable getSourceLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        return LookupProviderFactory.getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo);
+    }
+
+    @Override
+    public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, final ILookupTable sourceTable) {
+        if (extTableSnapshotInfo.getSignature().getSize() / 1024 > maxCacheSizeInKB * 2 / 3) {
+            logger.warn("the size is to large to build to cache for snapshot:{}, size:{}, skip cache building",
+                    extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo.getSignature().getSize());
+            return;
+        }
+        final String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String snapshotResPath = extTableSnapshotInfo.getResourcePath();
+
+        if (inBuildingTables.containsKey(snapshotResPath)) {
+            logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath);
+            return;
+        }
+        if (inBuildingTables.putIfAbsent(snapshotResPath, true) == null) {
+            cacheBuildExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, keyColumns, dbPath);
+                        builder.build(sourceTable);
+                        saveSnapshotCacheState(extTableSnapshotInfo, cachePath);
+                    } catch (Exception e) {
+                        logger.error("error when build snapshot cache", e);
+                    } finally {
+                        inBuildingTables.remove(snapshotResPath);
+                    }
+                }
+            });
+
+        } else {
+            logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath);
+        }
+    }
+
+    @Override
+    public void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        tablesCache.invalidate(extTableSnapshotInfo.getResourcePath());
+    }
+
+    @Override
+    public CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            return CacheState.IN_BUILDING;
+        }
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        if (!stateFile.exists()) {
+            return CacheState.NONE;
+        }
+        try {
+            String stateString = Files.toString(stateFile, Charsets.UTF_8);
+            return CacheState.valueOf(stateString);
+        } catch (IOException e) {
+            logger.error("error when read state file", e);
+        }
+        return CacheState.NONE;
+    }
+
+    public long getTotalCacheSize() {
+        return FileUtils.sizeOfDirectory(new File(getCacheBasePath(config)));
+    }
+
+    public void checkCacheState() {
+        cacheStateChecker.run();
+    }
+
+    private void saveSnapshotCacheState(ExtTableSnapshotInfo extTableSnapshotInfo, String cachePath) {
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        try {
+            Files.write(CacheState.AVAILABLE.name(), stateFile, Charsets.UTF_8);
+            tablesCache.put(extTableSnapshotInfo.getResourcePath(), new CachedTableInfo(cachePath));
+        } catch (IOException e) {
+            throw new RuntimeException("error when write cache state for snapshot:"
+                    + extTableSnapshotInfo.getResourcePath());
+        }
+    }
+
+    private File getCacheStateFile(String snapshotCacheFolder) {
+        String stateFilePath = snapshotCacheFolder + File.separator + STATE_FILE;
+        return new File(stateFilePath);
+    }
+
+    protected String getSnapshotStorePath(String tableName, String snapshotID) {
+        return getSnapshotCachePath(tableName, snapshotID) + File.separator + DB_FILE;
+    }
+
+    protected String getSnapshotCachePath(String tableName, String snapshotID) {
+        return basePath + File.separator + tableName + File.separator + snapshotID;
+    }
+
+    private class CacheStateChecker implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                String cacheBasePath = getCacheBasePath(config);
+                logger.info("check snapshot local cache state, local path:{}", cacheBasePath);
+                File baseFolder = new File(cacheBasePath);
+                if (!baseFolder.exists()) {
+                    return;
+                }
+                Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(baseFolder);
+                List<Pair<String, File>> allCachedSnapshots = Lists.newArrayList();
+                for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+                    String tableName = tableSnapshotsEntry.getKey();
+                    for (File file : tableSnapshotsEntry.getValue()) {
+                        String snapshotID = file.getName();
+                        allCachedSnapshots
+                                .add(new Pair<>(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID), file));
+                    }
+                }
+
+                final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+                List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList(FluentIterable.from(
+                        allCachedSnapshots).filter(new Predicate<Pair<String, File>>() {
+                    @Override
+                    public boolean apply(@Nullable Pair<String, File> input) {
+                        return !activeSnapshotSet.contains(input.getFirst());
+                    }
+                }));
+                for (Pair<String, File> toRemovedCachedSnapshot : toRemovedCachedSnapshots) {
+                    File snapshotCacheFolder = toRemovedCachedSnapshot.getSecond();
+                    logger.info("removed cache file:{}, it is not referred by any cube",
+                            snapshotCacheFolder.getAbsolutePath());
+                    try {
+                        FileUtils.deleteDirectory(snapshotCacheFolder);
+                    } catch (IOException e) {
+                        logger.error("fail to remove folder:" + snapshotCacheFolder.getAbsolutePath(), e);
+                    }
+                    tablesCache.invalidate(toRemovedCachedSnapshot.getFirst());
+                }
+            } catch (Exception e) {
+                logger.error("error happens when check cache state", e);
+            }
+
+        }
+    }
+
+    private static class CachedTableInfo {
+        private String cachePath;
+
+        private long dbSize;
+
+        public CachedTableInfo(String cachePath) {
+            this.cachePath = cachePath;
+            this.dbSize = FileUtils.sizeOfDirectory(new File(cachePath));
+        }
+
+        public int getSizeInKB() {
+            return (int) (dbSize / 1024);
+        }
+
+        public void cleanStorage() {
+            logger.info("clean cache storage for path:" + cachePath);
+            try {
+                FileUtils.deleteDirectory(new File(cachePath));
+            } catch (IOException e) {
+                logger.error("file delete fail:" + cachePath, e);
+            }
+        }
+    }
+
+    /**
+     * A simple named thread factory.
+     */
+    private static class NamedThreadFactory implements ThreadFactory {
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        public NamedThreadFactory(String threadPrefix) {
+            final SecurityManager s = System.getSecurityManager();
+            this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+            this.namePrefix = threadPrefix + "-";
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+            t.setDaemon(true);
+            return t;
+        }
+    }
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
new file mode 100644
index 0000000000..ff5fdff746
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBLookupRowEncoderTest extends LocalFileMetadataTestCase {
+    private TableDesc tableDesc;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEnDeCode() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithNullValue() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", null };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertNull(decodeRow[3]);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithMultiKeys() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY",
+                "NAME" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertArrayEquals(row, decodeRow);
+    }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
new file mode 100644
index 0000000000..fb78591861
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupProvider;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/**
+ */
+public class RocksDBLookupTableCacheTest extends LocalFileMetadataTestCase {
+    private static final String TABLE_COUNTRY = "DEFAULT.TEST_COUNTRY";
+    private static final String MOCK_EXT_LOOKUP = "mock";
+    private TableDesc tableDesc;
+    private KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        kylinConfig = getTestConfig();
+        tableDesc = metadataManager.getTableDesc(TABLE_COUNTRY, "default");
+        cleanCache();
+        LookupProviderFactory.registerLookupProvider(MOCK_EXT_LOOKUP, MockedLookupProvider.class.getName());
+    }
+
+    private void cleanCache() {
+        FileUtils.deleteQuietly(new File(kylinConfig.getExtTableSnapshotLocalCachePath()));
+    }
+
+    @After
+    public void tearDown() {
+        cleanupTestMetadata();
+        cleanCache();
+    }
+
+    @Test
+    public void testBuildTableCache() throws Exception {
+        String snapshotID = UUID.randomUUID().toString();
+        ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(snapshotID, 100000);
+        assertEquals(CacheState.AVAILABLE, RocksDBLookupTableCache.getInstance(kylinConfig).getCacheState(snapshotInfo));
+    }
+
+    private ExtTableSnapshotInfo buildSnapshotCache(String snapshotID, int rowCnt) throws Exception {
+        ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+        snapshotInfo.setTableName(TABLE_COUNTRY);
+        snapshotInfo.setUuid(snapshotID);
+        snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+        snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+        snapshotInfo.setRowCnt(rowCnt);
+        snapshotInfo.setSignature(new TableSignature("/test", rowCnt, System.currentTimeMillis()));
+
+        ExtTableSnapshotInfoManager.getInstance(kylinConfig).save(snapshotInfo);
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        cache.buildSnapshotCache(tableDesc, snapshotInfo, getLookupTableWithRandomData(rowCnt));
+
+        while (cache.getCacheState(snapshotInfo) == CacheState.IN_BUILDING) {
+            Thread.sleep(500);
+        }
+        return snapshotInfo;
+    }
+
+    @Test
+    public void testRestoreCacheFromFiles() throws Exception {
+        String snapshotID = UUID.randomUUID().toString();
+        String snapshotCacheBasePath = RocksDBLookupTableCache.getCacheBasePath(kylinConfig) + File.separator
+                + TABLE_COUNTRY + File.separator + snapshotID;
+        String dbPath = snapshotCacheBasePath + File.separator + "db";
+        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, dbPath);
+        builder.build(getLookupTableWithRandomData(10000));
+        String stateFilePath = snapshotCacheBasePath + File.separator + "STATE";
+        Files.write(CacheState.AVAILABLE.name(), new File(stateFilePath), Charsets.UTF_8);
+
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+        snapshotInfo.setTableName(TABLE_COUNTRY);
+        snapshotInfo.setUuid(snapshotID);
+        snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+        snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+        ILookupTable lookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        int rowCnt = 0;
+        for (String[] strings : lookupTable) {
+            rowCnt++;
+        }
+        lookupTable.close();
+        assertEquals(10000, rowCnt);
+    }
+
+    @Test
+    public void testEvict() throws Exception {
+        kylinConfig.setProperty("kylin.snapshot.ext.local.cache.max-size-gb", "0.005");
+        int snapshotNum = 10;
+        int snapshotRowCnt = 100000;
+        for (int i = 0; i < snapshotNum; i++) {
+            buildSnapshotCache(UUID.randomUUID().toString(), snapshotRowCnt);
+        }
+        assertTrue(RocksDBLookupTableCache.getInstance(kylinConfig).getTotalCacheSize() < 0.006 * 1024 * 1024 * 1024);
+    }
+
+    @Test
+    public void testCheckCacheState() throws Exception {
+        ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(UUID.randomUUID().toString(), 1000);
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        ILookupTable cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        assertNotNull(cachedLookupTable);
+        cachedLookupTable.close();
+
+        ExtTableSnapshotInfoManager.getInstance(kylinConfig).removeSnapshot(snapshotInfo.getTableName(), snapshotInfo.getId());
+        cache.checkCacheState();
+        String cacheLocalPath = cache.getSnapshotCachePath(snapshotInfo.getTableName(), snapshotInfo.getId());
+        assertFalse(new File(cacheLocalPath).exists());
+        cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        assertNull(cachedLookupTable);
+    }
+
+    public static class MockedLookupProvider implements IExtLookupProvider {
+
+        @Override
+        public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+            return getLookupTableWithRandomData(extTableSnapshot.getRowCnt());
+        }
+
+        @Override
+        public IExtLookupTableCache getLocalCache() {
+            return null;
+        }
+
+        @Override
+        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+            return null;
+        }
+    }
+
+    private static ILookupTable getLookupTableWithRandomData(final long rowNum) {
+        return new ILookupTable() {
+            Random random = new Random();
+
+            @Override
+            public String[] getRow(Array<String> key) {
+                return new String[0];
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public Iterator<String[]> iterator() {
+                return new Iterator<String[]>() {
+                    private int iterCnt = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return iterCnt < rowNum;
+                    }
+
+                    @Override
+                    public String[] next() {
+                        iterCnt++;
+                        return genRandomRow(iterCnt);
+                    }
+
+                    @Override
+                    public void remove() {
+
+                    }
+                };
+            }
+
+            private String[] genRandomRow(int id) {
+                return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()),
+                        String.valueOf(random.nextDouble()), "Andorra" + id };
+            }
+        };
+    }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
new file mode 100644
index 0000000000..d5bbd40dda
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class RocksDBLookupTableTest extends LocalFileMetadataTestCase {
+
+    private TableDesc tableDesc;
+    private RocksDBLookupTable lookupTable;
+    private Random random;
+    private int sourceRowNum;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        this.random = new Random();
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+        sourceRowNum = 10000;
+        genTestData();
+        lookupTable = new RocksDBLookupTable(tableDesc, new String[] { "COUNTRY" }, "lookup_cache/TEST_COUNTRY");
+    }
+
+    private void genTestData() {
+        removeTestDataIfExists();
+        File file = new File("lookup_cache/TEST_COUNTRY");
+        file.mkdirs();
+        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" },
+                "lookup_cache/TEST_COUNTRY");
+        long start = System.currentTimeMillis();
+        builder.build(getLookupTableWithRandomData(sourceRowNum));
+        long take = System.currentTimeMillis() - start;
+        System.out.println("take:" + take + " ms to complete build");
+    }
+
+    private void removeTestDataIfExists() {
+        FileUtils.deleteQuietly(new File("lookup_cache"));
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cleanupTestMetadata();
+        removeTestDataIfExists();
+        lookupTable.close();
+    }
+
+    @Test
+    public void testIterator() throws Exception {
+        System.out.println("start iterator table");
+        long start = System.currentTimeMillis();
+        Iterator<String[]> iter = lookupTable.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("scan " + count + " rows, take " + take + " ms");
+
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        int getNum = 3000;
+        List<String[]> keys = Lists.newArrayList();
+        for (int i = 0; i < getNum; i++) {
+            String[] keyi = new String[] { "keyyyyy" + random.nextInt(sourceRowNum) };
+            keys.add(keyi);
+        }
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < getNum; i++) {
+            String[] row = lookupTable.getRow(new Array<>(keys.get(i)));
+            if (row == null) {
+                System.out.println("null value for key:" + Arrays.toString(keys.get(i)));
+            }
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("muliti get " + getNum + " rows, take " + take + " ms");
+    }
+
+    private ILookupTable getLookupTableWithRandomData(final int rowNum) {
+        return new ILookupTable() {
+            @Override
+            public String[] getRow(Array<String> key) {
+                return new String[0];
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public Iterator<String[]> iterator() {
+                return new Iterator<String[]>() {
+                    private int iterCnt = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return iterCnt < rowNum;
+                    }
+
+                    @Override
+                    public String[] next() {
+                        iterCnt++;
+                        return genRandomRow(iterCnt);
+                    }
+
+                    @Override
+                    public void remove() {
+
+                    }
+                };
+            }
+        };
+    }
+
+    private String[] genRandomRow(int id) {
+        return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()), String.valueOf(random.nextDouble()),
+                "Andorra" + id };
+    }
+
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b9a3651837..7b3c5a3736 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -62,4 +62,9 @@ private ExecutableConstants() {
     public static final String STEP_NAME_GARBAGE_COLLECTION_HBASE = "Garbage Collection on HBase";
     public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
     public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE = "Convert Lookup Table to HFile";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD = "Load HFile to HBase Table";
+    public static final String STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE = "Update Lookup Snapshot Cache to Query Engine";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE = "Take Snapshot to Metadata Store";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info";
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index dbe11c2ea3..1a534e110b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -399,10 +399,14 @@ public static long getExtraInfoAsLong(Output output, String key, long defaultVal
         }
     }
 
-    protected final void addExtraInfo(String key, String value) {
+    public final void addExtraInfo(String key, String value) {
         getManager().addJobInfo(getId(), key, value);
     }
 
+    public final String getExtraInfo(String key) {
+        return getExtraInfo().get(key);
+    }
+
     protected final Map<String, String> getExtraInfo() {
         return getOutput().getExtra();
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 404db54110..2297be7855 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.job.execution;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -203,4 +205,28 @@ private boolean retryFetchTaskStatus(Executable task) {
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
     }
+
+    public String findExtraInfo(String key, String dft) {
+        return findExtraInfo(key, dft, false);
+    }
+
+    public String findExtraInfoBackward(String key, String dft) {
+        return findExtraInfo(key, dft, true);
+    }
+
+    private String findExtraInfo(String key, String dft, boolean backward) {
+        ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
+
+        if (backward) {
+            Collections.reverse(tasks);
+        }
+
+        for (AbstractExecutable child : tasks) {
+            Output output = getManager().getOutput(child.getId());
+            String value = output.getExtra().get(key);
+            if (value != null)
+                return value;
+        }
+        return dft;
+    }
 }
diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
index 09f5a7a93c..ddeeafa565 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
@@ -36,7 +36,7 @@ public SelfStopExecutable() {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         doingWork = true;
         try {
-            for (int i = 0; i < 20; i++) {
+            for (int i = 0; i < 60; i++) {
                 sleepOneSecond();
                 
                 if (isDiscarded())
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
index 2e4f4008e4..ad311d99be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
@@ -22,6 +22,8 @@
  */
 public class IRealizationConstants {
 
+    public final static String LookupHbaseStorageLocationPrefix = "LOOKUP_";
+
     /**
      * For each cube htable, we leverage htable's metadata to keep track of
      * which kylin server(represented by its kylin_metadata prefix) owns this htable
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 8af9e3f798..05fc90f2df 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -19,30 +19,26 @@
 package org.apache.kylin.storage.gtrecord;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +63,7 @@
 
     public final List<IAdvMeasureFiller> advMeasureFillers;
     public final List<Integer> advMeasureIndexInGTValues;
+    private List<ILookupTable> usedLookupTables;
 
     public final int nSelectedDims;
 
@@ -86,6 +83,7 @@ public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
 
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+        usedLookupTables = Lists.newArrayList();
 
         ////////////
 
@@ -178,6 +176,17 @@ public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        for (ILookupTable usedLookupTable : usedLookupTables) {
+            try {
+                usedLookupTable.close();
+            } catch (Exception e) {
+                logger.error("error when close lookup table:" + usedLookupTable);
+            }
+        }
+    }
+
     protected interface IDerivedColumnFiller {
         public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
     }
@@ -204,7 +213,7 @@ protected IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, fina
         switch (deriveInfo.type) {
         case LOOKUP:
             return new IDerivedColumnFiller() {
-                LookupStringTable lookupTable = getLookupTable(cubeSeg, deriveInfo.join);
+                ILookupTable lookupTable = getAndAddLookupTable(cubeSeg, deriveInfo.join);
                 int[] derivedColIdx = initDerivedColIdx();
                 Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
 
@@ -263,40 +272,10 @@ public int indexOnTheGTValues(TblColRef col) {
         return -1;
     }
 
-    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        long ts = System.currentTimeMillis();
-
-        TableMetadataManager metaMgr = TableMetadataManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-        SnapshotManager snapshotMgr = SnapshotManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-
-        String tableName = join.getPKSide().getTableIdentity();
-        String[] pkCols = join.getPrimaryKey();
-        String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
-        if (snapshotResPath == null)
-            throw new IllegalStateException("No snaphot for table '" + tableName + "' found on cube segment" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
-
-        try {
-            SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotResPath);
-            TableDesc tableDesc = metaMgr.getTableDesc(tableName, cubeSegment.getProject());
-            EnhancedStringLookupTable enhancedStringLookupTable = new EnhancedStringLookupTable(tableDesc, pkCols, snapshot);
-            logger.info("Time to get lookup up table for {} is {} ", join.getPKSide().getTableName(), (System.currentTimeMillis() - ts));
-            return enhancedStringLookupTable;
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
-        }
-    }
-
-    public static class EnhancedStringLookupTable extends LookupStringTable {
-
-        public EnhancedStringLookupTable(TableDesc tableDesc, String[] keyColumns, IReadableTable table) throws IOException {
-            super(tableDesc, keyColumns, table);
-        }
-
-        @Override
-        protected void init() throws IOException {
-            this.data = new HashMap<>();
-            super.init();
-        }
+    public ILookupTable getAndAddLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        ILookupTable lookupTable = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getLookupTable(cubeSegment, join);
+        usedLookupTables.add(lookupTable);
+        return lookupTable;
     }
 
     private static String toString(Object o) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 2f69b76900..5e11e91ce2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -36,7 +37,7 @@
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
@@ -363,9 +364,16 @@ private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblC
             return compf;
 
         DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        LookupStringTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
+        ILookupTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
                 : getLookupStringTableForDerived(derived, hostInfo);
         Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+        try {
+            if (lookup != null) {
+                lookup.close();
+            }
+        } catch (IOException e) {
+            logger.error("error when close lookup table.", e);
+        }
         TupleFilter translatedFilter = translated.getFirst();
         boolean loosened = translated.getSecond();
         if (loosened) {
@@ -375,7 +383,7 @@ private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblC
     }
 
     @SuppressWarnings("unchecked")
-    protected LookupStringTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
+    protected ILookupTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
         CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
         CubeSegment seg = cubeInstance.getLatestReadySegment();
         return cubeMgr.getLookupTable(seg, hostInfo.join);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index dd48e4d1ab..72e6848fe4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -18,12 +18,13 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.io.Closeable;
 import java.util.List;
 
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.tuple.Tuple;
 
-public interface ITupleConverter {
+public interface ITupleConverter extends Closeable {
 
     public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
 }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 3bac5ec5a1..a6e0bc1348 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -202,6 +202,7 @@ public void close() {
     protected void close(CubeSegmentScanner scanner) {
         try {
             scanner.close();
+            cubeTupleConverter.close();
         } catch (IOException e) {
             logger.error("Exception when close CubeScanner", e);
         }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index f4150fe6ae..7fa426fd2a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -27,7 +27,7 @@
 import org.apache.kylin.cube.kv.RowKeyColumnOrder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -50,7 +50,7 @@
 
     private static final Logger logger = LoggerFactory.getLogger(DerivedFilterTranslator.class);
 
-    public static Pair<TupleFilter, Boolean> translate(LookupStringTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
+    public static Pair<TupleFilter, Boolean> translate(ILookupTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
 
         TblColRef derivedCol = compf.getColumn();
         TblColRef[] hostCols = hostInfo.columns;
@@ -76,7 +76,7 @@
 
         Set<Array<String>> satisfyingHostRecords = Sets.newHashSet();
         SingleColumnTuple tuple = new SingleColumnTuple(derivedCol);
-        for (String[] row : lookup.getAllRows()) {
+        for (String[] row : lookup) {
             tuple.value = row[di];
             if (compf.evaluate(tuple, FilterCodeSystemFactory.getFilterCodeSystem(derivedCol.getColumnDesc().getType()))) {
                 collect(row, pi, satisfyingHostRecords);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index faac724443..a840bf7113 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,6 +22,7 @@
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -69,6 +70,10 @@ public CubingJob build() {
 
         result.addTask(createBuildDictionaryStep(jobId));
         result.addTask(createSaveStatisticsStep(jobId));
+
+        // add materialize lookup tables if needed
+        addMaterializeLookupTableSteps(result);
+
         outputSide.addStepPhase2_BuildDictionary(result);
 
         // Phase 3: Build Cube
@@ -97,6 +102,18 @@ private boolean isEnableUHCDictStep() {
         return true;
     }
 
+    private void addMaterializeLookupTableSteps(final CubingJob result) {
+        CubeDesc cubeDesc = seg.getCubeDesc();
+        List<String> allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes();
+        if (allSnapshotTypes.isEmpty()) {
+            return;
+        }
+        for (String snapshotType : allSnapshotTypes) {
+            ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType);
+            materializer.materializeLookupTablesForCube(result, seg.getCubeInstance());
+        }
+    }
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
         final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index c9ed3594fa..7f7191d047 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -19,8 +19,6 @@
 package org.apache.kylin.engine.mr;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -350,28 +348,4 @@ public long findCubeSizeBytes() {
         return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
     }
 
-    public String findExtraInfo(String key, String dft) {
-        return findExtraInfo(key, dft, false);
-    }
-
-    public String findExtraInfoBackward(String key, String dft) {
-        return findExtraInfo(key, dft, true);
-    }
-
-    private String findExtraInfo(String key, String dft, boolean backward) {
-        ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
-
-        if (backward) {
-            Collections.reverse(tasks);
-        }
-
-        for (AbstractExecutable child : tasks) {
-            Output output = getManager().getOutput(child.getId());
-            String value = output.getExtra().get(key);
-            if (value != null)
-                return value;
-        }
-        return dft;
-    }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
new file mode 100644
index 0000000000..f103da299e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface ILookupMaterializer {
+    void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName);
+
+    void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube);
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
new file mode 100644
index 0000000000..6865ce3be7
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+
+public class LookupSnapshotBuildJob extends DefaultChainedExecutable {
+
+    public static final Integer DEFAULT_PRIORITY = 30;
+
+    private static final String DEPLOY_ENV_NAME = "envName";
+    private static final String PROJECT_INSTANCE_NAME = "projectName";
+    private static final String CUBE_NAME = "cubeName";
+
+    private static final String JOB_TYPE = "Lookup ";
+
+    public static LookupSnapshotBuildJob createJob(CubeInstance cube, String tableName, String submitter,
+            KylinConfig kylinConfig) {
+        return initJob(cube, tableName, submitter, kylinConfig);
+    }
+
+    private static LookupSnapshotBuildJob initJob(CubeInstance cube, String tableName, String submitter,
+            KylinConfig kylinConfig) {
+        List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(),
+                cube.getName());
+        if (projList == null || projList.size() == 0) {
+            throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!");
+        } else if (projList.size() >= 2) {
+            String msg = "Find more than one project containing the cube " + cube.getName()
+                    + ". It does't meet the uniqueness requirement!!! ";
+            throw new RuntimeException(msg);
+        }
+
+        LookupSnapshotBuildJob result = new LookupSnapshotBuildJob();
+        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+        format.setTimeZone(TimeZone.getTimeZone(kylinConfig.getTimeZone()));
+        result.setDeployEnvName(kylinConfig.getDeployEnv());
+        result.setProjectName(projList.get(0).getName());
+        CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
+        result.setName(JOB_TYPE + " CUBE - " + cube.getName() + " - " + " TABLE - " + tableName + " - "
+                + format.format(new Date(System.currentTimeMillis())));
+        result.setSubmitter(submitter);
+        result.setNotifyList(cube.getDescriptor().getNotifyList());
+        return result;
+    }
+
+    protected void setDeployEnvName(String name) {
+        setParam(DEPLOY_ENV_NAME, name);
+    }
+
+    public String getDeployEnvName() {
+        return getParam(DEPLOY_ENV_NAME);
+    }
+
+    public String getProjectName() {
+        return getParam(PROJECT_INSTANCE_NAME);
+    }
+
+    public void setProjectName(String name) {
+        setParam(PROJECT_INSTANCE_NAME, name);
+    }
+
+    public String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    @Override
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY;
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java
new file mode 100644
index 0000000000..e7888a5000
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.engine.mr.steps.lookup.LookupSnapshotToMetaStoreStep;
+import org.apache.kylin.engine.mr.steps.lookup.UpdateCubeAfterSnapshotStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupSnapshotJobBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(LookupSnapshotJobBuilder.class);
+    private CubeInstance cube;
+    private String lookupTable;
+    private List<String> segments;
+    private String submitter;
+    private KylinConfig kylinConfig;
+
+    public LookupSnapshotJobBuilder(CubeInstance cube, String lookupTable, List<String> segments, String submitter) {
+        this.cube = cube;
+        this.lookupTable = lookupTable;
+        this.segments = segments;
+        this.submitter = submitter;
+        this.kylinConfig = cube.getConfig();
+    }
+
+    public LookupSnapshotBuildJob build() {
+        logger.info("new job to build lookup snapshot:{} for cube:{}", lookupTable, cube.getName());
+        LookupSnapshotBuildJob result = LookupSnapshotBuildJob.createJob(cube, lookupTable, submitter, kylinConfig);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(lookupTable);
+        if (snapshotTableDesc != null && snapshotTableDesc.isExtSnapshotTable()) {
+            addExtMaterializeLookupTableSteps(result, snapshotTableDesc);
+        } else {
+            addInMetaStoreMaterializeLookupTableSteps(result);
+        }
+        return result;
+    }
+
+    private void addExtMaterializeLookupTableSteps(final LookupSnapshotBuildJob result,
+            SnapshotTableDesc snapshotTableDesc) {
+        ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotTableDesc.getStorageType());
+        materializer.materializeLookupTable(result, cube, lookupTable);
+
+        UpdateCubeAfterSnapshotStep afterSnapshotStep = new UpdateCubeAfterSnapshotStep();
+        afterSnapshotStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE);
+        LookupExecutableUtil.setCubeName(cube.getName(), afterSnapshotStep.getParams());
+        LookupExecutableUtil.setLookupTableName(lookupTable, afterSnapshotStep.getParams());
+        LookupExecutableUtil.setSegments(segments, afterSnapshotStep.getParams());
+        LookupExecutableUtil.setJobID(result.getId(), afterSnapshotStep.getParams());
+        result.addTask(afterSnapshotStep);
+    }
+
+    private void addInMetaStoreMaterializeLookupTableSteps(final LookupSnapshotBuildJob result) {
+        LookupSnapshotToMetaStoreStep lookupSnapshotToMetaStoreStep = new LookupSnapshotToMetaStoreStep();
+        lookupSnapshotToMetaStoreStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE);
+        LookupExecutableUtil.setCubeName(cube.getName(), lookupSnapshotToMetaStoreStep.getParams());
+        LookupExecutableUtil.setLookupTableName(lookupTable, lookupSnapshotToMetaStoreStep.getParams());
+        LookupExecutableUtil.setSegments(segments, lookupSnapshotToMetaStoreStep.getParams());
+        result.addTask(lookupSnapshotToMetaStoreStep);
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 3a9d0ede94..9eae3839d2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
@@ -79,6 +80,10 @@ public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg)
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
     }
+
+    public static ILookupMaterializer getExtLookupMaterializer(String lookupStorageType) {
+        return LookupProviderFactory.createEngineAdapter(lookupStorageType, ILookupMaterializer.class);
+    }
     
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 7b253549ac..fa3c22e19d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -120,6 +120,10 @@
     protected static final Option OPTION_NEED_UPDATE_BASE_CUBOID_SHARD = OptionBuilder
             .withArgName(BatchConstants.ARG_UPDATE_SHARD).hasArg().isRequired(false)
             .withDescription("If need to update base cuboid shard").create(BatchConstants.ARG_UPDATE_SHARD);
+    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_TABLE_NAME).hasArg().isRequired(true).withDescription("Table name. For exmaple, default.table1").create(BatchConstants.ARG_TABLE_NAME);
+    protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
+            .isRequired(true).withDescription("Lookup table snapshotID")
+            .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 64163adb8a..36f2566f8b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -72,6 +72,8 @@
 
     String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
 
+    String CFG_SHARD_NUM = "shard.num";
+
     /**
      * command line ARGuments
      */
@@ -95,6 +97,8 @@
     String ARG_LEVEL = "level";
     String ARG_CONF = "conf";
     String ARG_DICT_PATH = "dictPath";
+    String ARG_TABLE_NAME = "tableName";
+    String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
 
     /**
      * logger and counter
@@ -106,4 +110,11 @@
      * dictionaries builder class
      */
     String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    /**
+     * the prefix of ext lookup table snapshot resource path that stored in the build context
+     */
+    String LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX = "lookup.ext.snapshot.res.path.";
+
+    String LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX = "lookup.ext.snapshot.src.record.cnt.";
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index d085a77acf..ad1245c1f0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -21,6 +21,7 @@
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,6 +31,7 @@
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -71,6 +73,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
         segment.setInputRecordsSize(sourceSizeBytes);
 
         try {
+            saveExtSnapshotIfNeeded(cubeManager, cubingJob, cube, segment);
             if (segment.isOffsetCube()) {
                 updateTimeRange(segment);
             }
@@ -83,6 +86,26 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio
         }
     }
 
+    private void saveExtSnapshotIfNeeded(CubeManager cubeManager, CubingJob cubingJob, CubeInstance cube, CubeSegment segment) throws IOException {
+        List<SnapshotTableDesc> snapshotTableDescList = cube.getDescriptor().getSnapshotTableDescList();
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) {
+            String tableName = snapshotTableDesc.getTableName();
+            if (snapshotTableDesc.isExtSnapshotTable()) {
+                String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + tableName;
+                String newSnapshotResPath = cubingJob.getExtraInfo(contextKey);
+                if (newSnapshotResPath == null) {
+                    continue;
+                }
+
+                if (snapshotTableDesc.isGlobal()) {
+                    cubeManager.updateCubeLookupSnapshot(cube, tableName, newSnapshotResPath);
+                } else {
+                    segment.putSnapshotResPath(tableName, newSnapshotResPath);
+                }
+            }
+        }
+    }
+
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java
new file mode 100644
index 0000000000..bc2aa1d038
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+
+import com.google.common.collect.Lists;
+
+public class LookupExecutableUtil {
+
+    public static final String CUBE_NAME = "cubeName";
+    public static final String LOOKUP_TABLE_NAME = "lookupTableName";
+    public static final String PROJECT_NAME = "projectName";
+    public static final String LOOKUP_SNAPSHOT_ID = "snapshotID";
+    public static final String SEGMENT_IDS = "segments";
+    public static final String JOB_ID = "jobID";
+
+
+    public static void setCubeName(String cubeName, Map<String, String> params) {
+        params.put(CUBE_NAME, cubeName);
+    }
+
+    public static String getCubeName(Map<String, String> params) {
+        return params.get(CUBE_NAME);
+    }
+
+    public static void setLookupTableName(String lookupTableName, Map<String, String> params) {
+        params.put(LOOKUP_TABLE_NAME, lookupTableName);
+    }
+
+    public static String getLookupTableName(Map<String, String> params) {
+        return params.get(LOOKUP_TABLE_NAME);
+    }
+
+    public static void setProjectName(String projectName, Map<String, String> params) {
+        params.put(PROJECT_NAME, projectName);
+    }
+
+    public static String getProjectName(Map<String, String> params) {
+        return params.get(PROJECT_NAME);
+    }
+
+    public static void setLookupSnapshotID(String snapshotID, Map<String, String> params) {
+        params.put(LOOKUP_SNAPSHOT_ID, snapshotID);
+    }
+
+    public static String getLookupSnapshotID(Map<String, String> params) {
+        return params.get(LOOKUP_SNAPSHOT_ID);
+    }
+
+    public static List<String> getSegments(Map<String, String> params) {
+        final String ids = params.get(SEGMENT_IDS);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    public static void setSegments(List<String> segments, Map<String, String> params) {
+        params.put(SEGMENT_IDS, StringUtils.join(segments, ","));
+    }
+
+
+    public static String getJobID(Map<String, String> params) {
+        return params.get(JOB_ID);
+    }
+
+    public static void setJobID(String jobID, Map<String, String> params) {
+        params.put(JOB_ID, jobID);
+    }
+    
+    public static void updateSnapshotPathToCube(CubeManager cubeManager, CubeInstance cube, String lookupTableName,
+            String snapshotPath) throws IOException {
+        cubeManager.updateCubeLookupSnapshot(cube, lookupTableName, snapshotPath);
+        cube.putSnapshotResPath(lookupTableName, snapshotPath);
+    }
+
+    public static void updateSnapshotPathToSegments(CubeManager cubeManager, CubeInstance cube, List<String> segmentIDs, String lookupTableName, String snapshotPath) throws IOException {
+        CubeInstance cubeCopy = cube.latestCopyForWrite();
+        if (segmentIDs.size() > 0) {
+            CubeSegment[] segments = new CubeSegment[segmentIDs.size()];
+            for (int i = 0; i < segments.length; i++) {
+                CubeSegment segment = cubeCopy.getSegmentById(segmentIDs.get(i));
+                if (segment == null) {
+                    throw new IllegalStateException("the segment not exist in cube:" + segmentIDs.get(i));
+                }
+                segment.putSnapshotResPath(lookupTableName, snapshotPath);
+                segments[i] = segment;
+            }
+            CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+            cubeUpdate.setToUpdateSegs(segments);
+            cubeManager.updateCube(cubeUpdate);
+
+            // Update the input cubeSeg after the resource store updated
+            for (int i = 0; i < segments.length; i++) {
+                CubeSegment segment = cube.getSegmentById(segmentIDs.get(i));
+                segment.putSnapshotResPath(lookupTableName, snapshotPath);
+            }
+        }
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
new file mode 100644
index 0000000000..783ded065b
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Save lookup snapshot information to cube metadata
+ */
+public class LookupSnapshotToMetaStoreStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(LookupSnapshotToMetaStoreStep.class);
+
+    public LookupSnapshotToMetaStoreStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig kylinConfig = context.getConfig();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        TableMetadataManager metaMgr = TableMetadataManager.getInstance(kylinConfig);
+        SnapshotManager snapshotMgr = SnapshotManager.getInstance(kylinConfig);
+        CubeInstance cube = cubeManager.getCube(LookupExecutableUtil.getCubeName(this.getParams()));
+        List<String> segmentIDs = LookupExecutableUtil.getSegments(this.getParams());
+        String lookupTableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+        CubeDesc cubeDesc = cube.getDescriptor();
+        try {
+            TableDesc tableDesc = metaMgr.getTableDesc(lookupTableName, cube.getProject());
+            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+            logger.info("take snapshot for table:" + lookupTableName);
+            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
+
+            logger.info("update snapshot path to cube metadata");
+            if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) {
+                LookupExecutableUtil.updateSnapshotPathToCube(cubeManager, cube, lookupTableName,
+                        snapshot.getResourcePath());
+            } else {
+                LookupExecutableUtil.updateSnapshotPathToSegments(cubeManager, cube, segmentIDs, lookupTableName,
+                        snapshot.getResourcePath());
+            }
+            return new ExecuteResult();
+        } catch (IOException e) {
+            logger.error("fail to build snapshot for:" + lookupTableName, e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
new file mode 100644
index 0000000000..42290b0697
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Save lookup snapshot information to cube metadata
+ */
+public class UpdateCubeAfterSnapshotStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCubeAfterSnapshotStep.class);
+
+    public UpdateCubeAfterSnapshotStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig kylinConfig = context.getConfig();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        CubeInstance cube = cubeManager.getCube(LookupExecutableUtil.getCubeName(this.getParams()));
+        List<String> segmentIDs = LookupExecutableUtil.getSegments(this.getParams());
+        String lookupTableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+        DefaultChainedExecutable job = (DefaultChainedExecutable) getManager().getJob(LookupExecutableUtil.getJobID(this.getParams()));
+
+        String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + lookupTableName;
+        String snapshotResPath = job.getExtraInfo(contextKey);
+        if (snapshotResPath == null) {
+            logger.info("no snapshot path exist in the context, so no need to update snapshot path");
+            return new ExecuteResult();
+        }
+        CubeDesc cubeDesc = cube.getDescriptor();
+        try {
+            logger.info("update snapshot path to cube metadata");
+            if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) {
+                if (!snapshotResPath.equals(cube.getSnapshotResPath(lookupTableName))) {
+                    LookupExecutableUtil.updateSnapshotPathToCube(cubeManager, cube, lookupTableName,
+                            snapshotResPath);
+                }
+            } else {
+                LookupExecutableUtil.updateSnapshotPathToSegments(cubeManager, cube, segmentIDs, lookupTableName,
+                        snapshotResPath);
+            }
+            return new ExecuteResult();
+        } catch (IOException e) {
+            logger.error("fail to save cuboid statistics", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
index f1a42b11ab..e42c522caa 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
@@ -582,5 +582,13 @@
   "override_kylin_properties": {
     "kylin.cube.algorithm": "INMEM"
   },
+  "snapshot_table_desc_list": [
+    {
+      "table_name": "DEFAULT.TEST_CATEGORY_GROUPINGS",
+      "storage_type": "hbase",
+      "local_cache_enable": true,
+      "global": true
+    }
+  ],
   "partition_date_start": 0
 }
diff --git a/pom.xml b/pom.xml
index a8e8312e1c..a5cc1fca24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
         <tomcat.version>7.0.85</tomcat.version>
         <t-digest.version>3.1</t-digest.version>
         <freemarker.version>2.3.23</freemarker.version>
+        <rocksdb.version>5.9.2</rocksdb.version>
         <!--metric-->
         <dropwizard.version>3.1.2</dropwizard.version>
         <!-- REST Service, ref https://github.com/spring-projects/spring-boot/blob/v1.3.8.RELEASE/spring-boot-dependencies/pom.xml -->
@@ -655,6 +656,11 @@
                 <artifactId>freemarker</artifactId>
                 <version>${freemarker.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.rocksdb</groupId>
+                <artifactId>rocksdbjni</artifactId>
+                <version>${rocksdb.version}</version>
+            </dependency>
 
             <!-- Logging -->
             <dependency>
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
index 1500def459..c3407bc131 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -18,8 +18,8 @@
 
 package org.apache.kylin.query.enumerator;
 
+import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -27,19 +27,22 @@
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPTable;
 import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
 public class LookupTableEnumerator implements Enumerator<Object[]> {
+    private final static Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
 
-    private final Collection<String[]> allRows;
+    private final ILookupTable lookupTable;
     private final List<ColumnDesc> colDescs;
     private final Object[] current;
     private Iterator<String[]> iterator;
@@ -67,8 +70,7 @@ else if (olapContext.realization instanceof HybridInstance) {
             throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
 
         CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
-        LookupStringTable table = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
-        this.allRows = table.getAllRows();
+        this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
 
         OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
         this.colDescs = olapTable.getSourceColumns();
@@ -103,11 +105,16 @@ public boolean moveNext() {
 
     @Override
     public void reset() {
-        this.iterator = allRows.iterator();
+        this.iterator = lookupTable.iterator();
     }
 
     @Override
     public void close() {
+        try {
+            lookupTable.close();
+        } catch (IOException e) {
+            logger.error("error when close lookup table", e);
+        }
     }
 
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index b4ebcbabc5..c6219a4dee 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -64,6 +64,7 @@
 import org.apache.kylin.rest.request.JobOptimizeRequest;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.CubeInstanceResponse;
+import org.apache.kylin.rest.request.LookupSnapshotBuildRequest;
 import org.apache.kylin.rest.response.CuboidTreeResponse;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.GeneralResponse;
@@ -298,6 +299,25 @@ public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVa
         }
     }
 
+    /**
+     * Force rebuild a cube's lookup table snapshot
+     *
+     * @throws IOException
+     */
+    @RequestMapping(value = "/{cubeName}/refresh_lookup", method = { RequestMethod.PUT }, produces = { "application/json" })
+    @ResponseBody
+    public JobInstance reBuildLookupSnapshot(@PathVariable String cubeName, @RequestBody LookupSnapshotBuildRequest request) {
+        try {
+            final CubeManager cubeMgr = cubeService.getCubeManager();
+            final CubeInstance cube = cubeMgr.getCube(cubeName);
+            String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+            return jobService.submitLookupSnapshotJob(cube, request.getLookupTableName(), request.getSegmentIDs(), submitter);
+        } catch (IOException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
     /**
      * Delete a cube segment
      *
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 7ada8cc25b..66621c7dd6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -32,6 +32,7 @@
 import org.apache.kylin.rest.request.CardinalityRequest;
 import org.apache.kylin.rest.request.HiveTableRequest;
 import org.apache.kylin.rest.service.TableACLService;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
 import org.apache.kylin.rest.service.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -208,4 +209,31 @@ public CardinalityRequest generateCardinality(@PathVariable String tableNames, @
         }
     }
 
+    @RequestMapping(value = "/{project}/{tableName}/{snapshotID}/snapshotLocalCache", method = { RequestMethod.PUT })
+    @ResponseBody
+    public void updateSnapshotLocalCache(@PathVariable final String project, @PathVariable final String tableName, @PathVariable final String snapshotID) {
+        tableService.updateSnapshotLocalCache(project, tableName, snapshotID);
+    }
+
+    @RequestMapping(value = "/{tableName}/{snapshotID}/snapshotLocalCache/state", method = { RequestMethod.GET })
+    @ResponseBody
+    public String getSnapshotLocalCacheState(@PathVariable final String tableName, @PathVariable final String snapshotID) {
+        return tableService.getSnapshotLocalCacheState(tableName, snapshotID);
+    }
+
+    @RequestMapping(value = "/{tableName}/{snapshotID}/snapshotLocalCache", method = { RequestMethod.DELETE })
+    @ResponseBody
+    public void removeSnapshotLocalCache(@PathVariable final String tableName, @PathVariable final String snapshotID) {
+        tableService.removeSnapshotLocalCache(tableName, snapshotID);
+    }
+
+    @RequestMapping(value = "/{project}/{tableName}/snapshots", method = { RequestMethod.GET })
+    @ResponseBody
+    public List<TableSnapshotResponse> getTableSnapshots(@PathVariable final String project, @PathVariable final String tableName) throws IOException {
+        return tableService.getLookupTableSnapshots(project, tableName);
+    }
+
+    public void setTableService(TableService tableService) {
+        this.tableService = tableService;
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 1aec429064..5ee5c7a370 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -73,7 +73,7 @@ public MetadataCleanupJob(KylinConfig config) {
 
         // two level resources, snapshot tables and cube statistics
         for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT,
-                ResourceStore.CUBE_STATISTICS_ROOT }) {
+                ResourceStore.CUBE_STATISTICS_ROOT, ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT}) {
             for (String dir : noNull(store.listResources(resourceRoot))) {
                 for (String res : noNull(store.listResources(dir))) {
                     if (store.getResourceTimestamp(res) < newResourceTimeCut)
@@ -97,6 +97,7 @@ public MetadataCleanupJob(KylinConfig config) {
         // exclude resources in use
         Set<String> activeResources = Sets.newHashSet();
         for (CubeInstance cube : cubeManager.listAllCubes()) {
+            activeResources.addAll(cube.getSnapshots().values());
             for (CubeSegment segment : cube.getSegments()) {
                 activeResources.addAll(segment.getSnapshotPaths());
                 activeResources.addAll(segment.getDictionaryPaths());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
index 126c5987e7..4c8c42643b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -28,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -35,6 +37,8 @@
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +64,17 @@
                 ? config.getHBaseTableNamePrefix()
                 : (namespace + ":" + config.getHBaseTableNamePrefix());
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+
+        boolean hasExtLookupTable = false;
         for (HTableDescriptor desc : tableDescriptors) {
             String host = desc.getValue(IRealizationConstants.HTableTag);
             if (config.getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+                // check if there are hbase lookup table
+                if (desc.getTableName().getNameAsString().contains(IRealizationConstants.LookupHbaseStorageLocationPrefix)) {
+                    hasExtLookupTable = true;
+                }
                 //only take care htables that belongs to self, and created more than 2 days
                 allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
             }
@@ -88,6 +99,11 @@
         
         logger.info(allTablesNeedToBeDropped.size() + " HTable(s) to clean up");
 
+        if (hasExtLookupTable) {
+            List<String> useExtLookupTables = getAllUsedExtLookupTables();
+            logger.info("Exclude tables:{}, as they are referred by snapshots.", useExtLookupTables);
+            allTablesNeedToBeDropped.removeAll(useExtLookupTables);
+        }
         if (delete) {
             // drop tables
             ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -115,6 +131,27 @@
         return allTablesNeedToBeDropped;
     }
 
+    private static List<String> getAllUsedExtLookupTables() throws IOException {
+        List<String> result = Lists.newArrayList();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+        for (String extSnapshotResource : activeSnapshotSet) {
+            try {
+                ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
+                        extSnapshotResource);
+                if (extTableSnapshot != null) {
+                    if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(extTableSnapshot.getStorageType())) {
+                        result.add(extTableSnapshot.getStorageLocationIdentifier());
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("error fetch ext table snapshot:" + extSnapshotResource, e);
+            }
+        }
+        return result;
+    }
+
     static class DeleteHTableRunnable implements Callable {
         HBaseAdmin hbaseAdmin;
         String htableName;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java
new file mode 100644
index 0000000000..06adf8ac14
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.List;
+
+public class LookupSnapshotBuildRequest {
+    private String cubeName;
+    private String lookupTableName;
+    private List<String> segmentIDs;
+
+    public String getCubeName() {
+        return cubeName;
+    }
+
+    public void setCubeName(String cubeName) {
+        this.cubeName = cubeName;
+    }
+
+    public String getLookupTableName() {
+        return lookupTableName;
+    }
+
+    public void setLookupTableName(String lookupTableName) {
+        this.lookupTableName = lookupTableName;
+    }
+
+    public List<String> getSegmentIDs() {
+        return segmentIDs;
+    }
+
+    public void setSegmentIDs(List<String> segmentIDs) {
+        this.segmentIDs = segmentIDs;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java
new file mode 100644
index 0000000000..05747644ca
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.response;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TableSnapshotResponse implements Serializable {
+    private static final long serialVersionUID = -8707176301793624704L;
+    public static final String TYPE_EXT = "ext";
+    public static final String TYPE_INNER = "inner";
+
+    private String snapshotID;
+
+    private String snapshotType; // can be ext or inner
+
+    private String storageType;
+
+    private long lastBuildTime;
+
+    private long sourceTableSize;
+
+    private long sourceTableLastModifyTime;
+
+    private List<String> cubesAndSegmentsUsage;
+
+    public String getSnapshotID() {
+        return snapshotID;
+    }
+
+    public void setSnapshotID(String snapshotID) {
+        this.snapshotID = snapshotID;
+    }
+
+    public String getSnapshotType() {
+        return snapshotType;
+    }
+
+    public void setSnapshotType(String snapshotType) {
+        this.snapshotType = snapshotType;
+    }
+
+    public String getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(String storageType) {
+        this.storageType = storageType;
+    }
+
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public long getSourceTableSize() {
+        return sourceTableSize;
+    }
+
+    public void setSourceTableSize(long sourceTableSize) {
+        this.sourceTableSize = sourceTableSize;
+    }
+
+    public long getSourceTableLastModifyTime() {
+        return sourceTableLastModifyTime;
+    }
+
+    public void setSourceTableLastModifyTime(long sourceTableLastModifyTime) {
+        this.sourceTableLastModifyTime = sourceTableLastModifyTime;
+    }
+
+    public List<String> getCubesAndSegmentsUsage() {
+        return cubesAndSegmentsUsage;
+    }
+
+    public void setCubesAndSegmentsUsage(List<String> cubesAndSegmentsUsage) {
+        this.cubesAndSegmentsUsage = cubesAndSegmentsUsage;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4317ed5491..c0d9e5697e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -36,6 +36,8 @@
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.LookupSnapshotBuildJob;
+import org.apache.kylin.engine.mr.LookupSnapshotJobBuilder;
 import org.apache.kylin.engine.mr.common.JobInfoConverter;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
@@ -385,6 +387,14 @@ public JobInstance submitRecoverSegmentOptimizeJob(CubeSegment segment, String s
         return optimizeJobInstance;
     }
 
+    public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, String submitter) throws IOException {
+        LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build();
+        getExecutableManager().addJob(job);
+
+        JobInstance jobInstance = getLookupSnapshotBuildJobInstance(job);
+        return jobInstance;
+    }
+
     private void checkCubeDescSignature(CubeInstance cube) {
         Message msg = MsgPicker.getMsg();
 
@@ -480,6 +490,28 @@ protected JobInstance getSingleJobInstance(AbstractExecutable job) {
         return result;
     }
 
+    protected JobInstance getLookupSnapshotBuildJobInstance(LookupSnapshotBuildJob job) {
+        if (job == null) {
+            return null;
+        }
+
+        final JobInstance result = new JobInstance();
+        result.setName(job.getName());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
+        result.setLastModified(job.getLastModified());
+        result.setSubmitter(job.getSubmitter());
+        result.setUuid(job.getId());
+        result.setType(CubeBuildTypeEnum.BUILD);
+        result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus()));
+        result.setDuration(job.getDuration() / 1000);
+        for (int i = 0; i < job.getTasks().size(); ++i) {
+            AbstractExecutable task = job.getTasks().get(i);
+            result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId())));
+        }
+        return result;
+    }
+
     protected JobInstance getCheckpointJobInstance(AbstractExecutable job) {
         Message msg = MsgPicker.getMsg();
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index ace1686f1a..786daa6a17 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,10 +30,19 @@
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -49,6 +59,9 @@
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
@@ -387,6 +400,109 @@ public void calculateCardinalityIfNotPresent(String[] tables, String submitter,
         }
     }
 
+    public void updateSnapshotLocalCache(String project, String tableName, String snapshotID) {
+        ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+        TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+        if (extTableSnapshotInfo == null) {
+            throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+        }
+        LookupProviderFactory.rebuildLocalCache(tableDesc, extTableSnapshotInfo);
+    }
+
+    public void removeSnapshotLocalCache(String tableName, String snapshotID) {
+        ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+        if (extTableSnapshotInfo == null) {
+            throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+        }
+        LookupProviderFactory.removeLocalCache(extTableSnapshotInfo);
+    }
+
+    public String getSnapshotLocalCacheState(String tableName, String snapshotID) {
+        ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+        if (extTableSnapshotInfo == null) {
+            throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+        }
+        CacheState cacheState = LookupProviderFactory.getCacheState(extTableSnapshotInfo);
+        return cacheState.name();
+    }
+
+    public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
+        TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+        IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+        TableSignature signature = hiveTable.getSignature();
+        return internalGetLookupTableSnapshots(tableName, signature);
+    }
+
+    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature) throws IOException {
+        ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
+        List<ExtTableSnapshotInfo> extTableSnapshots = extSnapshotInfoManager.getSnapshots(tableName);
+        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
+
+        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
+
+        List<TableSnapshotResponse> result = Lists.newArrayList();
+        for (ExtTableSnapshotInfo extTableSnapshot : extTableSnapshots) {
+            TableSnapshotResponse response = new TableSnapshotResponse();
+            response.setSnapshotID(extTableSnapshot.getId());
+            response.setSnapshotType(TableSnapshotResponse.TYPE_EXT);
+            response.setLastBuildTime(extTableSnapshot.getLastBuildTime());
+            response.setStorageType(extTableSnapshot.getStorageType());
+            response.setSourceTableSize(extTableSnapshot.getSignature().getSize());
+            response.setSourceTableLastModifyTime(extTableSnapshot.getSignature().getLastModifiedTime());
+            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(extTableSnapshot.getResourcePath()));
+            result.add(response);
+        }
+
+        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
+            TableSnapshotResponse response = new TableSnapshotResponse();
+            response.setSnapshotID(metaStoreTableSnapshot.getId());
+            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
+            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
+            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
+            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
+            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
+            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
+            result.add(response);
+        }
+
+        return result;
+    }
+
+    /**
+     * @return Map of SnapshotID, CubeName or SegmentName list
+     */
+    private Map<String, List<String>> getSnapshotUsages() {
+        CubeManager cubeManager = CubeManager.getInstance(getConfig());
+        Map<String, List<String>> snapshotCubeSegmentMap = Maps.newHashMap();
+        for (CubeInstance cube : cubeManager.listAllCubes()) {
+            Collection<String> cubeSnapshots = cube.getSnapshots().values();
+            for (String cubeSnapshot : cubeSnapshots) {
+                List<String> usages = snapshotCubeSegmentMap.get(cubeSnapshot);
+                if (usages == null) {
+                    usages = Lists.newArrayList();
+                    snapshotCubeSegmentMap.put(cubeSnapshot, usages);
+                }
+                usages.add(cube.getName());
+            }
+            for (CubeSegment segment : cube.getSegments()) {
+                Collection<String> segmentSnapshots = segment.getSnapshotPaths();
+                for (String segmentSnapshot : segmentSnapshots) {
+                    List<String> usages = snapshotCubeSegmentMap.get(segmentSnapshot);
+                    if (usages == null) {
+                        usages = Lists.newArrayList();
+                        snapshotCubeSegmentMap.put(segmentSnapshot, usages);
+                    }
+                    usages.add(cube.getName() + ":" + segment.getName());
+                }
+            }
+        }
+        return snapshotCubeSegmentMap;
+    }
+
     /**
      * Generate cardinality for table This will trigger a hadoop job
      * The result will be merged into table exd info
diff --git a/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java b/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
new file mode 100644
index 0000000000..86d34ac06e
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TableServiceTest extends LocalFileMetadataTestCase {
+    private TableService tableService;
+
+    @Before
+    public void setUp() {
+        this.createTestMetadata();
+        tableService = new TableService();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetTableSnapshots() throws IOException {
+        TableSignature tableSignature = new TableSignature("TEST_CAL_DT.csv", 100, System.currentTimeMillis());
+        List<TableSnapshotResponse> snapshotResponseList = tableService.internalGetLookupTableSnapshots("EDW.TEST_CAL_DT", tableSignature);
+        Assert.assertEquals(8, snapshotResponseList.size());
+    }
+}
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index f9c0d71d2e..c08ae709c6 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -250,6 +250,7 @@
             <scr:intercept-url pattern="/api/admin/public_config" access="permitAll"/>
             <scr:intercept-url pattern="/api/projects" access="permitAll"/>
             <scr:intercept-url pattern="/api/admin*/**" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/tables/**/snapshotLocalCache/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/**" access="isAuthenticated()"/>
 
             <scr:form-login login-page="/login" />
@@ -295,6 +296,7 @@
             <scr:intercept-url pattern="/api/admin/config" access="permitAll"/>
             <scr:intercept-url pattern="/api/projects*/*" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/admin*/**" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/tables/**/snapshotLocalCache/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/**" access="isAuthenticated()"/>
 
             <scr:form-login login-page="/login" />
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 9bf62f0034..4709c08f97 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -96,6 +96,12 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
new file mode 100644
index 0000000000..6101fca629
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class HBaseLookupMRSteps {
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupMRSteps.class);
+    private CubeInstance cube;
+    private JobEngineConfig config;
+
+    public HBaseLookupMRSteps(CubeInstance cube) {
+        this.cube = cube;
+        this.config = new JobEngineConfig(cube.getConfig());
+    }
+
+    public void addMaterializeLookupTablesSteps(DefaultChainedExecutable jobFlow) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        Set<String> allLookupTables = Sets.newHashSet();
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            TableRef table = dim.getTableRef();
+            if (cubeDesc.getModel().isLookupTable(table)) {
+                allLookupTables.add(table.getTableIdentity());
+            }
+        }
+        List<SnapshotTableDesc> snapshotTableDescs = cubeDesc.getSnapshotTableDescList();
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescs) {
+            if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(snapshotTableDesc.getStorageType())
+                    && allLookupTables.contains(snapshotTableDesc.getTableName())) {
+                addMaterializeLookupTableSteps(jobFlow, snapshotTableDesc.getTableName(), snapshotTableDesc);
+            }
+        }
+    }
+
+    public void addMaterializeLookupTableSteps(DefaultChainedExecutable jobFlow, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+        TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject());
+        IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+        try {
+            ExtTableSnapshotInfo latestSnapshot = extTableSnapshotInfoManager.getLatestSnapshot(sourceTable.getSignature(), tableName);
+            if (latestSnapshot != null) {
+                logger.info("there is latest snapshot exist for table:{}, skip build snapshot step.", tableName);
+                jobFlow.addExtraInfo(BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + latestSnapshot.getTableName(),
+                        latestSnapshot.getResourcePath());
+                return;
+            }
+        } catch (IOException ioException) {
+            throw new RuntimeException(ioException);
+        }
+        logger.info("add build snapshot steps for table:{}", tableName);
+        String snapshotID = genLookupSnapshotID();
+        addLookupTableConvertToHFilesStep(jobFlow, tableName, snapshotID);
+        addLookupTableHFilesBulkLoadStep(jobFlow, tableName, snapshotID);
+        if (snapshotTableDesc !=null && snapshotTableDesc.isEnableLocalCache()) {
+            addUpdateSnapshotQueryCacheStep(jobFlow, tableName, snapshotID);
+        }
+    }
+
+    private String genLookupSnapshotID() {
+        return UUID.randomUUID().toString();
+    }
+
+    private void addLookupTableConvertToHFilesStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep
+                .setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE + ":" + tableName);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, cube.getName());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_LookupTable_HFile_Generator_" + tableName + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(LookupTableToHFileJob.class);
+        createHFilesStep.setCounterSaveAs(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName);
+
+        jobFlow.addTask(createHFilesStep);
+    }
+
+    private void addLookupTableHFilesBulkLoadStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD + ":" + tableName);
+
+        StringBuilder cmd = new StringBuilder();
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(LookupTableHFilesBulkLoadJob.class);
+        jobFlow.addTask(bulkLoadStep);
+    }
+
+    private void addUpdateSnapshotQueryCacheStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        UpdateSnapshotCacheForQueryServersStep updateSnapshotCacheStep = new UpdateSnapshotCacheForQueryServersStep();
+        updateSnapshotCacheStep.setName(ExecutableConstants.STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE + ":" + tableName);
+
+        LookupExecutableUtil.setProjectName(cube.getProject(), updateSnapshotCacheStep.getParams());
+        LookupExecutableUtil.setLookupTableName(tableName, updateSnapshotCacheStep.getParams());
+        LookupExecutableUtil.setLookupSnapshotID(snapshotID, updateSnapshotCacheStep.getParams());
+        jobFlow.addTask(updateSnapshotCacheStep);
+    }
+
+    private String getLookupTableHFilePath(String tableName, String jobId) {
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(JobBuilderSupport.getJobWorkingDir(config, jobId) + "/"
+                + tableName + "/hfile/");
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf) {
+        appendMapReduceParameters(buf, JobEngineConfig.DEFAUL_JOB_CONF_SUFFIX);
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf, String jobType) {
+        try {
+            String jobConf = config.getHadoopJobConfFilePath(jobType);
+            if (jobConf != null && jobConf.length() > 0) {
+                buf.append(" -conf ").append(jobConf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java
new file mode 100644
index 0000000000..cf28ed6d13
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.engine.mr.ILookupMaterializer;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class HBaseLookupMaterializer implements ILookupMaterializer{
+
+    @Override
+    public void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName) {
+        HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube);
+        SnapshotTableDesc snapshotTableDesc = cube.getDescriptor().getSnapshotTableDesc(lookupTableName);
+        lookupMRSteps.addMaterializeLookupTableSteps(jobFlow, lookupTableName, snapshotTableDesc);
+    }
+
+    @Override
+    public void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube) {
+        HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube);
+        lookupMRSteps.addMaterializeLookupTablesSteps(jobFlow);
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java
new file mode 100644
index 0000000000..3e8c2c5d6f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.IExtLookupProvider;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupTableCache;
+import org.apache.kylin.engine.mr.ILookupMaterializer;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Use HBase as lookup table storage
+ */
+public class HBaseLookupProvider implements IExtLookupProvider{
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupProvider.class);
+
+
+    @Override
+    public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        return new HBaseLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    @Override
+    public IExtLookupTableCache getLocalCache() {
+        return RocksDBLookupTableCache.getInstance(KylinConfig.getInstanceFromEnv());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == ILookupMaterializer.class) {
+            return (I) new HBaseLookupMaterializer();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
new file mode 100644
index 0000000000..9ceabd2721
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder;
+import org.apache.kylin.metadata.model.TableDesc;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+
+/**
+ * encode/decode original table row to hBase row
+ * 
+ */
+public class HBaseLookupRowEncoder extends AbstractLookupRowEncoder<HBaseRow> {
+    public static final String CF_STRING = "F";
+    public static final byte[] CF = Bytes.toBytes(CF_STRING);
+
+    private int shardNum;
+
+    public HBaseLookupRowEncoder(TableDesc tableDesc, String[] keyColumns, int shardNum) {
+        super(tableDesc, keyColumns);
+        this.shardNum = shardNum;
+    }
+
+    @Override
+    public HBaseRow encode(String[] row) {
+        String[] keys = getKeyData(row);
+        String[] values = getValueData(row);
+        byte[] rowKey = encodeRowKey(keys);
+        NavigableMap<byte[], byte[]> qualifierValMap = Maps
+                .newTreeMap(org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR);
+        for (int i = 0; i < values.length; i++) {
+            byte[] qualifier = Bytes.toBytes(String.valueOf(valueIndexes[i]));
+            byte[] byteValue = toBytes(values[i]);
+            qualifierValMap.put(qualifier, byteValue);
+        }
+        return new HBaseRow(rowKey, qualifierValMap);
+    }
+
+    @Override
+    public String[] decode(HBaseRow hBaseRow) {
+        if (hBaseRow == null) {
+            return null;
+        }
+        String[] result = new String[columnsNum];
+        fillKeys(hBaseRow.rowKey, result);
+        fillValues(hBaseRow.qualifierValMap, result);
+
+        return result;
+    }
+
+    public byte[] encodeRowKey(String[] keys) {
+        keyByteBuffer.clear();
+        for (String key : keys) {
+            if (key == null) {
+                throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys));
+            }
+            byte[] byteKey = Bytes.toBytes(key);
+            keyByteBuffer.putShort((short) byteKey.length);
+            keyByteBuffer.put(byteKey);
+        }
+        byte[] result = new byte[RowConstants.ROWKEY_SHARDID_LEN + keyByteBuffer.position()];
+        System.arraycopy(keyByteBuffer.array(), 0, result, RowConstants.ROWKEY_SHARDID_LEN, keyByteBuffer.position());
+        short shard = ShardingHash.getShard(result, RowConstants.ROWKEY_SHARDID_LEN, result.length, shardNum);
+        BytesUtil.writeShort(shard, result, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        return result;
+    }
+
+    private void fillKeys(byte[] rowKey, String[] result) {
+        int keyNum = keyIndexes.length;
+        ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey);
+        byteBuffer.getShort(); // read shard
+        for (int i = 0; i < keyNum; i++) {
+            short keyLen = byteBuffer.getShort();
+            byte[] keyBytes = new byte[keyLen];
+            byteBuffer.get(keyBytes);
+            result[keyIndexes[i]] = Bytes.toString(keyBytes);
+        }
+    }
+
+    private void fillValues(Map<byte[], byte[]> qualifierValMap, String[] result) {
+        for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) {
+            byte[] qualifier = qualifierValEntry.getKey();
+            byte[] value = qualifierValEntry.getValue();
+            int valIdx = Integer.valueOf(Bytes.toString(qualifier));
+            result[valIdx] = fromBytes(value);
+        }
+    }
+
+    public static class HBaseRow {
+        private byte[] rowKey;
+        private NavigableMap<byte[], byte[]> qualifierValMap;
+
+        public HBaseRow(byte[] rowKey, NavigableMap<byte[], byte[]> qualifierValMap) {
+            this.rowKey = rowKey;
+            this.qualifierValMap = qualifierValMap;
+        }
+
+        public byte[] getRowKey() {
+            return rowKey;
+        }
+
+        public NavigableMap<byte[], byte[]> getQualifierValMap() {
+            return qualifierValMap;
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java
new file mode 100644
index 0000000000..78877f7954
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Use HBase as lookup table storage
+ */
+public class HBaseLookupTable implements ILookupTable{
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupTable.class);
+
+    private TableName lookupTableName;
+    private Table table;
+
+    private HBaseLookupRowEncoder encoder;
+
+    public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        String tableName = extTableSnapshot.getStorageLocationIdentifier();
+        this.lookupTableName = TableName.valueOf(tableName);
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl());
+        try {
+            table = connection.getTable(lookupTableName);
+        } catch (IOException e) {
+            throw new RuntimeException("error when connect HBase", e);
+        }
+
+        String[] keyColumns = extTableSnapshot.getKeyColumns();
+        encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum());
+    }
+
+    @Override
+    public String[] getRow(Array<String> key) {
+        byte[] encodedKey = encoder.encodeRowKey(key.data);
+        Get get = new Get(encodedKey);
+        try {
+            Result result = table.get(get);
+            if (result.isEmpty()) {
+                return null;
+            }
+            return encoder.decode(new HBaseRow(result.getRow(), result.getFamilyMap(HBaseLookupRowEncoder.CF)));
+        } catch (IOException e) {
+            throw new RuntimeException("error when get row from hBase", e);
+        }
+    }
+
+    @Override
+    public Iterator<String[]> iterator() {
+        return new HBaseScanBasedIterator(table);
+    }
+
+    @Override
+    public void close() throws IOException{
+        table.close();
+    }
+
+    private class HBaseScanBasedIterator implements Iterator<String[]> {
+        private Iterator<Result> scannerIterator;
+        private long counter;
+
+        public HBaseScanBasedIterator(Table table) {
+            try {
+                Scan scan = new Scan();
+                scan.setCaching(1000);
+                ResultScanner scanner = table.getScanner(HBaseLookupRowEncoder.CF);
+                scannerIterator = scanner.iterator();
+            } catch (IOException e) {
+                logger.error("error when scan HBase", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return scannerIterator.hasNext();
+        }
+
+        @Override
+        public String[] next() {
+            counter ++;
+            if (counter % 100000 == 0) {
+                logger.info("scanned {} rows from hBase", counter);
+            }
+            Result result = scannerIterator.next();
+            byte[] rowKey = result.getRow();
+            NavigableMap<byte[], byte[]> qualifierValMap = result.getFamilyMap(HBaseLookupRowEncoder.CF);
+            return encoder.decode(new HBaseRow(rowKey, qualifierValMap));
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove is not supported");
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java
new file mode 100644
index 0000000000..3daffa3cd0
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Most code from {@link KeyValueSortReducer}, add logic to check whether the row key has duplicated
+ * if there is duplicated key, throws IllegalStateException
+ */
+public class KVSortReducerWithDupKeyCheck extends KeyValueSortReducer {
+    protected void reduce(
+            ImmutableBytesWritable row,
+            java.lang.Iterable<KeyValue> kvs,
+            org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+            throws java.io.IOException, InterruptedException {
+        TreeSet<KeyValue> map = new TreeSet<>(KeyValue.COMPARATOR);
+
+        TreeSet<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+        for (KeyValue kv : kvs) {
+            byte[] qualifier = CellUtil.cloneQualifier(kv);
+            if (qualifierSet.contains(qualifier)) {
+                throw new IllegalStateException("there is duplicate key:" + row);
+            }
+            qualifierSet.add(qualifier);
+            try {
+                map.add(kv.clone());
+            } catch (CloneNotSupportedException e) {
+                throw new java.io.IOException(e);
+            }
+        }
+        context.setStatus("Read " + map.getClass());
+        int index = 0;
+        for (KeyValue kv : map) {
+            context.write(row, kv);
+            if (++index % 100 == 0)
+                context.setStatus("Wrote " + index);
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java
new file mode 100644
index 0000000000..5598ed9ed5
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupTableHFilesBulkLoadJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(LookupTableHFilesBulkLoadJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_TABLE_NAME);
+        options.addOption(OPTION_CUBING_JOB_ID);
+        options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
+        parseOptions(options, args);
+
+        String tableName = getOptionValue(OPTION_TABLE_NAME);
+        String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
+        String snapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
+        DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(cubingJobID);
+
+        ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+        ExtTableSnapshotInfo snapshot = extTableSnapshotInfoManager.getSnapshot(tableName, snapshotID);
+        long srcTableRowCnt = Long.valueOf(job.findExtraInfoBackward(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName, "-1"));
+        logger.info("update table:{} snapshot row count:{}", tableName, srcTableRowCnt);
+        snapshot.setRowCnt(srcTableRowCnt);
+        snapshot.setLastBuildTime(System.currentTimeMillis());
+        extTableSnapshotInfoManager.updateSnapshot(snapshot);
+
+        String hTableName = snapshot.getStorageLocationIdentifier();
+        // e.g
+        // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
+        // end with "/"
+        String input = getOptionValue(OPTION_INPUT_PATH);
+
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        FsShell shell = new FsShell(conf);
+
+        int exitCode = -1;
+        int retryCount = 10;
+        while (exitCode != 0 && retryCount >= 1) {
+            exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
+            retryCount--;
+            Thread.sleep(5000);
+        }
+
+        if (exitCode != 0) {
+            logger.error("Failed to change the file permissions: " + input);
+            throw new IOException("Failed to change the file permissions: " + input);
+        }
+
+        String[] newArgs = new String[2];
+        newArgs[0] = input;
+        newArgs[1] = hTableName;
+
+        logger.debug("Start to run LoadIncrementalHFiles");
+        int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
+        logger.debug("End to run LoadIncrementalHFiles");
+        return ret;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new LookupTableHFilesBulkLoadJob(), args);
+        System.exit(exitCode);
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
new file mode 100644
index 0000000000..aac0108d7f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupTableToHFileJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(LookupTableToHFileJob.class);
+
+    private static String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+    private static int HBASE_TABLE_LENGTH = 10;
+
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String tableName = getOptionValue(OPTION_TABLE_NAME);
+            String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
+            String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
+
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+
+            TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName,
+                    cube.getProject());
+
+            ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+            removeSnapshotIfExist(extSnapshotInfoManager, kylinConfig, tableName, lookupSnapshotID);
+
+            IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+
+            logger.info("create HTable for source table snapshot:{}", tableName);
+            Pair<String, Integer> hTableNameAndShard = createHTable(tableName, sourceTable, kylinConfig);
+            String[] keyColumns = getLookupKeyColumns(cube, tableName);
+            ExtTableSnapshotInfo snapshot = createSnapshotResource(extSnapshotInfoManager, tableName, lookupSnapshotID,
+                    keyColumns, hTableNameAndShard.getFirst(), hTableNameAndShard.getSecond(), sourceTable);
+            logger.info("created snapshot information at:{}", snapshot.getResourcePath());
+            saveSnapshotInfoToJobContext(kylinConfig, cubingJobID, snapshot);
+
+            job = Job.getInstance(HBaseConfiguration.create(getConf()), getOptionValue(OPTION_JOB_NAME));
+
+            setJobClasspath(job, cube.getConfig());
+            // For separate HBase cluster, note the output is a qualified HDFS path if "kylin.storage.hbase.cluster-fs" is configured, ref HBaseMRSteps.getHFilePath()
+            HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration());
+
+            FileOutputFormat.setOutputPath(job, output);
+
+            IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+            tableInputFormat.configureJob(job);
+            job.setMapperClass(LookupTableToHFileMapper.class);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, tableName);
+            // set block replication to 3 for hfiles
+            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            job.getConfiguration().set(BatchConstants.CFG_SHARD_NUM, String.valueOf(hTableNameAndShard.getSecond()));
+            // add metadata to distributed cache
+            attachCubeMetadata(cube, job.getConfiguration());
+
+            Connection conn = getHBaseConnection(kylinConfig);
+            HTable htable = (HTable) conn.getTable(TableName.valueOf(hTableNameAndShard.getFirst()));
+
+            // Automatic config !
+            HFileOutputFormat2.configureIncrementalLoad(job, htable, htable.getRegionLocator());
+
+            job.setReducerClass(KVSortReducerWithDupKeyCheck.class);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void removeSnapshotIfExist(ExtTableSnapshotInfoManager extSnapshotInfoManager, KylinConfig kylinConfig,
+            String tableName, String lookupSnapshotID) throws IOException {
+        ExtTableSnapshotInfo snapshotInfo = null;
+        try {
+            snapshotInfo = extSnapshotInfoManager.getSnapshot(tableName, lookupSnapshotID);
+        } catch (Exception e) {
+            // swallow the exception, means not snapshot exist of this snapshot id
+        }
+        if (snapshotInfo == null) {
+            return;
+        }
+        logger.info("the table:{} snapshot:{} exist, remove it", tableName, lookupSnapshotID);
+        extSnapshotInfoManager.removeSnapshot(tableName, lookupSnapshotID);
+        String hTableName = snapshotInfo.getStorageLocationIdentifier();
+        logger.info("remove related HBase table:{} for snapshot:{}", hTableName, lookupSnapshotID);
+        Connection conn = getHBaseConnection(kylinConfig);
+        Admin admin = conn.getAdmin();
+        admin.deleteTable(TableName.valueOf(hTableName));
+    }
+
+    private String[] getLookupKeyColumns(CubeInstance cube, String tableName) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        DataModelDesc modelDesc = cubeDesc.getModel();
+        TableRef lookupTableRef = null;
+        for (TableRef tableRef : modelDesc.getLookupTables()) {
+            if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) {
+                lookupTableRef = tableRef;
+                break;
+            }
+        }
+        if (lookupTableRef == null) {
+            throw new IllegalStateException("cannot find table in model:" + tableName);
+        }
+        JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef);
+        TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns();
+        String[] result = new String[keyColRefs.length];
+        for (int i = 0; i < keyColRefs.length; i++) {
+            result[i] = keyColRefs[i].getName();
+        }
+        return result;
+    }
+
+    private void saveSnapshotInfoToJobContext(KylinConfig kylinConfig, String jobID, ExtTableSnapshotInfo snapshot) {
+        ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
+        DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(jobID);
+        job.addExtraInfo(BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + snapshot.getTableName(),
+                snapshot.getResourcePath());
+    }
+
+    /**
+     *
+     * @param sourceTableName
+     * @param sourceTable
+     * @param kylinConfig
+     * @return Pair of HTableName and shard number
+     * @throws IOException
+     */
+    private Pair<String, Integer> createHTable(String sourceTableName, IReadableTable sourceTable,
+            KylinConfig kylinConfig) throws IOException {
+        TableSignature signature = sourceTable.getSignature();
+        int shardNum = calculateShardNum(kylinConfig, signature.getSize());
+        Connection conn = getHBaseConnection(kylinConfig);
+        Admin admin = conn.getAdmin();
+        String hTableName = genHTableName(kylinConfig, admin, sourceTableName);
+
+        TableName tableName = TableName.valueOf(hTableName);
+        HTableDescriptor hTableDesc = new HTableDescriptor(tableName);
+        hTableDesc.setCompactionEnabled(false);
+        hTableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+        hTableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+        hTableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+        String commitInfo = KylinVersion.getGitCommitInfo();
+        if (!StringUtils.isEmpty(commitInfo)) {
+            hTableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+        }
+
+        HColumnDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false);
+        hTableDesc.addFamily(cf);
+
+        try {
+            if (shardNum > 1) {
+                admin.createTable(hTableDesc, getSplitsByShardNum(shardNum));
+            } else {
+                admin.createTable(hTableDesc);
+            }
+        } finally {
+            IOUtils.closeQuietly(admin);
+        }
+        return new Pair<>(hTableName, shardNum);
+    }
+
+    private int calculateShardNum(KylinConfig kylinConfig, long dataSize) {
+        long shardSize = kylinConfig.getExtTableSnapshotShardingMB() * 1024 * 1024;
+        return dataSize < shardSize ? 1 : (int) (Math.ceil(dataSize / shardSize));
+    }
+
+    private byte[][] getSplitsByShardNum(int shardNum) {
+        byte[][] result = new byte[shardNum - 1][];
+        for (int i = 1; i < shardNum; ++i) {
+            byte[] split = new byte[RowConstants.ROWKEY_SHARDID_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+    private ExtTableSnapshotInfo createSnapshotResource(ExtTableSnapshotInfoManager extSnapshotInfoManager,
+            String tableName, String snapshotID, String[] keyColumns, String hTableName, int shardNum,
+            IReadableTable sourceTable) throws IOException {
+        return extSnapshotInfoManager.createSnapshot(sourceTable.getSignature(), tableName, snapshotID, keyColumns,
+                shardNum, ExtTableSnapshotInfo.STORAGE_TYPE_HBASE, hTableName);
+    }
+
+    private String genHTableName(KylinConfig kylinConfig, Admin admin, String tableName) throws IOException {
+        String namePrefix = kylinConfig.getHBaseTableNamePrefix()
+                + IRealizationConstants.LookupHbaseStorageLocationPrefix + tableName + "_";
+        String namespace = kylinConfig.getHBaseStorageNameSpace();
+        String hTableName;
+        Random ran = new Random();
+        do {
+            StringBuffer sb = new StringBuffer();
+            if ((namespace.equals("default") || namespace.equals("")) == false) {
+                sb.append(namespace).append(":");
+            }
+            sb.append(namePrefix);
+            for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
+                sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
+            }
+            hTableName = sb.toString();
+        } while (hTableExists(admin, hTableName));
+
+        return hTableName;
+    }
+
+    private boolean hTableExists(Admin admin, String hTableName) throws IOException {
+        return admin.tableExists(TableName.valueOf(hTableName));
+    }
+
+    private Connection getHBaseConnection(KylinConfig kylinConfig) throws IOException {
+        return HBaseConnection.get(kylinConfig.getStorageUrl());
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new LookupTableToHFileJob(), args);
+        System.exit(exitCode);
+    }
+
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
new file mode 100644
index 0000000000..4be9533b08
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+
+public class LookupTableToHFileMapper<KEYIN> extends KylinMapper<KEYIN, Object, ImmutableBytesWritable, KeyValue> {
+    ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+
+    private String cubeName;
+    private CubeDesc cubeDesc;
+    private String tableName;
+    private int shardNum;
+    private IMRTableInputFormat lookupTableInputFormat;
+    private long timestamp = 0;
+    private HBaseLookupRowEncoder encoder;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        tableName = context.getConfiguration().get(BatchConstants.CFG_TABLE_NAME);
+        shardNum = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_SHARD_NUM));
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+        cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
+        DataModelDesc modelDesc = cubeDesc.getModel();
+        TableDesc tableDesc = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(
+                tableName, cubeDesc.getProject());
+        TableRef lookupTableRef = null;
+        for (TableRef tableRef : modelDesc.getLookupTables()) {
+            if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) {
+                lookupTableRef = tableRef;
+                break;
+            }
+        }
+        JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef);
+        TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns();
+        String[] keyColumns = new String[keyColRefs.length];
+        for (int i = 0; i < keyColRefs.length; i++) {
+            keyColumns[i] = keyColRefs[i].getName();
+        }
+        encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, shardNum);
+        lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
+        Collection<String[]> rowCollection = lookupTableInputFormat.parseMapperInput(value);
+        for (String[] row : rowCollection) {
+            HBaseRow hBaseRow = encoder.encode(row);
+
+            byte[] rowKey = hBaseRow.getRowKey();
+            Map<byte[], byte[]> qualifierValMap = hBaseRow.getQualifierValMap();
+            outputKey.set(rowKey);
+            for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) {
+                KeyValue outputValue = createKeyValue(rowKey, qualifierValEntry.getKey(), qualifierValEntry.getValue());
+                context.write(outputKey, outputValue);
+            }
+        }
+    }
+
+    private KeyValue createKeyValue(byte[] keyBytes, byte[] qualifier, byte[] value) {
+        return new KeyValue(keyBytes, 0, keyBytes.length, //
+                HBaseLookupRowEncoder.CF, 0, HBaseLookupRowEncoder.CF.length, //
+                qualifier, 0, qualifier.length, //
+                timestamp, KeyValue.Type.Put, //
+                value, 0, value.length);
+    }
+
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
new file mode 100644
index 0000000000..409116d08f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.ExecuteResult.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateSnapshotCacheForQueryServersStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateSnapshotCacheForQueryServersStep.class);
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final String tableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+        final String snapshotID = LookupExecutableUtil.getLookupSnapshotID(this.getParams());
+        final String projectName = LookupExecutableUtil.getProjectName(this.getParams());
+
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        int checkInterval = 10 * 1000;
+        int maxCheckTime = 10 * 60 * 1000;
+
+        StringWriter outputWriter = new StringWriter();
+        PrintWriter pw = new PrintWriter(outputWriter);
+        String[] restServers = config.getRestServers();
+        List<String> serversNeedCheck = Lists.newArrayList();
+        for (String restServer : restServers) {
+            logger.info("send build lookup table cache request to server: " + restServer);
+            try {
+                RestClient restClient = new RestClient(restServer);
+                restClient.buildLookupSnapshotCache(projectName, tableName, snapshotID);
+                serversNeedCheck.add(restServer);
+            } catch (IOException e) {
+                logger.error("error when send build cache request to rest server:" + restServer, e);
+                pw.println("cache build fail for rest server:" + restServer);
+            }
+        }
+        if (serversNeedCheck.isEmpty()) {
+            return new ExecuteResult(State.SUCCEED, outputWriter.toString());
+        }
+
+        List<String> completeServers = Lists.newArrayList();
+        long startTime = System.currentTimeMillis();
+        while ((System.currentTimeMillis() - startTime) < maxCheckTime) {
+            serversNeedCheck.removeAll(completeServers);
+            if (serversNeedCheck.isEmpty()) {
+                break;
+            }
+            for (String restServer : serversNeedCheck) {
+                logger.info("check lookup table cache build status for server: " + restServer);
+                try {
+                    RestClient restClient = new RestClient(restServer);
+                    String stateName = restClient.getLookupSnapshotCacheState(tableName, snapshotID);
+                    if (!stateName.equals(CacheState.IN_BUILDING.name())) {
+                        completeServers.add(restServer);
+                        pw.println("cache build complete for rest server:" + restServer + " cache state:" + stateName);
+                    }
+                } catch (IOException e) {
+                    logger.error("error when send build cache request to rest server:" + restServer, e);
+                }
+            }
+            try {
+                Thread.sleep(checkInterval);
+            } catch (InterruptedException e) {
+                logger.error("interrupted", e);
+            }
+        }
+        serversNeedCheck.removeAll(completeServers);
+        if (!serversNeedCheck.isEmpty()) {
+            pw.println();
+            pw.println("check timeout!");
+            pw.println("servers not complete:" + serversNeedCheck);
+        }
+        return new ExecuteResult(State.SUCCEED, outputWriter.toString());
+    }
+
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java
new file mode 100644
index 0000000000..2b21ae3616
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.NavigableMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HBaseLookupRowEncoderTest extends LocalFileMetadataTestCase {
+    private TableDesc tableDesc;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEnDeCode() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
+        assertEquals(6, hBaseRow.getRowKey().length);
+        assertEquals(3, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithNullValue() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", null };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
+        assertEquals(6, hBaseRow.getRowKey().length);
+        assertEquals(3, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertNull(decodeRow[3]);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithMultiKeys() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc,
+                new String[] { "COUNTRY", "NAME" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
+        assertEquals(2, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertArrayEquals(row, decodeRow);
+    }
+
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java
new file mode 100644
index 0000000000..cbd046103f
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+//package org.apache.kylin.storage.hbase.lookup;
+//
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.mapreduce.Job;
+//import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+//import org.apache.kylin.cube.CubeSegment;
+//import org.apache.kylin.engine.mr.CubingJob;
+//import org.apache.kylin.job.engine.JobEngineConfig;
+//import org.junit.After;
+//import org.junit.Before;
+//import org.junit.Rule;
+//import org.junit.Test;
+//import org.powermock.api.mockito.PowerMockito;
+//import org.powermock.core.classloader.annotations.PrepareForTest;
+//import org.powermock.modules.junit4.rule.PowerMockRule;
+//
+//@PrepareForTest({ LookupTableToHFileJob.class, Job.class})
+//public class LookupTableToHFileJobTest extends LocalFileMetadataTestCase {
+//
+//    @Rule
+//    public PowerMockRule rule = new PowerMockRule();
+//
+//    @Before
+//    public void setup() throws Exception {
+//        createTestMetadata();
+//    }
+//
+//    @After
+//    public void after() throws Exception {
+//        cleanupTestMetadata();
+//    }
+//
+//    @Test
+//    public void testRun() throws Exception {
+//        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+//        String segmentID = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b";
+//        CubeInstance cubeInstance = CubeManager.getInstance(getTestConfig()).getCube(cubeName);
+//        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentID);
+//
+//        Configuration conf = HadoopUtil.getCurrentConfiguration();
+//        conf.set("fs.defaultFS", "file:///");
+//        conf.set("mapreduce.framework.name", "local");
+//        conf.set("mapreduce.application.framework.path", "");
+//        conf.set("fs.file.impl.disable.cache", "true");
+//
+//        FileSystem localFileSystem = new LocalFileSystem();
+//        URI uri = URI.create("file:///");
+//        localFileSystem.initialize(uri, conf);
+//
+//        Job mockedJob = createMockMRJob(conf);
+//        PowerMockito.stub(PowerMockito.method(Job.class, "getInstance", Configuration.class, String.class))
+//                .toReturn(mockedJob);
+//        PowerMockito.stub(PowerMockito.method(Job.class, "getInstance", Configuration.class, String.class))
+//                .toReturn(mockedJob);
+//
+//        StringBuilder cmd = new StringBuilder();
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+//                "Build_Lookup_Table_For_Segment_20130331080000_20131212080000_Step");
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME,
+//                cubeName);
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID,
+//                segmentID);
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getOutputPath());
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, "EDW.TEST_SITES");
+//
+//        LookupTableToHFileJob job = new LookupTableToHFileJob();
+//        job.setConf(conf);
+//        job.setAsync(true);
+//
+//        String[] args = cmd.toString().trim().split("\\s+");
+//        job.run(args);
+//    }
+//
+//    private String getOutputPath() {
+//        return "_tmp_output";
+//    }
+//
+//    private CubingJob createMockCubingJob(CubeSegment cubeSeg) {
+//        JobEngineConfig jobEngineConfig = new JobEngineConfig(getTestConfig());
+//        CubingJob cubingJob = CubingJob.createBuildJob(cubeSeg, "unitTest", jobEngineConfig);
+//
+//        return cubingJob;
+//    }
+//
+//    private Job createMockMRJob(Configuration conf) throws Exception {
+//        Job job = PowerMockito.mock(Job.class);
+//        PowerMockito.when(job.getConfiguration()).thenReturn(conf);
+//        PowerMockito.doNothing().when(job).submit();
+//        return job;
+//    }
+
+//}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
new file mode 100644
index 0000000000..e98762dbab
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.DefaultContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadataTestCase {
+    private KylinConfig kylinConfig;
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testExecute() throws ExecuteException {
+        UpdateSnapshotCacheForQueryServersStep step = new UpdateSnapshotCacheForQueryServersStep();
+        ExecuteResult result = step.doWork(new DefaultContext(Maps.<String, Executable>newConcurrentMap(), kylinConfig));
+        System.out.println(result.output());
+        assertTrue(result.succeed());
+    }
+}
diff --git a/webapp/app/js/controllers/cubeAdvanceSetting.js b/webapp/app/js/controllers/cubeAdvanceSetting.js
index 5687e12793..bb1bb1aa23 100755
--- a/webapp/app/js/controllers/cubeAdvanceSetting.js
+++ b/webapp/app/js/controllers/cubeAdvanceSetting.js
@@ -445,4 +445,63 @@ KylinApp.controller('CubeAdvanceSettingCtrl', function ($scope, $modal,cubeConfi
       $scope.$emit('AdvancedSettingEdited');
     });
   }
+
+  $scope.newSnapshot = {
+    select: {}
+  };
+
+  $scope.removeSnapshotTable = function(index) {
+    $scope.cubeMetaFrame.snapshot_table_desc_list.splice(index, 1);
+  };
+
+  $scope.addSnapshot = function(newSnapshot) {
+    if (!$scope.cubeMetaFrame.snapshot_table_desc_list) {
+       $scope.cubeMetaFrame.snapshot_table_desc_list = [];
+    }
+    if (!newSnapshot.table_name || !newSnapshot.storage_type) {
+      swal('Oops...', 'Snapshot table name or storage should not be empty', 'warning');
+      return;
+    } else if ($scope.cubeMetaFrame.snapshot_table_desc_list.length){
+      var existSnapshot = _.find($scope.cubeMetaFrame.snapshot_table_desc_list, function(snapshot){ return snapshot.table_name === newSnapshot.table_name;});
+      if (!!existSnapshot) {
+        swal('Oops...', 'Snapshot table already existed', 'warning');
+        return;
+      }
+    }
+    $scope.cubeMetaFrame.snapshot_table_desc_list.push(angular.copy(newSnapshot));
+    $scope.newSnapshot.select = {};
+  };
+
+  $scope.changeSnapshotStorage = function(snapshot) {
+    if (snapshot.storage_type == 'hbase') {
+      snapshot.global = true;
+    }
+  };
+
+  $scope.changeSnapshotTable = function(changeSnapshot, beforeTableName, snapshotTableDescList) {
+    var existSnapshot = _.find(snapshotTableDescList, function(snapshot) {
+      return snapshot.table_name === changeSnapshot.table_name;
+    });
+    if (!!existSnapshot) {
+      changeSnapshot.table_name = beforeTableName;
+      swal('Oops...', 'Snapshot table already existed', 'warning');
+    }
+  };
+
+  $scope.getCubeLookups = function() {
+    var modelDesc = modelsManager.getModel($scope.cubeMetaFrame.model_name);
+    var modelLookups = modelDesc ? modelDesc.lookups : [];
+    var cubeLookups = [];
+    angular.forEach(modelLookups, function(modelLookup, index) {
+      var dimensionLookup = _.find($scope.cubeMetaFrame.dimensions, function(dimension){ return dimension.table === modelLookup.alias;});
+      if (!!dimensionLookup) {
+        if (cubeLookups.indexOf(modelLookup.table) === -1) {
+          cubeLookups.push(modelLookup.table);
+        }
+      }
+    });
+    return cubeLookups;
+  };
+
+  $scope.cubeLookups = $scope.getCubeLookups();
 });
diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js
index cbd6fad1bb..136d86eacf 100644
--- a/webapp/app/js/controllers/cubes.js
+++ b/webapp/app/js/controllers/cubes.js
@@ -542,6 +542,27 @@ KylinApp.controller('CubesCtrl', function ($scope, $q, $routeParams, $location,
        });
      };
 
+    $scope.startLookupRefresh = function(cube) {
+      $scope.loadDetail(cube).then(function () {
+        $scope.metaModel={
+          model:cube.model
+        };
+        $modal.open({
+          templateUrl: 'lookupRefresh.html',
+          controller: lookupRefreshCtrl,
+          resolve: {
+            cube: function () {
+              return cube;
+            },
+            scope:function(){
+              return $scope;
+            }
+          }
+        });
+        }
+      );
+    };
+
   });
 
 
@@ -780,3 +801,109 @@ var deleteSegmentCtrl = function($scope, $modalInstance, CubeService, SweetAlert
     });
   };
 };
+
+var lookupRefreshCtrl = function($scope, scope, CubeList, $modalInstance, CubeService, cube, SweetAlert, loadingRequest) {
+  $scope.cubeList = CubeList;
+  $scope.cube = cube;
+  $scope.dispalySegment = false;
+
+  $scope.getLookups = function() {
+    var modelLookups = cube.model ? cube.model.lookups : [];
+    var cubeLookups = [];
+    angular.forEach(modelLookups, function(modelLookup, index) {
+      var dimensionTables = _.find(cube.detail.dimensions, function(dimension){ return dimension.table === modelLookup.alias;});
+      if (!!dimensionTables) {
+        if (cubeLookups.indexOf(modelLookup.table) === -1) {
+          cubeLookups.push(modelLookup.table);
+        }
+      }
+    });
+    return cubeLookups;
+  };
+
+  $scope.cubeLookups = $scope.getLookups();
+
+  $scope.lookup = {
+    select: {}
+  };
+
+  $scope.getReadySegment = function(segment) {
+    return segment.status === 'READY';
+  };
+
+  $scope.cancel = function () {
+    $modalInstance.dismiss('cancel');
+  };
+
+  $scope.updateLookupTable = function(tableName) {
+    var lookupTable = _.find(cube.detail.snapshot_table_desc_list, function(table){ return table.table_name == tableName});
+    if (!!lookupTable && lookupTable.global) {
+      $scope.dispalySegment = false;
+      $scope.lookup.select.segments = [];
+    } else {
+      $scope.dispalySegment = true;
+    }
+  };
+
+  $scope.selectAllSegments = function(allSegments) {
+    if (allSegments) {
+      $scope.lookup.select.segments = $scope.cube.segments;
+    } else {
+      $scope.lookup.select.segments = [];
+    }
+  };
+
+  $scope.refresh = function() {
+    if (!$scope.lookup.select.table_name) {
+      SweetAlert.swal('Warning', 'Lookup table should not be empty', 'warning');
+      return;
+    }
+
+    // cube advance lookup table
+    var lookupTable = _.find(cube.detail.snapshot_table_desc_list, function(table){ return table.table_name == $scope.lookup.select.table_name});
+    if (!!lookupTable) {
+      if (!lookupTable.global && $scope.lookup.select.segments.length == 0) {
+        SweetAlert.swal('Warning', 'Segment should not be empty', 'warning');
+        return;
+      }
+    } else {
+      // cube lookup table
+      lookupTable = _.find($scope.cubeLookups, function(table){ return table == $scope.lookup.select.table_name});
+      if (!lookupTable) {
+        SweetAlert.swal('Warning', 'Lookup table not existed in cube', 'warning');
+        return;
+      } else {
+        if ($scope.lookup.select.segments.length == 0) {
+          SweetAlert.swal('Warning', 'Segment should not be empty', 'warning');
+          return;
+        }
+      }
+    }
+
+    var lookupSnapshotBuildRequest = {
+      lookupTableName: $scope.lookup.select.table_name,
+      segmentIDs: _.map($scope.lookup.select.segments, function(segment){ return segment.uuid})
+    };
+
+    loadingRequest.show();
+    CubeService.lookupRefresh({cubeId: cube.name}, lookupSnapshotBuildRequest, function (job) {
+      loadingRequest.hide();
+      $modalInstance.dismiss('cancel');
+      SweetAlert.swal('Success!', 'Lookup refresh job was submitted successfully', 'success');
+      scope.refreshCube(cube).then(function(_cube){
+          $scope.cubeList.cubes[$scope.cubeList.cubes.indexOf(cube)] = _cube;
+        });
+    }, function (e) {
+       loadingRequest.hide();
+      if (e.data && e.data.exception) {
+        var message = e.data.exception;
+
+        var msg = !!(message) ? message : 'Failed to take action.';
+        SweetAlert.swal('Oops...', msg, 'error');
+      } else {
+        SweetAlert.swal('Oops...', "Failed to take action.", 'error');
+      }
+    });
+  };
+
+};
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index 8a795d58bd..49d899849c 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -925,3 +925,31 @@ KylinApp
 
   });
 
+/*snapshot controller*/
+KylinApp
+  .controller('TableSnapshotCtrl', function ($scope, TableService, CubeService, uiGridConstants) {
+    $scope.initSnapshots = function() {
+      var tableFullName = $scope.tableModel.selectedSrcTable.database + '.' + $scope.tableModel.selectedSrcTable.name
+      TableService.getSnapshots({tableName: tableFullName, pro: $scope.projectModel.selectedProject}, {}, function (data) {
+        var orgData = JSON.parse(angular.toJson(data));
+        angular.forEach(orgData, function(snapshot) {
+          if(!!snapshot.cubesAndSegmentsUsage && snapshot.cubesAndSegmentsUsage.length > 0) {
+            snapshot.usageInfo = '';
+            angular.forEach(snapshot.cubesAndSegmentsUsage, function(info) {
+              snapshot.usageInfo += info;
+              snapshot.usageInfo += '</br>';
+            });
+          } else {
+            snapshot.usageInfo = 'No Usage Info';
+          }
+        });
+        $scope.tableSnapshots = orgData;
+      });
+    };
+    $scope.$watch('tableModel.selectedSrcTable', function (newValue, oldValue) {
+      if (!newValue || !newValue.name) {
+        return;
+      }
+      $scope.initSnapshots();
+    });
+  }); 
\ No newline at end of file
diff --git a/webapp/app/js/directives/select.js b/webapp/app/js/directives/select.js
index 7327af9d13..038cf570cc 100644
--- a/webapp/app/js/directives/select.js
+++ b/webapp/app/js/directives/select.js
@@ -1627,7 +1627,7 @@ uis.directive('uiSelectMultiple', ['uiSelectMinErr','$timeout', function(uiSelec
                   }
                 }
             }
-            if (angular.equals(result.toUpperCase(),value.toUpperCase())){
+            if (angular.equals((typeof result =='string') ? result.toUpperCase() : result, (typeof value == 'string') ? value.toUpperCase() : value)) {
               resultMultiple.unshift(list[p]);
               return true;
             }
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index e163d75cd8..eacb9154c5 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -114,6 +114,10 @@ KylinApp.constant('cubeConfig', {
     {name:"Segment Dictionary", value:"org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder"}
   ],
   needSetLengthEncodingList:['fixed_length','fixed_length_hex','int','integer'],
+  snapshotStorageTypes: [
+    {name: 'Meta Store', value: 'metaStore'},
+    {name: 'HBase', value: 'hbase'}
+  ],
   baseChartOptions: {
     chart: {
       type: 'sunburstChart',
diff --git a/webapp/app/js/model/tableConfig.js b/webapp/app/js/model/tableConfig.js
index 39895315b1..0dc4a52b87 100644
--- a/webapp/app/js/model/tableConfig.js
+++ b/webapp/app/js/model/tableConfig.js
@@ -121,7 +121,15 @@ KylinApp.constant('tableConfig', {
       "fixed_length_hex",
       "integer"
     ]
-  }
+  },
+  snapshotTheaditems: [
+    {attr: 'snapshotID', name: 'ID'},
+    {attr: 'storageType', name: 'Storage Type'},
+    {attr: 'lastBuildTime', name: 'Last Build Time'},
+    {attr: 'sourceTableLastModifyTime', name: 'Source Table Last Modify Time'},
+    {attr: 'sourceTableSize', name: 'Size'},
+    {attr: 'usageInfo', name: 'Useage Info'}
+  ]
 
 
 });
diff --git a/webapp/app/js/services/cubes.js b/webapp/app/js/services/cubes.js
index 614052104f..537e5c11d7 100644
--- a/webapp/app/js/services/cubes.js
+++ b/webapp/app/js/services/cubes.js
@@ -79,6 +79,7 @@ KylinApp.factory('CubeService', ['$resource', function ($resource, config) {
       }
     },
     optimize: {method: 'PUT', params: {action: 'optimize'}, isArray: false},
-    autoMigrate: {method: 'POST', params: {action: 'migrate'}, isArray: false}
+    autoMigrate: {method: 'POST', params: {action: 'migrate'}, isArray: false},
+    lookupRefresh: {method: 'PUT', params: {action: 'refresh_lookup'}, isArray: false}
   });
 }]);
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
index 7d8bc1ae38..9a62cf8c7c 100755
--- a/webapp/app/js/services/tables.js
+++ b/webapp/app/js/services/tables.js
@@ -24,6 +24,7 @@ KylinApp.factory('TableService', ['$resource', function ($resource, config) {
     unLoadHiveTable: {method: 'DELETE', params: {}, isArray: false},
     genCardinality: {method: 'PUT', params: {action: 'cardinality'}, isArray: false},
     showHiveDatabases: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true},
-    showHiveTables: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true}
+    showHiveTables: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true},
+    getSnapshots: {method: 'GET', params: {action: 'snapshots'}, isArray: true}
   });
 }]);
diff --git a/webapp/app/less/app.less b/webapp/app/less/app.less
index eca76ffd0f..7aa7283560 100644
--- a/webapp/app/less/app.less
+++ b/webapp/app/less/app.less
@@ -929,4 +929,11 @@ pre {
 div[title="Cube Info Detail"].popover {
   max-width: 1024px;
   with: min-content;
+}
+/*snapshot usage info tooltip*/
+td.snapshot-usage .tooltip {
+  font-size: 16px;
+}
+td.snapshot-usage .tooltip-inner {
+  max-width: 1024px;
 }
\ No newline at end of file
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
index 7b9193b389..fb7a1b114e 100755
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -503,6 +503,99 @@ <h4 class="box-title text-info">Edit Dictionaries</h4>
              <button class="btn btn-link" ng-click="clearNewDictionaries()">Cancel</button>
            </div>
          </div>
+         <!-- Advanced Lookup Table-->
+         <div class="form-group large-popover" style="margin-bottom:30px;">
+           <h3 style="margin-left:42px;margin-bottom:30px;">Advanced Snapshot Table  <i kylinpopover placement="right" title="Cube Engine" template="AdvanceSnapshotTableTip.html" class="fa fa-info-circle"></i></h3>
+           <div style="margin-left:42px">
+             <!-- edit mode-->
+             <div ng-if="state.mode=='edit'" class="box-body">
+               <table class="table table-hover table-bordered list" style="table-layout: fixed;margin-left:42px;width:92%;">
+                 <thead>
+                   <tr>
+                     <th style="width:60%">Snapshot Table</th>
+                     <th style="width:25%">Type</th>
+                     <th style="width:10%">Global</th>
+                     <th style="width:5%"></th>
+                   </tr>
+                 </thead>
+                 <tbody>
+                   <tr ng-repeat="snapshot in cubeMetaFrame.snapshot_table_desc_list track by $index">
+                     <td>
+                       <select style="width:95%" chosen ng-model="snapshot.table_name"
+                               ng-change="changeSnapshotTable(snapshot, '{{snapshot.table_name}}', {{cubeMetaFrame.snapshot_table_desc_list}})"
+                               ng-options="tableName as tableName for tableName in cubeLookups">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <select style="width:95%" chosen ng-model="snapshot.storage_type"
+                               ng-change="changeSnapshotStorage(snapshot)"
+                               ng-options="storageType.value as storageType.name for storageType in cubeConfig.snapshotStorageTypes">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <input type="checkbox" ng-model="snapshot.global" ng-disabled="(snapshot.storage_type == 'hbase')">
+                     </td>
+                     <td>
+                       <button class="btn btn-xs btn-info" ng-click="removeSnapshotTable($index)">
+                         <i class="fa fa-minus"></i>
+                       </button>
+                     </td>
+                   </tr>
+                   <tr>
+                     <td>
+                       <select style="width:95%" chosen ng-model="newSnapshot.select.table_name"
+                               ng-options="tableName as tableName for tableName in cubeLookups">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <select style="width:95%" chosen ng-model="newSnapshot.select.storage_type"
+                               ng-change="changeSnapshotStorage(newSnapshot.select)"
+                               ng-options="storageType.value as storageType.name for storageType in cubeConfig.snapshotStorageTypes">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <input type="checkbox" ng-model="newSnapshot.select.global" ng-disabled="(newSnapshot.select.storage_type == 'hbase')">
+                     </td>
+                     <td>
+                       <button class="btn btn-xs btn-info" ng-click="addSnapshot(newSnapshot.select)">
+                         <i class="fa fa-plus"></i>
+                       </button>
+                     </td>
+                   </tr>
+                 </tbody>
+               </table>
+             </div>
+             <!-- view-->
+             <div ng-if="state.mode=='view'" class="box-body">
+               <table class="table table-hover table-bordered list" style="table-layout: fixed;margin-left:42px;width:92%;">
+                  <thead>
+                   <tr>
+                     <th style="width:60%">Snapshot Table</th>
+                     <th style="width:25%">Type</th>
+                     <th style="width:10%">Global</th>
+                   </tr>
+                 </thead>
+                 <tbody>
+                   <tr ng-repeat="snapshot in cubeMetaFrame.snapshot_table_desc_list track by $index">
+                     <td>
+                         <p>{{snapshot.table_name}}</p>
+                     </td>
+                     <td>
+                         <p>{{snapshot.storage_type}}</p>
+                     </td>
+                     <td>
+                         <input type="checkbox" ng-model="snapshot.global" disabled="true">
+                     </td>
+                   </tr>
+                 </tbody>
+               </table>
+             </div>
+           </div>
+         </div>
          <!--Edit ColumnFamily-->
          <div class="form-group large-popover" >
            <h3 style="margin-left:42px">Advanced ColumnFamily  <i kylinpopover placement="right" title="Advanced ColumnFamily" template="AdvancedColumnFamilyTip.html" class="fa fa-info-circle"></i></h3>
@@ -647,3 +740,9 @@ <h4>
     </h4>
   </div>
 </script>
+
+<script type="text/ng-template" id="AdvanceSnapshotTableTip.html">
+  <div>
+    <h4>Advance snapshot design for global lookup table and provide different storage type.</h4>
+  </div>
+</script>
diff --git a/webapp/app/partials/cubes/cubes.html b/webapp/app/partials/cubes/cubes.html
index 3fd5e61227..21b7a3494e 100644
--- a/webapp/app/partials/cubes/cubes.html
+++ b/webapp/app/partials/cubes/cubes.html
@@ -95,6 +95,7 @@
                         <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startJobSubmit(cube);">Build</a></li>
                         <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startRefresh(cube)">Refresh</a></li>
                         <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startMerge(cube)">Merge</a></li>
+                        <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startLookupRefresh(cube);">Lookup Refresh</a></li>
                         <li ng-if="cube.status=='READY' && (userService.hasRole('ROLE_ADMIN') || hasPermission('cube',cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))"><a ng-click="disable(cube)">Disable</a></li>
                         <li ng-if="cube.status=='DISABLED' && (userService.hasRole('ROLE_ADMIN') || hasPermission('cube',cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))"><a ng-click="enable(cube)">Enable</a></li>
                         <li ng-if="cube.status=='DISABLED' && (userService.hasRole('ROLE_ADMIN') || hasPermission('cube',cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))"><a ng-click="startDeleteSegment(cube)">Delete Segment</a></li>
@@ -146,4 +147,5 @@
 <div ng-include="'partials/models/model_detail.html'"></div>
 <div ng-include="'partials/cubes/cube_clone.html'"></div>
 <div ng-include="'partials/cubes/cube_delete_segment.html'"></div>
+<div ng-include="'partials/jobs/lookup_refresh.html'"></div>
 </div>
diff --git a/webapp/app/partials/jobs/lookup_refresh.html b/webapp/app/partials/jobs/lookup_refresh.html
new file mode 100644
index 0000000000..ce8bbf92ab
--- /dev/null
+++ b/webapp/app/partials/jobs/lookup_refresh.html
@@ -0,0 +1,71 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+<script type="text/ng-template" id="lookupRefresh.html">
+  <div class="modal-header">
+    <h4 tooltip="refresh">LOOKUP REFRESH CONFIRM</h4>
+  </div>
+  <div class="modal-body" style="background-color: white">
+    <div ng-if="!!cube.detail.snapshot_table_desc_list" class="row">
+      <div class="col-md-2"></div>
+      <div class="col-md-8">
+        <table class="table table-striped list">
+          <tbody>
+            <tr>
+              <td style="width:30%">Lookup Table</td>
+              <td style="width:70%" colspan="2">
+                <select style="width:95%" chosen ng-model="lookup.select.table_name"
+                  ng-change="updateLookupTable(lookup.select.table_name)"
+                  ng-options="lookup as lookup for lookup in cubeLookups">
+                  <option value=""></option>
+                </select>
+              </td>
+            </tr>
+            <tr ng-if="dispalySegment">
+              <td style="width:30%">Segments</td>
+              <td style="width:60%">
+                <ui-select multiple ng-model="lookup.select.segments" theme="bootstrap" sortable="true" close-on-select="false" class="form-control">
+                  <ui-select-match placeholder="Select Segments...">{{$item.name}}</ui-select-match>
+                  <ui-select-choices repeat="segment in cube.segments | filter:getReadySegment">
+                    <div ng-bind-html="segment.name | highlight: $select.search"></div>
+                  </ui-select-choices>
+                </ui-select>
+              </td>
+              <td style="width:10%">
+                <input type="checkbox" ng-change="selectAllSegments(allSegments)" ng-model="allSegments">&nbsp;All
+              </td>
+            </tr>
+          </tbody>
+        </table>
+      </div>
+      <div class="col-md-2"></div>
+    </div>
+    <div ng-if="!cube.detail.snapshot_table_desc_list" class="row">
+      <div class="col-md-2"></div>
+      <div class="col-md-8">
+        <span>No lookup table defined. Please configurate lookup in model</span>
+      </div>
+      <div class="col-md-2"></div>
+    </div>
+  </div>
+
+  <div class="modal-footer">
+    <button class="btn btn-primary" ng-click="cancel()">Close</button>
+    <button ng-if="!!cube.detail.snapshot_table_desc_list" class="btn btn-success" ng-click="refresh()">Submit</button>
+  </div>
+</script>
\ No newline at end of file
diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html
index d8209a8e4d..441e09311e 100644
--- a/webapp/app/partials/tables/table_detail.html
+++ b/webapp/app/partials/tables/table_detail.html
@@ -35,6 +35,9 @@ <h3 class="text-info">Table Schema:{{ tableModel.selectedSrcTable.name}}</h3>
         <li>
           <a data-toggle="tab"  href="#access">Access</a>
         </li>
+        <li>
+          <a data-toggle="tab" href="#snapshot">Snapshot</a>
+        </li>
       </ul>
       <div class="tab-content">
         <!--Schema-->
@@ -184,8 +187,53 @@ <h3 class="text-info">Table Schema:{{ tableModel.selectedSrcTable.name}}</h3>
               </div>
             </div>
           </div>
+        </div>
 
-
+        <!--snapshot-->
+        <div id="snapshot" class="tab-pane" ng-controller="TableSnapshotCtrl">
+          <div ng-if="tableSnapshots.length > 0">
+            <table class="table table-hover table-striped list">
+              <thead>
+                <tr style="cursor: pointer">
+                  <th ng-repeat="theaditem in tableConfig.snapshotTheaditems"
+                    ng-click="state.filterAttr= theaditem.attr;state.reverseColumn=theaditem.attr;state.filterReverse=!state.filterReverse;">
+                  {{theaditem.name}}
+                    <i ng-if="state.reverseColumn!= theaditem.attr"
+                       class="fa fa-unsorted"></i>
+                    <i ng-if="state.reverseColumn== theaditem.attr && !state.filterReverse"
+                       class="fa fa-sort-asc"></i>
+                    <i ng-if="state.reverseColumn== theaditem.attr && state.filterReverse"
+                       class="fa fa-sort-desc"></i>
+                  </th>
+                </tr>
+              </thead>
+              <tbody>
+                <tr ng-repeat="snapshot in tableSnapshots | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.snapshotID}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.storageType}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.lastBuildTime | utcToConfigTimeZone}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.sourceTableLastModifyTime | utcToConfigTimeZone}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.sourceTableSize | bytes}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}" class="snapshot-usage">
+                    <i class="fa fa-list text-aqua" style="cursor: pointer;"  aria-hidden="true" tooltip-placement="left" tooltip-html-unsafe="<div style='text-align:left'>{{snapshot.usageInfo}}</div>"></i>
+                  </td>
+                </tr>
+              </tbody>
+            </table>
+          </div>
+          <div ng-if="tableSnapshots.length == 0">
+            <div no-result text="No Snapshot Info."></div>
+          </div>
         </div>
       </div>
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message