kylin-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] kyotoYaho closed pull request #108: Apache kylin 2895
Date Tue, 23 Oct 2018 12:40:29 GMT
kyotoYaho closed pull request #108: Apache kylin 2895
URL: https://github.com/apache/kylin/pull/108
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/cache/pom.xml b/cache/pom.xml
new file mode 100755
index 0000000000..65c99ab65d
--- /dev/null
+++ b/cache/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-cache</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Cache</name>
+    <description>Apache Kylin - Cache</description>
+
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.ehcache</groupId>
+            <artifactId>ehcache</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.spy</groupId>
+            <artifactId>spymemcached</artifactId>
+        </dependency>
+
+        <!-- Test & Env -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+            <!--MRUnit relies on older version of mockito, so cannot manage it globally-->
+            <version>${mockito.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
new file mode 100755
index 0000000000..edf7fbe7b2
--- /dev/null
+++ b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached;
+
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
+import net.spy.memcached.util.KetamaNodeLocatorConfiguration;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2011 Couchbase, Inc.
+ *
+ * This is a modified version of the Ketama consistent hash strategy from
+ * last.fm. This implementation may not be compatible with libketama as hashing
+ * is considered separate from node location.
+ * 
+ * The only modified method is getSequence().
+ * The previous repetition count of 7 may be too small to avoid repeatedly hitting all-down nodes.
+ *
+ * Note that this implementation does not currently support weighted nodes.
+ *
+ * @see <a href="http://www.last.fm/user/RJ/journal/2007/04/10/392555/">RJ's
+ *      blog post</a>
+ */
+public final class RefinedKetamaNodeLocator extends SpyObject implements NodeLocator {
+
+    private final HashAlgorithm hashAlg;
+    private final Map<InetSocketAddress, Integer> weights;
+    private final boolean isWeightedKetama;
+    private final KetamaNodeLocatorConfiguration config;
+    private volatile TreeMap<Long, MemcachedNode> ketamaNodes;
+    private volatile Collection<MemcachedNode> allNodes;
+
+    /**
+     * Create a new KetamaNodeLocator using specified nodes and the specified hash
+     * algorithm.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
+        this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<InetSocketAddress, Integer>());
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weight
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeKeyFormat the format used to name the nodes in Ketama, either
+     *          SPYMEMCACHED or LIBMEMCACHED
+     * @param weights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            KetamaNodeKeyFormatter.Format nodeKeyFormat, Map<InetSocketAddress, Integer> weights) {
+        this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat)));
+    }
+
+    /**
+     * Create a new KetamaNodeLocator using specified nodes and the specified hash
+     * algorithm and configuration.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param conf the node locator configuration
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) {
+        this(nodes, alg, new HashMap<InetSocketAddress, Integer>(), conf);
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weight
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeWeights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     * @param configuration node locator configuration
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration configuration) {
+        super();
+        allNodes = nodes;
+        hashAlg = alg;
+        config = configuration;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+        setKetamaNodes(nodes);
+    }
+
+    private RefinedKetamaNodeLocator(TreeMap<Long, MemcachedNode> smn, Collection<MemcachedNode> an, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration conf) {
+        super();
+        ketamaNodes = smn;
+        allNodes = an;
+        hashAlg = alg;
+        config = conf;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+    }
+
+    public Collection<MemcachedNode> getAll() {
+        return allNodes;
+    }
+
+    public MemcachedNode getPrimary(final String k) {
+        MemcachedNode rv = getNodeForKey(hashAlg.hash(k));
+        assert rv != null : "Found no node for key " + k;
+        return rv;
+    }
+
+    long getMaxKey() {
+        return getKetamaNodes().lastKey();
+    }
+
+    MemcachedNode getNodeForKey(long hash) {
+        final MemcachedNode rv;
+        if (!ketamaNodes.containsKey(hash)) {
+            // Java 1.6 adds a ceilingKey method, but I'm still stuck in 1.5
+            // in a lot of places, so I'm doing this myself.
+            SortedMap<Long, MemcachedNode> tailMap = getKetamaNodes().tailMap(hash);
+            if (tailMap.isEmpty()) {
+                hash = getKetamaNodes().firstKey();
+            } else {
+                hash = tailMap.firstKey();
+            }
+        }
+        rv = getKetamaNodes().get(hash);
+        return rv;
+    }
+
+    /**
+     * The previous repetition count of 7 may be too small to avoid repeatedly hitting all-down nodes.
+     * @param k the key to locate
+     * @return an iterator over candidate nodes for the key
+     */
+    public Iterator<MemcachedNode> getSequence(String k) {
+        // maxTry searches give us a 1 in 2^maxTry chance of hitting the
+        // same dead node every time.
+        int maxTry = config.getNodeRepetitions() + 1;
+        if (maxTry < 20) {
+            maxTry = 20;
+        }
+        return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg);
+    }
+
+    public NodeLocator getReadonlyCopy() {
+        TreeMap<Long, MemcachedNode> smn = new TreeMap<Long, MemcachedNode>(getKetamaNodes());
+        Collection<MemcachedNode> an = new ArrayList<MemcachedNode>(allNodes.size());
+
+        // Rewrite the values in a copy of the map.
+        for (Map.Entry<Long, MemcachedNode> me : smn.entrySet()) {
+            smn.put(me.getKey(), new MemcachedNodeROImpl(me.getValue()));
+        }
+
+        // Copy the allNodes collection.
+        for (MemcachedNode n : allNodes) {
+            an.add(new MemcachedNodeROImpl(n));
+        }
+
+        return new RefinedKetamaNodeLocator(smn, an, hashAlg, weights, config);
+    }
+
+    @Override
+    public void updateLocator(List<MemcachedNode> nodes) {
+        allNodes = nodes;
+        setKetamaNodes(nodes);
+    }
+
+    /**
+     * @return the ketamaNodes
+     */
+    protected TreeMap<Long, MemcachedNode> getKetamaNodes() {
+        return ketamaNodes;
+    }
+
+    /**
+     * Set up the KetamaNodeLocator with the list of nodes it should use.
+     *
+     * @param nodes a List of MemcachedNodes for this KetamaNodeLocator to use in
+     *          its continuum
+     */
+    protected void setKetamaNodes(List<MemcachedNode> nodes) {
+        TreeMap<Long, MemcachedNode> newNodeMap = new TreeMap<Long, MemcachedNode>();
+        int numReps = config.getNodeRepetitions();
+        int nodeCount = nodes.size();
+        int totalWeight = 0;
+
+        if (isWeightedKetama) {
+            for (MemcachedNode node : nodes) {
+                totalWeight += weights.get(node.getSocketAddress());
+            }
+        }
+
+        for (MemcachedNode node : nodes) {
+            if (isWeightedKetama) {
+
+                int thisWeight = weights.get(node.getSocketAddress());
+                float percent = (float) thisWeight / (float) totalWeight;
+                int pointerPerServer = (int) ((Math.floor(
+                        (float) (percent * (float) config.getNodeRepetitions() / 4 * (float) nodeCount + 0.0000000001)))
+                        * 4);
+                for (int i = 0; i < pointerPerServer / 4; i++) {
+                    for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                        newNodeMap.put(position, node);
+                        getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
+                    }
+                }
+            } else {
+                // Ketama does some special work with md5 where it reuses chunks.
+                // Check to be backwards compatible, the hash algorithm does not
+                // matter for Ketama, just the placement should always be done using
+                // MD5
+                if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
+                    for (int i = 0; i < numReps / 4; i++) {
+                        for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                            newNodeMap.put(position, node);
+                            getLogger().debug("Adding node %s in position %d", node, position);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < numReps; i++) {
+                        newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
+                    }
+                }
+            }
+        }
+        assert newNodeMap.size() == numReps * nodes.size();
+        ketamaNodes = newNodeMap;
+    }
+
+    private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) {
+        List<Long> positions = new ArrayList<Long>();
+        byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration));
+        for (int h = 0; h < 4; h++) {
+            Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16)
+                    | ((long) (digest[1 + h * 4] & 0xFF) << 8) | (digest[h * 4] & 0xFF);
+            positions.add(k);
+        }
+        return positions;
+    }
+}
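
The hashing detail at the end of the file above is worth unpacking: ketamaNodePositionsAtIteration() turns one 16-byte MD5 digest into four 32-bit continuum positions, four little-endian bytes per position. A minimal standalone sketch of that derivation (the node-key format is an assumption here; the real format comes from KetamaNodeLocatorConfiguration):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch: derive four 32-bit ketama continuum positions from one
// MD5 digest, mirroring ketamaNodePositionsAtIteration() above.
public class KetamaPointsDemo {
    static List<Long> pointsFor(String nodeKey) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(nodeKey.getBytes(StandardCharsets.UTF_8));
        List<Long> positions = new ArrayList<>();
        for (int h = 0; h < 4; h++) {
            // four bytes per position, assembled little-endian, masked to unsigned
            long k = ((long) (digest[3 + h * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + h * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + h * 4] & 0xFF) << 8)
                    | (digest[h * 4] & 0xFF);
            positions.add(k);
        }
        return positions;
    }

    public static void main(String[] args) throws Exception {
        // "host:port-iteration" mimics the node-key shape; illustrative only.
        System.out.println(pointsFor("127.0.0.1:11211-0"));
    }
}
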
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
new file mode 100755
index 0000000000..11652fbc7e
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+public class CacheConstants {
+    public static final String QUERY_CACHE = "StorageCache";
+    public static final String ACL_CACHE = "AclCache";
+    public static final String USER_CACHE = "UserCache";
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
new file mode 100755
index 0000000000..6a86806b2e
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.cache.ehcache.InstrumentedEhCacheCache;
+import org.apache.kylin.common.KylinConfig;
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.util.Assert;
+
+import com.google.common.collect.Sets;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Status;
+
+/**
+ * CacheManager backed by an EhCache {@link net.sf.ehcache.CacheManager}.
+ *
+ */
+public class InstrumentedEhCacheCacheManager extends AbstractCacheManager {
+
+    private net.sf.ehcache.CacheManager cacheManager;
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+    private boolean enableMetrics = false;
+
+    /**
+     * Return the backing EhCache {@link net.sf.ehcache.CacheManager}.
+     */
+    public net.sf.ehcache.CacheManager getCacheManager() {
+        return this.cacheManager;
+    }
+
+    /**
+     * Set the backing EhCache {@link net.sf.ehcache.CacheManager}.
+     */
+    public void setCacheManager(net.sf.ehcache.CacheManager cacheManager) {
+        this.cacheManager = cacheManager;
+        if ("true".equalsIgnoreCase(metricsConfig.get("ehcache.enabled"))) {
+            enableMetrics = true;
+        }
+    }
+
+    @Override
+    protected Collection<Cache> loadCaches() {
+        Assert.notNull(this.cacheManager, "A backing EhCache CacheManager is required");
+        Status status = this.cacheManager.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' EhCache CacheManager is required - current cache is " + status.toString());
+
+        String[] names = this.cacheManager.getCacheNames();
+        Collection<Cache> caches = Sets.newLinkedHashSetWithExpectedSize(names.length);
+        for (String name : names) {
+            if (enableMetrics) {
+                caches.add(new InstrumentedEhCacheCache(this.cacheManager.getEhcache(name)));
+            } else {
+                caches.add(new EhCacheCache(this.cacheManager.getEhcache(name)));
+            }
+        }
+        return caches;
+    }
+
+    @Override
+    public Cache getCache(String name) {
+        Cache cache = super.getCache(name);
+        if (cache == null) {
+            // check the EhCache cache again
+            // (in case the cache was added at runtime)
+            Ehcache ehcache = this.cacheManager.getEhcache(name);
+            if (ehcache != null) {
+                if (enableMetrics) {
+                    cache = new InstrumentedEhCacheCache(ehcache);
+                } else {
+                    cache = new EhCacheCache(ehcache);
+                }
+                addCache(cache);
+            }
+        }
+        return cache;
+    }
+
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
new file mode 100755
index 0000000000..39903dd3b0
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import net.spy.memcached.MemcachedClientIF;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.cache.memcached.MemcachedCache;
+import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.cache.support.SimpleValueWrapper;
+
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MemcachedCacheManager extends AbstractCacheManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCacheManager.class);
+    private static final Long ONE_MINUTE = 60 * 1000L;
+
+    @Autowired
+    private MemcachedCacheConfig memcachedCacheConfig;
+
+    private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setNameFormat("Memcached-HealthChecker").build());
+    private AtomicBoolean clusterHealth = new AtomicBoolean(true);
+
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        Cache successCache = new MemCachedCacheAdaptor(
+                new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.QUERY_CACHE)));
+        Cache aclCache = new MemCachedCacheAdaptor(
+                new MemcachedCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.ACL_CACHE)));
+        Cache userCache = new MemCachedCacheAdaptor(
+                new MemcachedCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.USER_CACHE, 86400)));
+
+        addCache(successCache);
+        addCache(aclCache);
+        addCache(userCache);
+
+        Collection<String> names = getCacheNames();
+        Collection<Cache> caches = Lists.newArrayList();
+        for (String name : names) {
+            caches.add(getCache(name));
+        }
+
+        timer.scheduleWithFixedDelay(new MemcachedClusterHealthChecker(), ONE_MINUTE, ONE_MINUTE,
+                TimeUnit.MILLISECONDS);
+        return caches;
+    }
+
+    public boolean isClusterDown() {
+        return !clusterHealth.get();
+    }
+
+    @VisibleForTesting
+    void setClusterHealth(boolean ifHealth) {
+        clusterHealth.set(ifHealth);
+    }
+
+    public static class MemCachedCacheAdaptor implements Cache {
+        private MemcachedCache memcachedCache;
+
+        public MemCachedCacheAdaptor(MemcachedCache memcachedCache) {
+            this.memcachedCache = memcachedCache;
+        }
+
+        @Override
+        public String getName() {
+            return memcachedCache.getName();
+        }
+
+        @Override
+        public Object getNativeCache() {
+            return memcachedCache.getNativeCache();
+        }
+
+        @Override
+        public ValueWrapper get(Object key) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            return new SimpleValueWrapper(SerializationUtils.deserialize(value));
+        }
+
+        @Override
+        public void put(Object key, Object value) {
+            memcachedCache.put(key, value);
+        }
+
+        @Override
+        public void evict(Object key) {
+            memcachedCache.evict(key);
+        }
+
+        @Override
+        public void clear() {
+            memcachedCache.clear();
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> T get(Object key, Class<T> type) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            Object obj = SerializationUtils.deserialize(value);
+            // check the deserialized object, not the raw byte[] value
+            if (obj != null && type != null && !type.isInstance(obj)) {
+                throw new IllegalStateException(
+                        "Cached value is not of required type [" + type.getName() + "]: " + obj);
+            }
+            return (T) obj;
+        }
+
+        @Override
+        //TODO
+        public <T> T get(Object key, Callable<T> valueLoader) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        //TODO: the implementation here doesn't guarantee atomicity;
+        //without atomicity, this method should not be relied upon concurrently.
+        public ValueWrapper putIfAbsent(Object key, Object value) {
+            byte[] existing = memcachedCache.get(key);
+            if (existing == null) {
+                memcachedCache.put(key, value);
+                return null;
+            } else {
+                return new SimpleValueWrapper(SerializationUtils.deserialize(existing));
+            }
+        }
+
+    }
+
+    private class MemcachedClusterHealthChecker implements Runnable {
+        @Override
+        public void run() {
+            Cache cache = getCache(CacheConstants.QUERY_CACHE);
+            MemcachedClientIF cacheClient = (MemcachedClientIF) cache.getNativeCache();
+            Collection<SocketAddress> liveServers = cacheClient.getAvailableServers();
+            Collection<SocketAddress> deadServers = cacheClient.getUnavailableServers();
+            if (liveServers.isEmpty()) {
+                clusterHealth.set(false);
+                logger.error("All servers in the Memcached cluster are down, UnavailableServers: " + deadServers);
+            } else {
+                clusterHealth.set(true);
+                if (deadServers.size() > liveServers.size()) {
+                    logger.warn("More than half of the servers in the Memcached cluster are down, LiveServers: "
+                            + liveServers + ", UnavailableServers: " + deadServers);
+                }
+            }
+        }
+    }
+}
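
The TODO on MemCachedCacheAdaptor.putIfAbsent() above deserves emphasis: get-then-put against a remote cache is a classic check-then-act race. A self-contained sketch of the failure mode (illustrative names only, with an artificially widened race window):

import java.util.HashMap;
import java.util.Map;

// Two threads can both observe "absent" before either writes, so both
// believe they inserted first; the last writer silently wins.
public class PutIfAbsentRaceDemo {
    private static final Map<String, String> cache = new HashMap<>();

    static String naivePutIfAbsent(String key, String value) throws InterruptedException {
        String existing;
        synchronized (cache) {
            existing = cache.get(key);      // check...
        }
        Thread.sleep(10);                   // widen the race window for the demo
        if (existing == null) {
            synchronized (cache) {
                cache.put(key, value);      // ...then act: not atomic with the check
            }
            return null;                    // caller believes the slot was empty
        }
        return existing;
    }

    private static void run(String value) {
        try {
            System.out.println(value + " observed: " + naivePutIfAbsent("k", value));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        Thread a = new Thread(() -> run("A"));
        Thread b = new Thread(() -> run("B"));
        a.start(); b.start(); a.join(); b.join();
        synchronized (cache) {
            System.out.println("final value: " + cache.get("k"));
        }
    }
}
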
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
new file mode 100755
index 0000000000..913b1caaef
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.support.AbstractCacheManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class RemoteLocalFailOverCacheManager extends AbstractCacheManager {
+    private static final Logger logger = LoggerFactory.getLogger(RemoteLocalFailOverCacheManager.class);
+
+    @Autowired
+    private MemcachedCacheManager remoteCacheManager;
+
+    @Autowired
+    private CacheManager localCacheManager;
+
+    @Override
+    public void afterPropertiesSet() {
+        Preconditions.checkNotNull(localCacheManager, "localCacheManager is not injected yet");
+    }
+
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        return null;
+    }
+
+    @Override
+    public Cache getCache(String name) {
+        if (remoteCacheManager == null || remoteCacheManager.isClusterDown()) {
+            logger.info("use local cache, because remote cache is not configured or down");
+            return localCacheManager.getCache(name);
+        } else {
+            return remoteCacheManager.getCache(name);
+        }
+    }
+
+    @VisibleForTesting
+    void disableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(false);
+    }
+
+    @VisibleForTesting
+    void enableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(true);
+    }
+}
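
The failover rule above reduces to a one-line decision: serve from the remote (memcached) manager while its health flag holds, otherwise fall back to the local manager. A dependency-free sketch of that selection (names are illustrative, not Spring API):

import java.util.function.Function;

// Remote-first, local-fallback selection, mirroring getCache() above.
public class FailOverDemo {
    interface SimpleCache extends Function<String, String> {}

    static SimpleCache choose(boolean remoteClusterUp, SimpleCache remote, SimpleCache local) {
        // same shape as: remoteCacheManager.isClusterDown() ? local : remote
        return remoteClusterUp ? remote : local;
    }

    public static void main(String[] args) {
        SimpleCache remote = key -> "remote:" + key;
        SimpleCache local = key -> "local:" + key;
        System.out.println(choose(true, remote, local).apply("q1"));  // remote:q1
        System.out.println(choose(false, remote, local).apply("q1")); // local:q1
    }
}
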
diff --git a/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
new file mode 100755
index 0000000000..28c7b3def5
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.ehcache;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.name;
+
+import java.util.concurrent.Callable;
+
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.SimpleValueWrapper;
+import org.springframework.util.Assert;
+
+import com.codahale.metrics.Gauge;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.Status;
+
+/**
+ * {@link Cache} implementation on top of an {@link Ehcache} instance.
+ *
+ */
+public class InstrumentedEhCacheCache implements Cache {
+
+    private final Ehcache cache;
+
+    /**
+     * Create an {@link InstrumentedEhCacheCache} instance.
+     * @param ehcache backing Ehcache instance
+     */
+    public InstrumentedEhCacheCache(Ehcache ehcache) {
+        Assert.notNull(ehcache, "Ehcache must not be null");
+        Status status = ehcache.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' Ehcache is required - current cache is " + status.toString());
+        this.cache = ehcache;
+
+        final String prefix = name(cache.getClass(), cache.getName());
+        Metrics.register(name(prefix, "hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getLocalHeapSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-get-time"), new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheGetOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-search-time"), new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheSearchOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "eviction-count"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheEvictionOperation().count().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "writer-queue-size"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getWriterQueueLength();
+            }
+        });
+    }
+
+    public String getName() {
+        return this.cache.getName();
+    }
+
+    public Ehcache getNativeCache() {
+        return this.cache;
+    }
+
+    public ValueWrapper get(Object key) {
+        Element element = this.cache.get(key);
+        return (element != null ? new SimpleValueWrapper(element.getObjectValue()) : null);
+    }
+
+    public void put(Object key, Object value) {
+        this.cache.put(new Element(key, value));
+    }
+
+    public void evict(Object key) {
+        this.cache.remove(key);
+    }
+
+    public void clear() {
+        this.cache.removeAll();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Class<T> type) {
+        Element element = lookup(key);
+        Object value = (element != null ? element.getObjectValue() : null);
+        if (value != null && type != null && !type.isInstance(value)) {
+            throw new IllegalStateException("Cached value is not of required type [" + type.getName() + "]: " + value);
+        }
+        return (T) value;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Callable<T> valueLoader) {
+        Element element = lookup(key);
+        if (element != null) {
+            return (T) element.getObjectValue();
+        } else {
+            this.cache.acquireWriteLockOnKey(key);
+            try {
+                element = lookup(key); // One more attempt with the write lock
+                if (element != null) {
+                    return (T) element.getObjectValue();
+                } else {
+                    return loadValue(key, valueLoader);
+                }
+            } finally {
+                this.cache.releaseWriteLockOnKey(key);
+            }
+        }
+    }
+
+    @Override
+    public ValueWrapper putIfAbsent(Object key, Object value) {
+        Element existingElement = this.cache.putIfAbsent(new Element(key, value));
+        return (existingElement != null ? new SimpleValueWrapper(existingElement.getObjectValue()) : null);
+    }
+
+    private Element lookup(Object key) {
+        return this.cache.get(key);
+    }
+
+    private <T> T loadValue(Object key, Callable<T> valueLoader) {
+        T value;
+        try {
+            value = valueLoader.call();
+        } catch (Throwable ex) {
+            throw new ValueRetrievalException(key, valueLoader, ex);
+        }
+        put(key, value);
+        return value;
+    }
+}
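
get(key, valueLoader) above follows a lock-then-recheck pattern: a lock-free read, then a per-key write lock, a second read under the lock, and only then a single load. A generic sketch of the same pattern over a plain map (illustrative, not the Ehcache API):

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Load-through cache: at most one loader call per key even under contention.
public class LoadThroughDemo {
    private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public <T> T get(String key, Callable<T> loader) throws Exception {
        Object v = store.get(key);                  // first, lock-free attempt
        if (v != null) {
            return (T) v;
        }
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            v = store.get(key);                     // one more attempt under the lock
            if (v == null) {
                v = loader.call();                  // load once, publish for everyone
                store.put(key, v);
            }
            return (T) v;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        LoadThroughDemo cache = new LoadThroughDemo();
        System.out.println(cache.<String>get("answer", () -> "42")); // loads
        System.out.println(cache.<String>get("answer", () -> "!!")); // served from cache
    }
}
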
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
new file mode 100755
index 0000000000..93dda5ca84
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+public class CacheStats {
+    private final long numHits;
+    private final long numMisses;
+    private final long getBytes;
+    private final long getTime;
+    private final long numPut;
+    private final long putBytes;
+    private final long numEvictions;
+    private final long numTimeouts;
+    private final long numErrors;
+
+    public CacheStats(long getBytes, long getTime, long numPut, long putBytes, long numHits, long numMisses,
+            long numEvictions, long numTimeouts, long numErrors) {
+        this.getBytes = getBytes;
+        this.getTime = getTime;
+        this.numPut = numPut;
+        this.putBytes = putBytes;
+        this.numHits = numHits;
+        this.numMisses = numMisses;
+        this.numEvictions = numEvictions;
+        this.numTimeouts = numTimeouts;
+        this.numErrors = numErrors;
+    }
+
+    public long getNumHits() {
+        return numHits;
+    }
+
+    public long getNumMisses() {
+        return numMisses;
+    }
+
+    public long getNumGet() {
+        return numHits + numMisses;
+    }
+
+    public long getNumGetBytes() {
+        return getBytes;
+    }
+
+    public long getNumPutBytes() {
+        return putBytes;
+    }
+
+    public long getNumPut() {
+        return numPut;
+    }
+
+    public long getNumEvictions() {
+        return numEvictions;
+    }
+
+    public long getNumTimeouts() {
+        return numTimeouts;
+    }
+
+    public long getNumErrors() {
+        return numErrors;
+    }
+
+    public long numLookups() {
+        return numHits + numMisses;
+    }
+
+    public double hitRate() {
+        long lookups = numLookups();
+        return lookups == 0 ? 0 : numHits / (double) lookups;
+    }
+
+    public long avgGetBytes() {
+        return numLookups() == 0 ? 0 : getBytes / numLookups();
+    }
+
+    public long getAvgGetTime() {
+        // guard against division by zero before any lookups have happened
+        return numLookups() == 0 ? 0 : getTime / numLookups();
+    }
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
new file mode 100755
index 0000000000..3b794bac1b
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A class implementing this interface indicates that the key information needs to be
+ * calculated from a first lookup against the cache itself to get a hook.
+ *
+ */
+public interface KeyHookLookup {
+    KeyHook lookupKeyHook(String key);
+
+    public static class KeyHook implements Serializable {
+        private static final long serialVersionUID = 2400159460862757991L;
+
+        private String[] chunkskey;
+        private byte[] values;
+
+        /**
+         * For de-serialization
+         */
+        public KeyHook() {
+        }
+
+        public KeyHook(String[] chunkskey, byte[] values) {
+            super();
+            this.chunkskey = chunkskey;
+            this.values = values;
+        }
+
+        public String[] getChunkskey() {
+            return chunkskey;
+        }
+
+        public void setChunkskey(String[] chunkskey) {
+            this.chunkskey = chunkskey;
+        }
+
+        public byte[] getValues() {
+            return values;
+        }
+
+        public void setValues(byte[] values) {
+            this.values = values;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + Arrays.hashCode(chunkskey);
+            result = prime * result + Arrays.hashCode(values);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            KeyHook other = (KeyHook) obj;
+            if (!Arrays.equals(chunkskey, other.chunkskey))
+                return false;
+            if (!Arrays.equals(values, other.values))
+                return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            if (chunkskey != null) {
+                builder.append("chunkskey_length:" + chunkskey.length);
+            } else {
+                builder.append("chunkskey_is_null");
+            }
+            builder.append("|");
+            if (values != null) {
+                builder.append("value_length:" + values.length);
+            } else {
+                builder.append("value_is_null");
+            }
+            return builder.toString();
+        }
+
+        //        @Override
+        //        public void writeExternal(ObjectOutput out) throws IOException {
+        //            if(chunkskey == null){
+        //                out.writeInt(0);
+        //            }else{
+        //                out.writeInt(chunkskey.length);
+        //                for (String chunkKey : chunkskey) {
+        //                    out.writeUTF(chunkKey);
+        //                }
+        //            }
+        //            if(values != null){
+        //                out.write(values);
+        //            }
+        //        }
+        //        
+        //        @Override
+        //        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        //            int keySize = in.readInt();
+        //            if(keySize > 0){
+        //                chunkskey = new String[keySize];
+        //                for (int i = 0; i < keySize; i++){
+        //                    chunkskey[i] = in.readUTF();
+        //                }
+        //            }
+        //            int available = in.available();
+        //            if(available > 0){
+        //                values = new byte[available];
+        //                in.read(values);
+        //            }
+        //        }
+    }
+}
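
To make the interface's intent concrete: a KeyHook either carries the value inline (values) or names the chunk keys to fetch and concatenate (chunkskey). The real logic lives in MemcachedChunkingCache; everything below except KeyHook itself is an illustrative assumption:

import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Resolve a KeyHook back into value bytes: inline values return directly,
// chunked values are reassembled from the listed chunk keys.
public class KeyHookResolverDemo {
    static byte[] resolve(KeyHookLookup.KeyHook hook, Map<String, byte[]> cache) {
        if (hook == null) {
            return null;
        }
        if (hook.getChunkskey() == null) {
            return hook.getValues();                  // small value, stored inline
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String chunkKey : hook.getChunkskey()) { // large value, stored as chunks
            byte[] part = cache.get(chunkKey);
            if (part == null) {
                return null;                          // any missing chunk means a miss
            }
            out.write(part, 0, part.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, byte[]> cache = new HashMap<>();
        cache.put("k/0", new byte[] { 1, 2 });
        cache.put("k/1", new byte[] { 3 });
        KeyHookLookup.KeyHook hook = new KeyHookLookup.KeyHook(new String[] { "k/0", "k/1" }, null);
        System.out.println(resolve(hook, cache).length); // 3
    }
}
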
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
new file mode 100755
index 0000000000..ad6c9f5b8f
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.ops.ArrayOperationQueueFactory;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.DataFormatException;
+
+/**
+ * Cache backed by Memcached. The implementation leverages the spymemcached client to talk to the servers.
+ * Memcached itself limits the size of the key, so the real key used for cache lookup is hashed from the original key.
+ * The implementation provides a way to detect hash collisions. It can also compress/decompress the value bytes based on the preconfigured compression threshold to save network bandwidth and storage space.
+ * 
+ * @author mingmwang
+ *
+ */
+public class MemcachedCache {
+    public static final int MAX_PREFIX_LENGTH = MemcachedClientIF.MAX_KEY_LENGTH - 40 // length of namespace hash
+            - 40 // length of key hash
+            - 2; // length of separators
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCache.class);
+    private static final int DEFAULT_TTL = 7 * 24 * 3600;
+    protected final MemcachedCacheConfig config;
+    protected final MemcachedClientIF client;
+    protected final String memcachedPrefix;
+    protected final int compressThreshold;
+    protected final AtomicLong hitCount = new AtomicLong(0);
+    protected final AtomicLong missCount = new AtomicLong(0);
+    protected final AtomicLong readBytes = new AtomicLong(0);
+    protected final AtomicLong timeoutCount = new AtomicLong(0);
+    protected final AtomicLong errorCount = new AtomicLong(0);
+    protected final AtomicLong putCount = new AtomicLong(0);
+    protected final AtomicLong putBytes = new AtomicLong(0);
+    private final int timeToLiveSeconds;
+    protected AtomicLong cacheGetTime = new AtomicLong(0);
+
+    public MemcachedCache(final MemcachedClientIF client, final MemcachedCacheConfig config,
+            final String memcachedPrefix, int timeToLiveSeconds) {
+        // Note: Guava's Preconditions only substitutes %s placeholders.
+        Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+                "memcachedPrefix length [%s] exceeds maximum length [%s]", memcachedPrefix.length(), MAX_PREFIX_LENGTH);
+        this.memcachedPrefix = memcachedPrefix;
+        this.client = client;
+        this.config = config;
+        this.compressThreshold = config.getMaxObjectSize() / 2;
+        this.timeToLiveSeconds = timeToLiveSeconds;
+    }
+
+    public MemcachedCache(MemcachedCache cache) {
+        this(cache.client, cache.config, cache.memcachedPrefix, cache.timeToLiveSeconds);
+    }
+
+    /**
+     * Create and return a MemcachedCache. Each call to this method creates a new instance.
+     * @param config            The MemcachedCache configuration to control the cache behavior.
+     * @return a new MemcachedCache instance
+     */
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix) {
+        return create(config, memcachedPrefix, DEFAULT_TTL);
+    }
+
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix, int timeToLive) {
+        try {
+            SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
+            // always no compression inside, we compress/decompress outside
+            transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+
+            OperationQueueFactory opQueueFactory;
+            int maxQueueSize = config.getMaxOperationQueueSize();
+            if (maxQueueSize > 0) {
+                opQueueFactory = new ArrayOperationQueueFactory(maxQueueSize);
+            } else {
+                opQueueFactory = new LinkedOperationQueueFactory();
+            }
+            String hostsStr = config.getHosts();
+            ConnectionFactory connectionFactory = new MemcachedConnectionFactoryBuilder()
+                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                    .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                    .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                    .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                    .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                    .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+            return new MemcachedCache(new MemcachedClient(new MemcachedConnectionFactory(connectionFactory),
+                    AddrUtil.getAddresses(hostsStr)), config, memcachedPrefix, timeToLive);
+        } catch (IOException e) {
+            logger.error("Unable to create MemcachedCache instance.", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public String getName() {
+        return memcachedPrefix;
+    }
+
+    public Object getNativeCache() {
+        return client;
+    }
+
+    protected String serializeKey(Object key) {
+        try {
+            return JsonUtil.writeValueAsString(key);
+        } catch (JsonProcessingException e) {
+            logger.warn("Can not convert key to String.", e);
+        }
+        return null;
+    }
+
+    protected byte[] serializeValue(Object value) {
+        return SerializationUtils.serialize((Serializable) value);
+    }
+
+    @VisibleForTesting
+    byte[] encodeValue(String keyS, Object value) {
+        if (keyS == null) {
+            return null;
+        }
+        return encodeValue(keyS.getBytes(Charsets.UTF_8), serializeValue(value));
+    }
+
+    /**
+     * Gets the value bytes for a key from the cache. It first converts the key to a JSON string.
+     * It then calls getBinary() to compute the hashed key from the original key string, and uses this as the real key for the lookup in the internal cache.
+     * Finally it decodes the real value bytes from the cache lookup result; callers deserialize those bytes back into an object.
+     */
+    public byte[] get(Object key) {
+        return get(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public byte[] get(String keyS) {
+        return getBinary(keyS);
+    }
+
+    /**
+     * Puts a key/value pair into the cache. It converts the key to a JSON string and leverages the object serializer to convert the value object to bytes.
+     * It then calls putBinary() to compute the hashed key from the original key string and encodes the original key bytes into the value bytes for hash-collision detection.
+     */
+    public void put(Object key, Object value) {
+        put(serializeKey(key), value);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void put(String keyS, Object value) {
+        if (keyS != null) {
+            putBinary(keyS, serializeValue(value), timeToLiveSeconds);
+        }
+    }
+
+    public void evict(Object key) {
+        if (key == null)
+            return;
+        evict(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void evict(String keyS) {
+        if (keyS == null)
+            return;
+        client.delete(computeKeyHash(keyS));
+    }
+
+    public void clear() {
+        logger.warn("Clear Remote Cache!");
+        Future<Boolean> resultFuture = client.flush();
+        try {
+            boolean result = resultFuture.get();
+            logger.warn("Clear Remote Cache returned with result: " + result);
+        } catch (Exception e) {
+            logger.warn("Can't clear Remote Cache.", e);
+        }
+    }
+
+    public CacheStats getStats() {
+        return new CacheStats(readBytes.get(), cacheGetTime.get(), putCount.get(), putBytes.get(), hitCount.get(),
+                missCount.get(), 0, timeoutCount.get(), errorCount.get());
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @return the serialized value
+     */
+    protected byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        byte[] bytes = internalGet(computeKeyHash(keyS));
+        return decodeValue(keyS.getBytes(Charsets.UTF_8), bytes);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @param valueB should be the serialized value
+     */
+    protected void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        internalPut(computeKeyHash(keyS), encodeValue(keyS.getBytes(Charsets.UTF_8), valueB), expiration);
+    }
+
+    protected byte[] internalGet(String hashedKey) {
+        Future<Object> future;
+        long start = System.currentTimeMillis();
+        try {
+            future = client.asyncGet(hashedKey);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return null;
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            byte[] result = (byte[]) future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (result != null) {
+                hitCount.incrementAndGet();
+                readBytes.addAndGet(result.length);
+            } else {
+                missCount.incrementAndGet();
+            }
+            return result;
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            future.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling key meta from cache.", e);
+            return null;
+        }
+    }
+
+    private void internalPut(String hashedKey, byte[] encodedValue, int expiration) {
+        try {
+            client.set(hashedKey, expiration, encodedValue);
+            putCount.incrementAndGet();
+            putBytes.addAndGet(encodedValue.length);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+        }
+    }
+
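+    /**
+     * Encoded value layout, as implemented below (a sketch of the wire format used by this class):
+     *   with compression:    [short 1][deflated([int keyLen][key bytes][value bytes])]
+     *   without compression: [short 0][int keyLen][key bytes][value bytes]
+     * The original key bytes are embedded so that decodeValue() can detect hash collisions on read.
+     */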
+    protected byte[] encodeValue(byte[] key, byte[] valueB) {
+        byte[] compressed = null;
+        if (config.isEnableCompression() && (valueB.length + Ints.BYTES + key.length > compressThreshold)) {
+            try {
+                compressed = CompressionUtils.compress(ByteBuffer.allocate(Ints.BYTES + key.length + valueB.length)
+                        .putInt(key.length).put(key).put(valueB).array());
+            } catch (IOException e) {
+                compressed = null;
+                logger.warn("Compressing value bytes error.", e);
+            }
+        }
+        if (compressed != null) {
+            return ByteBuffer.allocate(Shorts.BYTES + compressed.length).putShort((short) 1).put(compressed).array();
+        } else {
+            return ByteBuffer.allocate(Shorts.BYTES + Ints.BYTES + key.length + valueB.length).putShort((short) 0)
+                    .putInt(key.length).put(key).put(valueB).array();
+        }
+    }
+
+    protected byte[] decodeValue(byte[] key, byte[] valueE) {
+        if (valueE == null)
+            return null;
+        ByteBuffer buf = ByteBuffer.wrap(valueE);
+        short enableCompression = buf.getShort();
+        byte[] uncompressed = null;
+        if (enableCompression == 1) {
+            byte[] value = new byte[buf.remaining()];
+            buf.get(value);
+            try {
+                uncompressed = CompressionUtils.decompress(value);
+            } catch (IOException | DataFormatException e) {
+                logger.error("Decompressing value bytes error.", e);
+                return null;
+            }
+        }
+        if (uncompressed != null) {
+            buf = ByteBuffer.wrap(uncompressed);
+        }
+        final int keyLength = buf.getInt();
+        byte[] keyBytes = new byte[keyLength];
+        buf.get(keyBytes);
+        if (!Arrays.equals(keyBytes, key)) {
+            logger.error("Keys do not match, possible hash collision!");
+            return null;
+        }
+        byte[] value = new byte[buf.remaining()];
+        buf.get(value);
+        return value;
+    }
+
+    protected String computeKeyHash(String key) {
+        // hash keys to keep things under 250 characters for memcached
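+        // e.g. (illustrative values) deployEnv "DEV" and prefix "QUERY_CACHE" give a real key like
+        // "DEV:QUERY_CACHE:<40-char sha1 hex>", which stays well below memcached's 250-character key limit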
+        return Joiner.on(":").skipNulls().join(KylinConfig.getInstanceFromEnv().getDeployEnv(), this.memcachedPrefix,
+                DigestUtils.shaHex(key));
+
+    }
+
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
new file mode 100755
index 0000000000..723fd186bd
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import net.spy.memcached.DefaultConnectionFactory;
+
+public class MemcachedCacheConfig {
+    private long timeout = 500L;
+
+    // comma-delimited list of memcached servers, each given as a host:port combination
+    private String hosts;
+
+    private int maxChunkSize = 1024;
+
+    private int maxObjectSize = 1024 * 1024;
+
+    // memcached client read buffer size; defaults to the spymemcached library default
+    private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;
+
+    // maximum operation queue size. 0 means unbounded
+    private int maxOperationQueueSize = 0;
+
+    // whether to compress the value data or not
+    private boolean enableCompression = true;
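+
+    // This config is typically populated as a Spring bean; see cacheContext.xml under the test resources,
+    // e.g. <property name="hosts" value="host1:11211,host2:11211"/> (illustrative hosts).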
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public String getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    public int getMaxChunkSize() {
+        return maxChunkSize;
+    }
+
+    public void setMaxChunkSize(int maxChunkSize) {
+        this.maxChunkSize = maxChunkSize;
+    }
+
+    public int getMaxObjectSize() {
+        return maxObjectSize;
+    }
+
+    public void setMaxObjectSize(int maxObjectSize) {
+        this.maxObjectSize = maxObjectSize;
+    }
+
+    public int getMaxOperationQueueSize() {
+        return maxOperationQueueSize;
+    }
+
+    public void setMaxOperationQueueSize(int maxOperationQueueSize) {
+        this.maxOperationQueueSize = maxOperationQueueSize;
+    }
+
+    public int getReadBufferSize() {
+        return readBufferSize;
+    }
+
+    public void setReadBufferSize(int readBufferSize) {
+        this.readBufferSize = readBufferSize;
+    }
+
+    public boolean isEnableCompression() {
+        return enableCompression;
+    }
+
+    public void setEnableCompression(boolean enableCompression) {
+        this.enableCompression = enableCompression;
+    }
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
new file mode 100755
index 0000000000..1511f0feff
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+import net.spy.memcached.internal.BulkFuture;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Subclass of MemcachedCache that supports storing large objects. Memcached itself limits the value size, with a
+ * default of 1M. This implementation extends the limit to about 1G by splitting huge byte arrays into multiple
+ * chunks, and it takes care of data integrity when part of the chunks is lost (due to server restart or other reasons).
+ *
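+ * For example (illustrative numbers): with maxObjectSize of 1M and maxChunkSize of 1024, values up to roughly 1G
+ * can be stored. Chunks are written under the keys keyS + "0", keyS + "1", ... and tracked by a KeyHook that is
+ * stored under the original key.
+ *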
+ * @author mingmwang
+ */
+public class MemcachedChunkingCache extends MemcachedCache implements KeyHookLookup {
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedChunkingCache.class);
+
+    public MemcachedChunkingCache(MemcachedCache cache) {
+        super(cache);
+        Preconditions.checkArgument(config.getMaxChunkSize() > 1, "maxChunkSize [%d] must be greater than 1",
+                config.getMaxChunkSize());
+        Preconditions.checkArgument(config.getMaxObjectSize() > 261, "maxObjectSize [%d] must be greater than 261",
+                config.getMaxObjectSize());
+    }
+
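+    /**
+     * Splits data into nSplit parts of nearly equal size; every part except possibly the last has the same length.
+     * For example, 8 bytes split 3 ways yields chunks of 3, 3 and 2 bytes (see MemcachedChunkingCacheTest).
+     */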
+    protected static byte[][] splitBytes(final byte[] data, final int nSplit) {
+        byte[][] dest = new byte[nSplit][];
+
+        final int splitSize = (data.length - 1) / nSplit + 1;
+        for (int i = 0; i < nSplit - 1; i++) {
+            dest[i] = Arrays.copyOfRange(data, i * splitSize, (i + 1) * splitSize);
+        }
+        dest[nSplit - 1] = Arrays.copyOfRange(data, (nSplit - 1) * splitSize, data.length);
+
+        return dest;
+    }
+
+    protected static int getValueSplit(MemcachedCacheConfig config, String keyS, int valueBLen) {
+        // reserve 6 bytes for the chunk index that is appended to each chunk key (the index never exceeds 6 digits)
+        final int VALUE_SIZE = config.getMaxObjectSize() - Shorts.BYTES - Ints.BYTES
+                - keyS.getBytes(Charsets.UTF_8).length - 6;
+        final int MAX_VALUE_SIZE = config.getMaxChunkSize() * VALUE_SIZE;
+        Preconditions.checkArgument(valueBLen <= MAX_VALUE_SIZE,
+                "the value bytes length [%d] exceeds maximum value size [%d]", valueBLen, MAX_VALUE_SIZE);
+        return (valueBLen - 1) / VALUE_SIZE + 1;
+    }
+
+    protected static Pair<KeyHook, byte[][]> getKeyValuePair(int nSplit, String keyS, byte[] valueB) {
+        KeyHook keyHook;
+        byte[][] splitValueB = null;
+        if (nSplit > 1) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Enable chunking for putting large cached object values, chunk size = " + nSplit
+                        + ", original value bytes size = " + valueB.length);
+            }
+            String[] chunkKeySs = new String[nSplit];
+            for (int i = 0; i < nSplit; i++) {
+                chunkKeySs[i] = keyS + i;
+            }
+            keyHook = new KeyHook(chunkKeySs, null);
+            splitValueB = splitBytes(valueB, nSplit);
+        } else {
+            if (logger.isDebugEnabled()) {
+                logger.debug(
+                        "Chunking not enabled, put the original value bytes to keyhook directly, original value bytes size = "
+                                + valueB.length);
+            }
+            keyHook = new KeyHook(null, valueB);
+        }
+
+        return new Pair<>(keyHook, splitValueB);
+    }
+
+    /**
+     * Overrides the parent getBinary(). It first gets the KeyHook for the key from the cache and checks whether
+     * chunking was enabled for the stored value; if so, the chunks are bulk-fetched and reassembled.
+     */
+    @Override
+    public byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return null;
+        }
+
+        if (keyHook.getChunkskey() == null || keyHook.getChunkskey().length == 0) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Chunking not enabled, return the value bytes in the keyhook directly, value bytes size = "
+                        + keyHook.getValues().length);
+            }
+            return keyHook.getValues();
+        }
+
+        BulkFuture<Map<String, Object>> bulkFuture;
+        long start = System.currentTimeMillis();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Chunking enabled, chunk size = " + keyHook.getChunkskey().length);
+        }
+
+        Map<String, String> keyLookup = computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+        try {
+            bulkFuture = client.asyncGetBulk(keyLookup.keySet());
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return null;
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            Map<String, Object> bulkResult = bulkFuture.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (bulkResult.size() != keyHook.getChunkskey().length) {
+                missCount.incrementAndGet();
+                logger.warn("Some paritial chunks missing for query key:" + keyS);
+                //remove all the partital chunks here.
+                for (String partitalKey : bulkResult.keySet()) {
+                    client.delete(partitalKey);
+                }
+                deleteKeyHook(keyS);
+                return null;
+            }
+            hitCount.getAndAdd(keyHook.getChunkskey().length);
+            byte[][] bytesArray = new byte[keyHook.getChunkskey().length][];
+            for (Map.Entry<String, Object> entry : bulkResult.entrySet()) {
+                byte[] bytes = (byte[]) entry.getValue();
+                readBytes.addAndGet(bytes.length);
+                String originalKeyS = keyLookup.get(entry.getKey());
+                int idx = Integer.parseInt(originalKeyS.substring(keyS.length()));
+                bytesArray[idx] = decodeValue(originalKeyS.getBytes(Charsets.UTF_8), bytes);
+            }
+            return concatBytes(bytesArray);
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            bulkFuture.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling item from cache.", e);
+            return null;
+        }
+    }
+
+    /**
+     * Overrides the parent putBinary(). It splits large value bytes into multiple chunks that fit into the
+     * internal cache and generates a KeyHook, stored under the original key, to record the split chunk keys.
+     */
+    @Override
+    public void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        int nSplit = getValueSplit(config, keyS, valueB.length);
+        Pair<KeyHook, byte[][]> keyValuePair = getKeyValuePair(nSplit, keyS, valueB);
+        KeyHook keyHook = keyValuePair.getFirst();
+        byte[][] splitValueB = keyValuePair.getSecond();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("put key hook:{} to cache for hash key", keyHook);
+        }
+        super.putBinary(keyS, serializeValue(keyHook), expiration);
+        if (nSplit > 1) {
+            for (int i = 0; i < nSplit; i++) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Chunk[" + i + "] bytes size before encoding  = " + splitValueB[i].length);
+                }
+                super.putBinary(keyHook.getChunkskey()[i], splitValueB[i], expiration);
+            }
+        }
+    }
+
+    @Override
+    public void evict(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return;
+        }
+
+        if (keyHook.getChunkskey() != null && keyHook.getChunkskey().length > 0) {
+            String[] chunkKeys = keyHook.getChunkskey();
+            for (String chunkKey : chunkKeys) {
+                super.evict(chunkKey);
+            }
+        }
+        super.evict(keyS);
+    }
+
+    protected Map<String, String> computeKeyHash(List<String> keySList) {
+        return Maps.uniqueIndex(keySList, new Function<String, String>() {
+            @Override
+            public String apply(String keyS) {
+                return computeKeyHash(keyS);
+            }
+        });
+    }
+
+    private void deleteKeyHook(String keyS) {
+        try {
+            super.evict(keyS);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation: ", e);
+        }
+    }
+
+    private byte[] concatBytes(byte[]... bytesArray) {
+        int length = 0;
+        for (byte[] bytes : bytesArray) {
+            length += bytes.length;
+        }
+        byte[] result = new byte[length];
+        int destPos = 0;
+        for (byte[] bytes : bytesArray) {
+            System.arraycopy(bytes, 0, result, destPos, bytes.length);
+            destPos += bytes.length;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Original value bytes size for all chunks  = " + result.length);
+        }
+
+        return result;
+    }
+
+    @Override
+    public KeyHook lookupKeyHook(String keyS) {
+        byte[] bytes = super.getBinary(keyS);
+        if (bytes == null) {
+            return null;
+        }
+        return (KeyHook) SerializationUtils.deserialize(bytes);
+    }
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
new file mode 100755
index 0000000000..1e7c473995
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.kylin.common.KylinConfig;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactory extends SpyObject implements ConnectionFactory {
+    private ConnectionFactory underlying;
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+
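+    // Illustrative entries in the Kylin metrics config consumed by enableMetrics() and getMetricCollector()
+    // below (the values shown are assumptions):
+    //   memcached.enabled=true
+    //   memcached.metricstype=performance
+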
+    public MemcachedConnectionFactory(ConnectionFactory underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public MetricType enableMetrics() {
+        String metricType = metricsConfig.get("memcached.metricstype");
+        return metricType == null ? DefaultConnectionFactory.DEFAULT_METRIC_TYPE
+                : MetricType.valueOf(metricType.toUpperCase());
+    }
+
+    @Override
+    public MetricCollector getMetricCollector() {
+        String enableMetrics = metricsConfig.get("memcached.enabled");
+        if (enableMetrics().equals(MetricType.OFF) || enableMetrics == null
+                || "false".equalsIgnoreCase(enableMetrics)) {
+            getLogger().debug("Memcached metrics collection disabled.");
+            return new NoopMetricCollector();
+        } else {
+            getLogger().info("Memcached metrics collection enabled (Profile " + enableMetrics() + ").");
+            return new MemcachedMetrics();
+        }
+    }
+
+    @Override
+    public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
+        return underlying.createConnection(addrs);
+    }
+
+    @Override
+    public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) {
+        return underlying.createMemcachedNode(sa, c, bufSize);
+    }
+
+    @Override
+    public BlockingQueue<Operation> createOperationQueue() {
+        return underlying.createOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createReadOperationQueue() {
+        return underlying.createReadOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createWriteOperationQueue() {
+        return underlying.createWriteOperationQueue();
+    }
+
+    @Override
+    public long getOpQueueMaxBlockTime() {
+        return underlying.getOpQueueMaxBlockTime();
+    }
+
+    @Override
+    public ExecutorService getListenerExecutorService() {
+        return underlying.getListenerExecutorService();
+    }
+
+    @Override
+    public boolean isDefaultExecutorService() {
+        return underlying.isDefaultExecutorService();
+    }
+
+    @Override
+    public NodeLocator createLocator(List<MemcachedNode> nodes) {
+        return underlying.createLocator(nodes);
+    }
+
+    @Override
+    public OperationFactory getOperationFactory() {
+        return underlying.getOperationFactory();
+    }
+
+    @Override
+    public long getOperationTimeout() {
+        return underlying.getOperationTimeout();
+    }
+
+    @Override
+    public boolean isDaemon() {
+        return underlying.isDaemon();
+    }
+
+    @Override
+    public boolean useNagleAlgorithm() {
+        return underlying.useNagleAlgorithm();
+    }
+
+    @Override
+    public Collection<ConnectionObserver> getInitialObservers() {
+        return underlying.getInitialObservers();
+    }
+
+    @Override
+    public FailureMode getFailureMode() {
+        return underlying.getFailureMode();
+    }
+
+    @Override
+    public Transcoder<Object> getDefaultTranscoder() {
+        return underlying.getDefaultTranscoder();
+    }
+
+    @Override
+    public boolean shouldOptimize() {
+        return underlying.shouldOptimize();
+    }
+
+    @Override
+    public int getReadBufSize() {
+        return underlying.getReadBufSize();
+    }
+
+    @Override
+    public HashAlgorithm getHashAlg() {
+        return underlying.getHashAlg();
+    }
+
+    @Override
+    public long getMaxReconnectDelay() {
+        return underlying.getMaxReconnectDelay();
+    }
+
+    @Override
+    public AuthDescriptor getAuthDescriptor() {
+        return underlying.getAuthDescriptor();
+    }
+
+    @Override
+    public int getTimeoutExceptionThreshold() {
+        return underlying.getTimeoutExceptionThreshold();
+    }
+
+    @Override
+    public long getAuthWaitTime() {
+        return underlying.getAuthWaitTime();
+    }
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
new file mode 100755
index 0000000000..ed572c8742
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ArrayModNodeLocator;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.RefinedKetamaNodeLocator;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactoryBuilder extends ConnectionFactoryBuilder {
+    /**
+     * Get the ConnectionFactory set up with the provided parameters.
+     */
+    public ConnectionFactory build() {
+        return new DefaultConnectionFactory() {
+
+            @Override
+            public BlockingQueue<Operation> createOperationQueue() {
+                return opQueueFactory == null ? super.createOperationQueue() : opQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createReadOperationQueue() {
+                return readQueueFactory == null ? super.createReadOperationQueue() : readQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createWriteOperationQueue() {
+                return writeQueueFactory == null ? super.createWriteOperationQueue() : writeQueueFactory.create();
+            }
+
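+            // CONSISTENT maps to Kylin's RefinedKetamaNodeLocator (Ketama consistent hashing), so adding or
+            // removing a memcached node remaps only a small fraction of the key space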
+            @Override
+            public NodeLocator createLocator(List<MemcachedNode> nodes) {
+                switch (locator) {
+                case ARRAY_MOD:
+                    return new ArrayModNodeLocator(nodes, getHashAlg());
+                case CONSISTENT:
+                    return new RefinedKetamaNodeLocator(nodes, getHashAlg());
+                default:
+                    throw new IllegalStateException("Unhandled locator type: " + locator);
+                }
+            }
+
+            @Override
+            public Transcoder<Object> getDefaultTranscoder() {
+                return transcoder == null ? super.getDefaultTranscoder() : transcoder;
+            }
+
+            @Override
+            public FailureMode getFailureMode() {
+                return failureMode == null ? super.getFailureMode() : failureMode;
+            }
+
+            @Override
+            public HashAlgorithm getHashAlg() {
+                return hashAlg == null ? super.getHashAlg() : hashAlg;
+            }
+
+            public Collection<ConnectionObserver> getInitialObservers() {
+                return initialObservers;
+            }
+
+            @Override
+            public OperationFactory getOperationFactory() {
+                return opFact == null ? super.getOperationFactory() : opFact;
+            }
+
+            @Override
+            public long getOperationTimeout() {
+                return opTimeout == -1 ? super.getOperationTimeout() : opTimeout;
+            }
+
+            @Override
+            public int getReadBufSize() {
+                return readBufSize == -1 ? super.getReadBufSize() : readBufSize;
+            }
+
+            @Override
+            public boolean isDaemon() {
+                return isDaemon;
+            }
+
+            @Override
+            public boolean shouldOptimize() {
+                return shouldOptimize;
+            }
+
+            @Override
+            public boolean useNagleAlgorithm() {
+                return useNagle;
+            }
+
+            @Override
+            public long getMaxReconnectDelay() {
+                return maxReconnectDelay;
+            }
+
+            @Override
+            public AuthDescriptor getAuthDescriptor() {
+                return authDescriptor;
+            }
+
+            @Override
+            public long getOpQueueMaxBlockTime() {
+                return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime : super.getOpQueueMaxBlockTime();
+            }
+
+            @Override
+            public int getTimeoutExceptionThreshold() {
+                return timeoutExceptionThreshold;
+            }
+
+            @Override
+            public MetricType enableMetrics() {
+                return metricType == null ? super.enableMetrics() : metricType;
+            }
+
+            @Override
+            public MetricCollector getMetricCollector() {
+                return collector == null ? super.getMetricCollector() : collector;
+            }
+
+            @Override
+            public ExecutorService getListenerExecutorService() {
+                return executorService == null ? super.getListenerExecutorService() : executorService;
+            }
+
+            @Override
+            public boolean isDefaultExecutorService() {
+                return executorService == null;
+            }
+
+            @Override
+            public long getAuthWaitTime() {
+                return authWaitTime;
+            }
+        };
+
+    }
+}
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
new file mode 100755
index 0000000000..bd4bcc7fe5
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+
+import java.util.Map;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Maps;
+
+import net.spy.memcached.metrics.AbstractMetricCollector;
+import net.spy.memcached.metrics.DefaultMetricCollector;
+import net.spy.memcached.metrics.MetricCollector;
+
+/**
+ * A {@link MetricCollector} that uses the Codahale Metrics library.
+ *
+ * All counters, meters and histograms are registered with Kylin's shared
+ * metrics registry (see {@code MetricsSystem}).
+ */
+public final class MemcachedMetrics extends AbstractMetricCollector {
+
+    /**
+     * Contains all registered {@link Counter}s.
+     */
+    private Map<String, Counter> counters;
+
+    /**
+     * Contains all registered {@link Meter}s.
+     */
+    private Map<String, Meter> meters;
+
+    /**
+     * Contains all registered {@link Histogram}s.
+     */
+    private Map<String, Histogram> histograms;
+
+    /**
+     * Create a new {@link MemcachedMetrics} collector. All metric maps are
+     * initialized as concurrent maps.
+     */
+    public MemcachedMetrics() {
+        counters = Maps.newConcurrentMap();
+        meters = Maps.newConcurrentMap();
+        histograms = Maps.newConcurrentMap();
+    }
+
+    @Override
+    public void addCounter(String name) {
+        if (!counters.containsKey(name)) {
+            counters.put(name, Metrics.counter(name));
+        }
+    }
+
+    @Override
+    public void removeCounter(String name) {
+        if (counters.containsKey(name)) {
+            Metrics.remove(name);
+            counters.remove(name);
+        }
+    }
+
+    @Override
+    public void incrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).inc(amount);
+        }
+    }
+
+    @Override
+    public void decrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).dec(amount);
+        }
+    }
+
+    @Override
+    public void addMeter(String name) {
+        if (!meters.containsKey(name)) {
+            meters.put(name, Metrics.meter(name));
+        }
+    }
+
+    @Override
+    public void removeMeter(String name) {
+        if (meters.containsKey(name)) {
+            meters.remove(name);
+        }
+    }
+
+    @Override
+    public void markMeter(String name) {
+        if (meters.containsKey(name)) {
+            meters.get(name).mark();
+        }
+    }
+
+    @Override
+    public void addHistogram(String name) {
+        if (!histograms.containsKey(name)) {
+            histograms.put(name, Metrics.histogram(name));
+        }
+    }
+
+    @Override
+    public void removeHistogram(String name) {
+        if (histograms.containsKey(name)) {
+            histograms.remove(name);
+        }
+    }
+
+    @Override
+    public void updateHistogram(String name, int amount) {
+        if (histograms.containsKey(name)) {
+            histograms.get(name).update(amount);
+        }
+    }
+}
diff --git a/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
new file mode 100644
index 0000000000..f0e0466a4f
--- /dev/null
+++ b/cache/src/test/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManagerTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import static org.apache.kylin.cache.cachemanager.CacheConstants.QUERY_CACHE;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations = { "classpath:cacheContext.xml" })
+@ActiveProfiles("testing-memcached")
+public class RemoteLocalFailOverCacheManagerTest {
+
+    @Autowired
+    @Qualifier("cacheManager")
+    RemoteLocalFailOverCacheManager cacheManager;
+
+    @BeforeClass
+    public static void setupResource() throws Exception {
+        LocalFileMetadataTestCase.staticCreateTestMetadata();
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+        LocalFileMetadataTestCase.staticCleanupTestMetadata();
+    }
+
+    @Test
+    public void testCacheManager() {
+        cacheManager.disableRemoteCacheManager();
+        Assert.assertTrue("Memcached failover to ehcache", cacheManager.getCache(QUERY_CACHE) instanceof EhCacheCache);
+        cacheManager.enableRemoteCacheManager();
+        Assert.assertTrue("Memcached enabled",
+                cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor);
+    }
+}
diff --git a/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
new file mode 100644
index 0000000000..ccb9e938ad
--- /dev/null
+++ b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedCacheTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.cache.cachemanager.CacheConstants;
+import org.apache.kylin.cache.cachemanager.MemcachedCacheManager.MemCachedCacheAdaptor;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.GetFuture;
+
+public class MemcachedCacheTest extends LocalFileMetadataTestCase {
+
+    private Map<String, String> keyValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+
+        keyValueMap = Maps.newHashMap();
+        keyValueMap.put("sql1", "value1");
+        keyValueMap.put("sql11", "value11");
+
+        MemcachedCacheConfig cacheConfig = new MemcachedCacheConfig();
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, cacheConfig, CacheConstants.QUERY_CACHE,
+                7 * 24 * 3600);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedCache);
+
+        //Mock put to cache
+        for (String key : keyValueMap.keySet()) {
+            String keyS = memcachedCache.serializeKey(key);
+            String hashedKey = memcachedCache.computeKeyHash(keyS);
+
+            String value = keyValueMap.get(key);
+            byte[] valueE = memcachedCache.encodeValue(keyS, value);
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGet() {
+        for (String key : keyValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", keyValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+    }
+}
diff --git a/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedChunkingCacheTest.java b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedChunkingCacheTest.java
new file mode 100644
index 0000000000..7f05c7c8f0
--- /dev/null
+++ b/cache/src/test/java/org/apache/kylin/cache/memcached/MemcachedChunkingCacheTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.BulkFuture;
+import net.spy.memcached.internal.GetFuture;
+import org.apache.kylin.cache.cachemanager.CacheConstants;
+import org.apache.kylin.cache.cachemanager.MemcachedCacheManager.MemCachedCacheAdaptor;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MemcachedChunkingCacheTest extends LocalFileMetadataTestCase {
+
+    private Map<String, String> smallValueMap;
+    private Map<String, String> largeValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        final int MAX_OBJECT_SIZE = 300;
+
+        smallValueMap = Maps.newHashMap();
+        smallValueMap.put("sql1", "value1");
+
+        largeValueMap = Maps.newHashMap();
+        largeValueMap.put("sql2", Strings.repeat("value2", MAX_OBJECT_SIZE));
+
+        MemcachedCacheConfig cacheConfig = new MemcachedCacheConfig();
+        cacheConfig.setMaxObjectSize(MAX_OBJECT_SIZE);
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, cacheConfig, CacheConstants.QUERY_CACHE,
+                7 * 24 * 3600);
+        MemcachedChunkingCache memcachedChunkingCache = new MemcachedChunkingCache(memcachedCache);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedChunkingCache);
+
+        //Mock put to cache
+        for (String key : smallValueMap.keySet()) {
+            String keyS = memcachedCache.serializeKey(key);
+            String hashedKey = memcachedCache.computeKeyHash(keyS);
+
+            String value = smallValueMap.get(key);
+            byte[] valueB = memcachedCache.serializeValue(value);
+            KeyHookLookup.KeyHook keyHook = new KeyHookLookup.KeyHook(null, valueB);
+            byte[] valueE = memcachedCache.encodeValue(keyS, keyHook);
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+        }
+
+        //Mock put large value to cache
+        for (String key : largeValueMap.keySet()) {
+            String keyS = memcachedCache.serializeKey(key);
+            String hashedKey = memcachedCache.computeKeyHash(keyS);
+
+            String value = largeValueMap.get(key);
+            byte[] valueB = memcachedCache.serializeValue(value);
+            int nSplit = MemcachedChunkingCache.getValueSplit(cacheConfig, keyS, valueB.length);
+            Pair<KeyHookLookup.KeyHook, byte[][]> keyValuePair = MemcachedChunkingCache.getKeyValuePair(nSplit, keyS,
+                    valueB);
+            KeyHookLookup.KeyHook keyHook = keyValuePair.getFirst();
+            byte[][] splitValueB = keyValuePair.getSecond();
+
+            //For key
+            byte[] valueE = memcachedCache.encodeValue(keyS, keyHook);
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+
+            //For splits
+            Map<String, String> keyLookup = memcachedChunkingCache
+                    .computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+            Map<String, Object> bulkResult = Maps.newHashMap();
+            for (int i = 0; i < nSplit; i++) {
+                String splitKeyS = keyHook.getChunkskey()[i];
+                bulkResult.put(memcachedCache.computeKeyHash(splitKeyS),
+                        memcachedCache.encodeValue(splitKeyS.getBytes(Charsets.UTF_8), splitValueB[i]));
+            }
+
+            BulkFuture<Map<String, Object>> bulkFuture = mock(BulkFuture.class);
+            when(memcachedClient.asyncGetBulk(keyLookup.keySet())).thenReturn(bulkFuture);
+            when(bulkFuture.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(bulkResult);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGet() {
+        for (String key : smallValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", smallValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+        for (String key : largeValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", largeValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+    }
+
+    @Test
+    public void testSplitBytes() {
+        byte[] data = new byte[8];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) i;
+        }
+
+        int nSplit;
+        byte[][] dataSplits;
+
+        nSplit = 2;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(dataSplits[0], new byte[] { 0, 1, 2, 3 });
+        Assert.assertArrayEquals(dataSplits[1], new byte[] { 4, 5, 6, 7 });
+
+        nSplit = 3;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(dataSplits[0], new byte[] { 0, 1, 2 });
+        Assert.assertArrayEquals(dataSplits[1], new byte[] { 3, 4, 5 });
+        Assert.assertArrayEquals(dataSplits[2], new byte[] { 6, 7 });
+    }
+}
diff --git a/cache/src/test/resources/cacheContext.xml b/cache/src/test/resources/cacheContext.xml
new file mode 100644
index 0000000000..2dc4b0857b
--- /dev/null
+++ b/cache/src/test/resources/cacheContext.xml
@@ -0,0 +1,49 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xmlns:cache="http://www.springframework.org/schema/cache"
+       xmlns:p="http://www.springframework.org/schema/p"
+       xmlns="http://www.springframework.org/schema/beans"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+    http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
+    http://www.springframework.org/schema/context
+    http://www.springframework.org/schema/context/spring-context-4.3.xsd
+    http://www.springframework.org/schema/cache
+    http://www.springframework.org/schema/cache/spring-cache.xsd">
+
+    <description>Kylin Rest Service</description>
+    <context:annotation-config/>
+
+    <!-- Cache Config -->
+    <cache:annotation-driven/>
+
+    <beans profile="testing-memcached">
+        <bean id="ehcache"
+              class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
+              p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
+        <bean id="localCacheManager" class="org.apache.kylin.cache.cachemanager.InstrumentedEhCacheCacheManager"
+              p:cacheManager-ref="ehcache"/>
+
+        <bean id="remoteCacheManager" class="org.apache.kylin.cache.cachemanager.MemcachedCacheManager"/>
+        <bean id="memcachedCacheConfig" class="org.apache.kylin.cache.memcached.MemcachedCacheConfig">
+            <property name="timeout" value="500"/>
+            <property name="hosts" value="sandbox:11211"/>
+        </bean>
+
+        <bean id="cacheManager" class="org.apache.kylin.cache.cachemanager.RemoteLocalFailOverCacheManager"/>
+    </beans>
+
+</beans>
\ No newline at end of file
diff --git a/cache/src/test/resources/ehcache-test.xml b/cache/src/test/resources/ehcache-test.xml
new file mode 100644
index 0000000000..9a5c70ee7d
--- /dev/null
+++ b/cache/src/test/resources/ehcache-test.xml
@@ -0,0 +1,23 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<ehcache maxBytesLocalHeap="256M">>
+    <cache name="StorageCache"
+           eternal="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+    >
+        <persistence strategy="none"/>
+    </cache>
+</ehcache>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index eee2d099e9..fe6e26dd4f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -18,18 +18,9 @@
 
 package org.apache.kylin.common;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.SortedSet;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.text.StrSubstitutor;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,9 +33,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * An abstract class to encapsulate access to a set of 'properties'.
@@ -1239,6 +1238,14 @@ public int getScanThreshold() {
         return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
     }
 
+    public boolean isLazyQueryEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.query.lazy-query-enabled", "false"));
+    }
+
+    public long getLazyQueryWaitingTimeoutMilliSeconds() {
+        return Long.parseLong(getOptional("kylin.query.lazy-query-waiting-timeout-milliseconds", "60000L"));
+    }
+
     public int getQueryConcurrentRunningThresholdForProject() {
         // by default there's no limitation
         return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
@@ -1317,9 +1324,30 @@ public boolean isQueryCacheEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.cache-enabled", "true"));
     }
 
+    public boolean isQueryCacheSignatureEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.query.cache-signature-enabled", "false"));
+    }
+
     public boolean isQueryIgnoreUnknownFunction() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.ignore-unknown-function", "false"));
     }
+    
+    public String getMemCachedHosts() {
+        return getRequired("kylin.cache.memcached.hosts");
+    }
+
+    public boolean isQuerySegmentCacheEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.query.segment-cache-enabled", "false"));
+    }
+
+    public long getQuerySegmentCacheTimeoutMilliSeconds() {
+        return Long.parseLong(getOptional("kylin.query.segment-cache-timeout-milliseconds", "2000"));
+    }
+
+    // the maximum size, in megabytes, of a single segment's result that can be cached in one query
+    public int getQuerySegmentCacheMaxMB() {
+        return Integer.parseInt(getOptional("kylin.query.segment-cache-max-mb", "200"));
+    }
 
     public String getQueryAccessController() {
         return getOptional("kylin.query.access-controller", null);
@@ -1413,6 +1441,11 @@ public String getQueryRealizationFilter() {
         return getOptional("kylin.query.realization-filter", null);
     }
 
+    public String getSQLResponseSignatureClass() {
+        return this.getOptional("kylin.query.signature-class",
+                "org.apache.kylin.rest.signature.FactTableRealizationSetCalculator");
+    }
+
     // ============================================================================
     // SERVER
     // ============================================================================
@@ -1479,6 +1512,10 @@ public String getLDAPAdminRole() {
         return getOptional("kylin.security.acl.admin-role", "");
     }
 
+    public boolean isServerAclCacheEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.server.acl-cache-enabled", "true"));
+    }
+
     // ============================================================================
     // WEB
     // ============================================================================
@@ -1583,7 +1620,7 @@ public String getKylinSystemCubeSinkDefaultClass() {
     }
 
     public String getKylinMetricsSubjectSuffix() {
-        return getOptional("kylin.core.metric.subject-suffix", getDeployEnv());
+        return getOptional("kylin.metric.subject-suffix", getDeployEnv());
     }
 
     public String getKylinMetricsSubjectJob() {
@@ -1632,4 +1669,7 @@ public String getAutoMigrateCubeDestConfig() {
         return getOptional("kylin.tool.auto-migrate-cube.dest-config", "");
     }
 
+    public Map<String, String> getKylinMetricsConf() {
+        return getPropertiesByPrefix("kylin.metrics.");
+    }
 }
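For reference, the new configuration knobs introduced in this file, written as a kylin.properties fragment with the defaults defined above. kylin.cache.memcached.hosts has no default (it is read via getRequired), so the sandbox value below is only the one used by the test context earlier in this diff:

    kylin.query.lazy-query-enabled=false
    kylin.query.lazy-query-waiting-timeout-milliseconds=60000
    kylin.query.cache-signature-enabled=false
    kylin.query.signature-class=org.apache.kylin.rest.signature.FactTableRealizationSetCalculator
    kylin.query.segment-cache-enabled=false
    kylin.query.segment-cache-timeout-milliseconds=2000
    kylin.query.segment-cache-max-mb=200
    kylin.cache.memcached.hosts=sandbox:11211
    kylin.server.acl-cache-enabled=true
    # any kylin.metrics.* property is collected by getKylinMetricsConf()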
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 1aa94d3e8d..a707ec5f0c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -170,7 +171,7 @@ public Throwable getThrowable() {
     }
 
     public void addContext(int ctxId, String type, boolean ifCube) {
-        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null;
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null;
         if (ifCube) {
             cubeSegmentStatisticsMap = Maps.newConcurrentMap();
         }
@@ -195,6 +196,52 @@ public void setContextRealization(int ctxId, String realizationName, int realiza
         return Lists.newArrayList(cubeSegmentStatisticsResultMap.values());
     }
 
+    public CubeSegmentStatistics getCubeSegmentStatistics(int ctxId, String cubeName, String segmentName) {
+        CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
+        if (cubeSegmentStatisticsResult == null) {
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
+            return null;
+        }
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        if (cubeSegmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType);
+            return null;
+        }
+        ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+        if (segmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatistic should be initialized for cube {}", cubeName);
+            return null;
+        }
+        CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
+        if (segmentStatistics == null) {
+            logger.warn(
+                    "segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName);
+            return null;
+        }
+        return segmentStatistics;
+    }
+
+    public void addCubeSegmentStatistics(int ctxId, CubeSegmentStatistics cubeSegmentStatistics) {
+        CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
+        if (cubeSegmentStatisticsResult == null) {
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
+            return;
+        }
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        if (cubeSegmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType);
+            return;
+        }
+        String cubeName = cubeSegmentStatistics.cubeName;
+        cubeSegmentStatisticsMap.putIfAbsent(cubeName, Maps.<String, CubeSegmentStatistics> newConcurrentMap());
+        ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+
+        segmentStatisticsMap.put(cubeSegmentStatistics.getSegmentName(), cubeSegmentStatistics);
+    }
+
     public void addRPCStatistics(int ctxId, String rpcServer, String cubeName, String segmentName, long sourceCuboidId,
             long targetCuboidId, long filterMask, Exception e, long rpcCallTimeMs, long skippedRows, long scannedRows,
             long returnedRows, long aggregatedRows, long scannedBytes) {
@@ -206,28 +253,25 @@ public void addRPCStatistics(int ctxId, String rpcServer, String cubeName, Strin
 
         CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
         if (cubeSegmentStatisticsResult == null) {
-            logger.warn("CubeSegmentStatisticsResult should be initialized for context " + ctxId);
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
             return;
         }
-        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
         if (cubeSegmentStatisticsMap == null) {
             logger.warn(
-                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type "
-                            + cubeSegmentStatisticsResult.queryType);
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}",
+                    cubeSegmentStatisticsResult.queryType);
             return;
         }
-        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
-        if (segmentStatisticsMap == null) {
-            segmentStatisticsMap = Maps.newConcurrentMap();
-            cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap);
-        }
+        cubeSegmentStatisticsMap.putIfAbsent(cubeName, Maps.<String, CubeSegmentStatistics> newConcurrentMap());
+        ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+
+        CubeSegmentStatistics old = segmentStatisticsMap.putIfAbsent(segmentName, new CubeSegmentStatistics());
         CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
-        if (segmentStatistics == null) {
-            segmentStatistics = new CubeSegmentStatistics();
-            segmentStatisticsMap.put(segmentName, segmentStatistics);
+        if (old == null) {
             segmentStatistics.setWrapper(cubeName, segmentName, sourceCuboidId, targetCuboidId, filterMask);
-        }
-        if (segmentStatistics.sourceCuboidId != sourceCuboidId || segmentStatistics.targetCuboidId != targetCuboidId
+        } else if (segmentStatistics.sourceCuboidId != sourceCuboidId
+                || segmentStatistics.targetCuboidId != targetCuboidId
                 || segmentStatistics.filterMask != filterMask) {
             StringBuilder inconsistency = new StringBuilder();
             if (segmentStatistics.sourceCuboidId != sourceCuboidId) {
@@ -388,8 +432,8 @@ public void setWrapper(String cubeName, String segmentName, long sourceCuboidId,
             this.filterMask = filterMask;
         }
 
-        public void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount,
-                long scanBytes, boolean ifSuccess) {
+        public synchronized void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount,
+                long aggrCount, long scanBytes, boolean ifSuccess) {
             this.callCount++;
             this.callTimeSum += callTimeMs;
             if (this.callTimeMax < callTimeMs) {
@@ -527,7 +571,7 @@ public String toString() {
         protected static final long serialVersionUID = 1L;
 
         private String queryType;
-        private Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap;
+        private ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap;
         private String realization;
         private int realizationType;
 
@@ -535,7 +579,7 @@ public CubeSegmentStatisticsResult() {
         }
 
         public CubeSegmentStatisticsResult(String queryType,
-                Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
+                ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
             this.queryType = queryType;
             this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap;
         }
@@ -561,7 +605,7 @@ public void setQueryType(String queryType) {
         }
 
         public void setCubeSegmentStatisticsMap(
-                Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
+                ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) {
             this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap;
         }
 
@@ -570,7 +614,7 @@ public String getQueryType() {
 
         }
 
-        public Map<String, Map<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() {
+        public ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() {
             return cubeSegmentStatisticsMap;
         }
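The putIfAbsent idiom used in addCubeSegmentStatistics and addRPCStatistics above is worth isolating; a minimal, self-contained sketch using plain JDK types rather than Kylin classes:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class PutIfAbsentSketch {
        public static void main(String[] args) {
            ConcurrentMap<String, ConcurrentMap<String, String>> outer = new ConcurrentHashMap<>();

            // Under a race, only one thread's inner map is kept; get() then
            // returns the surviving map for every thread.
            outer.putIfAbsent("cube", new ConcurrentHashMap<String, String>());
            ConcurrentMap<String, String> inner = outer.get("cube");

            // putIfAbsent returns null only for the thread that actually inserted,
            // so one-time initialization (setWrapper above) runs exactly once.
            String prior = inner.putIfAbsent("segment", "stats");
            if (prior == null) {
                System.out.println("this thread created the entry");
            }
        }
    }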
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index efc5030fa9..9fd7e05ff6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -81,6 +81,10 @@ public static boolean getDisableCache() {
         return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_CACHE);
     }
 
+    public static boolean getDisableSegmentCache() {
+        return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE);
+    }
+
     public static boolean getDisableFuzzyKey() {
         return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY);
     }
@@ -180,6 +184,18 @@ public static void cleanToggles() {
      */
     public final static String DEBUG_TOGGLE_DISABLE_QUERY_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_CACHE";
 
+    /**
+     * set DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE=true to prevent using segment cache for the current query
+     *
+     * example (put it into the request body):
+     *
+     * "backdoorToggles": {
+     *     "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE": "true"
+     * }
+     */
+    public final static String DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE";
+
     /**
      * set DEBUG_TOGGLE_HBASE_CUBE_QUERY_VERSION=v1/v2 to control which version CubeStorageQuery to use
      *
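A small sketch of how storage-layer code can combine the server-wide switch with this per-query toggle; the wrapper class is illustrative, but both methods it calls are introduced in this PR:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.common.debug.BackdoorToggles;

    public class SegmentCacheGuardSketch {
        // True only if segment caching is enabled on the server and not
        // disabled by the current request's backdoor toggle.
        static boolean useSegmentCache(KylinConfig config) {
            return config.isQuerySegmentCacheEnabled()
                    && !BackdoorToggles.getDisableSegmentCache();
        }
    }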
diff --git a/dev-support/checkstyle.xml b/dev-support/checkstyle.xml
index 45325bf950..eb16194c5b 100644
--- a/dev-support/checkstyle.xml
+++ b/dev-support/checkstyle.xml
@@ -38,7 +38,9 @@
             <property name="ignorePattern"
                       value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
         </module>
-        <module name="MethodLength"/>
+        <module name="MethodLength">
+            <property name="max" value="300"/>
+        </module>
         <module name="MethodParamPad"/>
         <module name="ParenPad"/>
         <module name="EmptyStatement"/>
diff --git a/pom.xml b/pom.xml
index 5b264b10d0..056ec54502 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@
         <xerces.version>2.11.0</xerces.version>
         <xalan.version>2.7.2</xalan.version>
         <ehcache.version>2.10.2.2.21</ehcache.version>
+        <memcached.version>2.12.3</memcached.version>
         <apache-httpclient.version>4.2.5</apache-httpclient.version>
         <roaring.version>0.6.18</roaring.version>
         <cglib.version>3.2.4</cglib.version>
@@ -249,6 +250,11 @@
                 <artifactId>kylin-core-storage</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-cache</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.kylin</groupId>
                 <artifactId>kylin-engine-mr</artifactId>
@@ -688,6 +694,11 @@
                 <artifactId>metrics-core</artifactId>
                 <version>${dropwizard.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.dropwizard.metrics</groupId>
+                <artifactId>metrics-jvm</artifactId>
+                <version>${dropwizard.version}</version>
+            </dependency>
 
             <!-- Test -->
             <dependency>
@@ -773,6 +784,11 @@
                 <artifactId>ehcache</artifactId>
                 <version>${ehcache.version}</version>
             </dependency>
+            <dependency>
+                <groupId>net.spy</groupId>
+                <artifactId>spymemcached</artifactId>
+                <version>${memcached.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.opensaml</groupId>
                 <artifactId>opensaml</artifactId>
@@ -1169,6 +1185,7 @@
         <module>core-metrics</module>
         <module>metrics-reporter-hive</module>
         <module>metrics-reporter-kafka</module>
+        <module>cache</module>
     </modules>
 
     <reporting>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index be21ff185f..976c5c26e4 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -74,6 +74,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-source-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-cache</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>net.sf.ehcache</groupId>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 0bdf037230..552a7342eb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -22,7 +22,7 @@
 import java.util.List;
 
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatisticsResult;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,9 +73,16 @@
     protected boolean queryPushDown = false;
 
     protected byte[] queryStatistics;
-    
+
     protected String traceUrl = null;
 
+    // the SQL response signature used for cache validation; not returned to the client (@JsonIgnore)
+    protected String signature;
+
+    // the time a lazy query started running, or -1 if lazy query is not enabled;
+    // transient and not returned to the client (@JsonIgnore)
+    protected long lazyQueryStartTime = -1L;
+
     public SQLResponse() {
     }
 
@@ -205,20 +212,42 @@ public String getTraceUrl() {
     public void setTraceUrl(String traceUrl) {
         this.traceUrl = traceUrl;
     }
-    
+
+    @JsonIgnore
+    public String getSignature() {
+        return signature;
+    }
+
+    public void setSignature(String signature) {
+        this.signature = signature;
+    }
+
+    @JsonIgnore
+    public long getLazyQueryStartTime() {
+        return lazyQueryStartTime;
+    }
+
+    public void setLazyQueryStartTime(long lazyQueryStartTime) {
+        this.lazyQueryStartTime = lazyQueryStartTime;
+    }
+
+    @JsonIgnore
+    public boolean isRunning() {
+        return this.lazyQueryStartTime >= 0;
+    }
+
     @JsonIgnore
-    public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
+    public List<CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
         try {
-            return queryStatistics == null ? Lists.<QueryContext.CubeSegmentStatisticsResult> newArrayList()
-                    : (List<QueryContext.CubeSegmentStatisticsResult>) SerializationUtils.deserialize(queryStatistics);
+            return queryStatistics == null ? Lists.<CubeSegmentStatisticsResult> newArrayList()
+                    : (List<CubeSegmentStatisticsResult>) SerializationUtils.deserialize(queryStatistics);
         } catch (Exception e) { // deserialize exception should not block query
             logger.warn("Error while deserialize queryStatistics due to " + e);
             return Lists.newArrayList();
         }
     }
 
-    public void setCubeSegmentStatisticsList(
-            List<QueryContext.CubeSegmentStatisticsResult> cubeSegmentStatisticsList) {
+    public void setCubeSegmentStatisticsList(List<CubeSegmentStatisticsResult> cubeSegmentStatisticsList) {
         try {
             this.queryStatistics = cubeSegmentStatisticsList == null ? null
                     : SerializationUtils.serialize((Serializable) cubeSegmentStatisticsList);
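A quick sketch of what the @JsonIgnore annotations above buy: the signature and lazy-query fields stay server-side and never reach the HTTP response. This is standard Jackson behavior; the test class itself is illustrative:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kylin.rest.response.SQLResponse;

    public class JsonIgnoreSketch {
        public static void main(String[] args) throws Exception {
            SQLResponse resp = new SQLResponse();
            resp.setSignature("cached-signature");                  // server-side only
            resp.setLazyQueryStartTime(System.currentTimeMillis());

            String json = new ObjectMapper().writeValueAsString(resp);
            // Neither property appears in the serialized response, because
            // their getters are annotated with @JsonIgnore.
            System.out.println(json.contains("signature"));         // false
            System.out.println(json.contains("lazyQueryStartTime"));// false
        }
    }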
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AceInfo.java b/server-base/src/main/java/org/apache/kylin/rest/service/AceInfo.java
index 0be1019e5c..55d40e563a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AceInfo.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AceInfo.java
@@ -18,12 +18,14 @@
 
 package org.apache.kylin.rest.service;
 
+import java.io.Serializable;
+
 import org.springframework.security.acls.model.AccessControlEntry;
 
 /**
  * Created by xiefan on 17-5-2.
  */
-class AceInfo {
+class AceInfo implements Serializable {
     private SidInfo sidInfo;
     private int permissionMask;
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
index 1eab0e5604..ec22b03f62 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.PostConstruct;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -39,6 +41,8 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
 import org.springframework.security.acls.domain.AccessControlEntryImpl;
 import org.springframework.security.acls.domain.AclAuthorizationStrategy;
 import org.springframework.security.acls.domain.AclImpl;
@@ -65,12 +69,15 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.base.Preconditions;
 
 @Component("aclService")
 public class AclService implements MutableAclService {
 
     private static final Logger logger = LoggerFactory.getLogger(AclService.class);
 
+    public static final String ACL_CACHE = "AclCache";
+
     private final Field fieldAces = FieldUtils.getField(AclImpl.class, "aces");
 
     private final Field fieldAcl = FieldUtils.getField(AccessControlEntryImpl.class, "acl");
@@ -79,6 +86,9 @@
 
     public static final Serializer<AclRecord> SERIALIZER = new JsonSerializer<>(AclRecord.class);
 
+    @Autowired
+    private CacheManager cacheManager;
+
     @Autowired
     protected PermissionGrantingStrategy permissionGrantingStrategy;
 
@@ -103,6 +113,11 @@ public AclService() throws IOException {
         aclStore = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
     }
 
+    @PostConstruct
+    public void init() throws IOException {
+        Preconditions.checkNotNull(cacheManager, "cacheManager is not injected yet");
+    }
+
     @Override
     public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
         List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
@@ -124,14 +139,17 @@ public AclService() throws IOException {
 
     @Override
     public Acl readAclById(ObjectIdentity object) throws NotFoundException {
-        Map<ObjectIdentity, Acl> aclsMap = readAclsById(Arrays.asList(object), null);
-        return aclsMap.get(object);
+        return readAclById(object, null);
     }
 
     @Override
     public Acl readAclById(ObjectIdentity object, List<Sid> sids) throws NotFoundException {
+        return readAclById(object, sids, KylinConfig.getInstanceFromEnv().isServerAclCacheEnabled());
+    }
+
+    public Acl readAclById(ObjectIdentity object, List<Sid> sids, boolean ifPutToCache) throws NotFoundException {
         Message msg = MsgPicker.getMsg();
-        Map<ObjectIdentity, Acl> aclsMap = readAclsById(Arrays.asList(object), sids);
+        Map<ObjectIdentity, Acl> aclsMap = readAclsById(Arrays.asList(object), sids, ifPutToCache);
         if (!aclsMap.containsKey(object)) {
             throw new BadRequestException(String.format(msg.getNO_ACL_ENTRY(), object));
         }
@@ -145,22 +163,43 @@ public Acl readAclById(ObjectIdentity object, List<Sid> sids) throws NotFoundExc
 
     @Override
     public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
+        return readAclsById(oids, sids, KylinConfig.getInstanceFromEnv().isServerAclCacheEnabled());
+    }
+
+    public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids, boolean ifCacheEnabled)
+            throws NotFoundException {
         Message msg = MsgPicker.getMsg();
         Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
         try {
             for (ObjectIdentity oid : oids) {
-                AclRecord record = aclStore.getResource(getQueryKeyById(String.valueOf(oid.getIdentifier())),
-                        AclRecord.class, SERIALIZER);
+                String aclKey = getQueryKeyById(oid);
+
+                AclRecord record = null;
+                if (ifCacheEnabled) {
+                    Cache.ValueWrapper aclCacheValue = cacheManager.getCache(ACL_CACHE).get(aclKey);
+                    if (aclCacheValue != null) {
+                        record = (AclRecord) aclCacheValue.get();
+                    }
+                }
+                if (record == null) {
+                    record = aclStore.getResource(aclKey, AclRecord.class, SERIALIZER);
+                }
+
                 if (record != null && record.getOwnerInfo() != null) {
+                    if (ifCacheEnabled) {
+                        cacheManager.getCache(ACL_CACHE).put(aclKey, record);
+                    }
+
                     SidInfo owner = record.getOwnerInfo();
-                    Sid ownerSid = owner.isPrincipal() ? new PrincipalSid(owner.getSid()) : new GrantedAuthoritySid(owner.getSid());
+                    Sid ownerSid = owner.isPrincipal() ? new PrincipalSid(owner.getSid())
+                            : new GrantedAuthoritySid(owner.getSid());
                     boolean entriesInheriting = record.isEntriesInheriting();
 
                     Acl parentAcl = null;
                     DomainObjectInfo parent = record.getParentDomainObjectInfo();
                     if (parent != null) {
                         ObjectIdentity parentObject = new ObjectIdentityImpl(parent.getType(), parent.getId());
-                        parentAcl = readAclById(parentObject, null);
+                        parentAcl = readAclById(parentObject, null, ifCacheEnabled);
                     }
 
                     AclImpl acl = new AclImpl(oid, oid.getIdentifier(), aclAuthorizationStrategy, permissionGrantingStrategy, parentAcl, null, entriesInheriting, ownerSid);
@@ -193,7 +232,7 @@ public MutableAcl createAcl(ObjectIdentity objectIdentity) throws AlreadyExistsE
         PrincipalSid sid = new PrincipalSid(auth);
         try {
             AclRecord record = new AclRecord(new DomainObjectInfo(objectIdentity), null, new SidInfo(sid), true, null);
-            aclStore.putResource(getQueryKeyById(String.valueOf(objectIdentity.getIdentifier())), record, 0,
+            aclStore.putResource(getQueryKeyById(objectIdentity), record, 0,
                     SERIALIZER);
             logger.debug("ACL of " + objectIdentity + " created successfully.");
         } catch (IOException e) {
@@ -213,7 +252,9 @@ public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) thr
             for (ObjectIdentity oid : children) {
                 deleteAcl(oid, deleteChildren);
             }
-            aclStore.deleteResource(getQueryKeyById(String.valueOf(objectIdentity.getIdentifier())));
+            String aclKey = getQueryKeyById(objectIdentity);
+            aclStore.deleteResource(aclKey);
+            cacheManager.getCache(ACL_CACHE).evict(aclKey);
             logger.debug("ACL of " + objectIdentity + " deleted successfully.");
         } catch (IOException e) {
             throw new InternalErrorException(e);
@@ -224,13 +265,13 @@ public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) thr
     public MutableAcl updateAcl(MutableAcl mutableAcl) throws NotFoundException {
         Message msg = MsgPicker.getMsg();
         try {
-            readAclById(mutableAcl.getObjectIdentity());
+            readAclById(mutableAcl.getObjectIdentity(), null, false);
         } catch (NotFoundException e) {
             throw e;
         }
 
         try {
-            String id = getQueryKeyById(String.valueOf(mutableAcl.getObjectIdentity().getIdentifier()));
+            String id = getQueryKeyById(mutableAcl.getObjectIdentity());
             AclRecord record = aclStore.getResource(id, AclRecord.class, SERIALIZER);
             if (mutableAcl.getParentAcl() != null) {
                 record.setParentDomainObjectInfo(new DomainObjectInfo(mutableAcl.getParentAcl().getObjectIdentity()));
@@ -252,6 +293,7 @@ public MutableAcl updateAcl(MutableAcl mutableAcl) throws NotFoundException {
                 allAceInfo.put(String.valueOf(aceInfo.getSidInfo().getSid()), aceInfo);
             }
             aclStore.putResourceWithoutCheck(id, record, System.currentTimeMillis(), SERIALIZER);
+            cacheManager.getCache(ACL_CACHE).evict(id);
             logger.debug("ACL of " + mutableAcl.getObjectIdentity() + " updated successfully.");
         } catch (IOException e) {
             throw new InternalErrorException(e);
@@ -304,6 +346,10 @@ private void setAces(AclImpl acl, List<AccessControlEntry> aces) {
         }
     }
 
+    private static String getQueryKeyById(ObjectIdentity oid) {
+        return getQueryKeyById(String.valueOf(oid.getIdentifier()));
+    }
+
     public static String getQueryKeyById(String id) {
         return DIR_PREFIX + id;
     }
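The caching pattern AclService adopts above, reduced to a self-contained sketch against the Spring Cache API; load() is a hypothetical stand-in for aclStore.getResource():

    import org.springframework.cache.Cache;
    import org.springframework.cache.CacheManager;

    public class AclCacheSketch {
        // Read-through: try the cache, fall back to the store, repopulate on a hit.
        static Object readThrough(CacheManager cacheManager, String key) {
            Cache cache = cacheManager.getCache("AclCache");
            Cache.ValueWrapper hit = cache.get(key);
            Object record = (hit != null) ? hit.get() : null;
            if (record == null) {
                record = load(key);                 // hypothetical store lookup
                if (record != null) {
                    cache.put(key, record);
                }
            }
            return record;
        }

        // Write path: persist to the store first (not shown), then evict so the
        // next read repopulates the cache, as updateAcl()/deleteAcl() do above.
        static void invalidate(CacheManager cacheManager, String key) {
            cacheManager.getCache("AclCache").evict(key);
        }

        private static Object load(String key) {
            return null; // hypothetical
        }
    }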
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 10ab90b82b..d52aa81b5f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.rest.service;
 
+import static org.apache.kylin.cache.cachemanager.CacheConstants.QUERY_CACHE;
+
 import java.io.IOException;
 
 import java.util.Map;
@@ -117,9 +119,12 @@ public void notifyMetadataChange(String entity, Event event, String cacheKey) th
 
     public void cleanDataCache(String project) {
         if (cacheManager != null) {
-            logger.info("cleaning cache for project " + project + " (currently remove all entries)");
-            cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
-            cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
+            if (getConfig().isQueryCacheSignatureEnabled()) {
+                logger.info("cleaning cache for project " + project + " (currently remove nothing)");
+            } else {
+                logger.info("cleaning cache for project " + project + " (currently remove all entries)");
+                cacheManager.getCache(QUERY_CACHE).removeAll();
+            }
         } else {
             logger.warn("skip cleaning cache for project " + project);
         }
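The reason cleanDataCache can skip flushing when signatures are enabled is that staleness is caught lazily at read time. A sketch of that check, with the checkSignature call taken from QueryService later in this diff; the wrapper itself is illustrative:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.rest.response.SQLResponse;
    import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
    import org.springframework.cache.Cache;

    public class SignatureCheckSketch {
        // Returns the cached response if its signature still matches, otherwise
        // evicts the stale entry and returns null.
        static SQLResponse validate(KylinConfig config, Cache queryCache, Object key,
                SQLResponse cached, String project) {
            if (config.isQueryCacheSignatureEnabled()
                    && !SQLResponseSignatureUtil.checkSignature(config, cached, project)) {
                queryCache.evict(key);
                return null;
            }
            return cached;
        }
    }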
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DomainObjectInfo.java b/server-base/src/main/java/org/apache/kylin/rest/service/DomainObjectInfo.java
index f07a65eefe..453bc31339 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DomainObjectInfo.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DomainObjectInfo.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.rest.service;
 
+import java.io.Serializable;
+
 import org.springframework.security.acls.model.ObjectIdentity;
 
 
-class DomainObjectInfo {
+class DomainObjectInfo implements Serializable {
     private String id;
     private String type;
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 5f67aa183b..3d43e6c052 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -22,11 +22,9 @@
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.jdbc.CalcitePrepare;
@@ -34,6 +32,7 @@
 import org.apache.calcite.prepare.OnlyPrepareEarlyAbortException;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -85,6 +84,7 @@
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
 import org.apache.kylin.rest.util.QueryRequestLimits;
+import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
 import org.apache.kylin.rest.util.TableauInterceptor;
 import org.apache.kylin.shaded.htrace.org.apache.htrace.Sampler;
 import org.apache.kylin.shaded.htrace.org.apache.htrace.Trace;
@@ -94,6 +94,8 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
 import org.springframework.security.access.AccessDeniedException;
 import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
@@ -125,6 +127,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kylin.cache.cachemanager.CacheConstants.QUERY_CACHE;
 import static org.apache.kylin.common.util.CheckUtil.checkCondition;
 
 /**
@@ -133,8 +136,6 @@
 @Component("queryService")
 public class QueryService extends BasicService {
 
-    public static final String SUCCESS_QUERY_CACHE = "StorageCache";
-    public static final String EXCEPTION_QUERY_CACHE = "ExceptionQueryCache";
     public static final String QUERY_STORE_PATH_PREFIX = "/query/";
     private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
     final BadQueryDetector badQueryDetector = new BadQueryDetector();
@@ -208,7 +209,7 @@ public SQLResponse update(SQLRequest sqlRequest) throws Exception {
             columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, false, Integer.MAX_VALUE, "c0", "c0",
                     null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, false, false));
 
-            return buildSqlResponse(true, r.getFirst(), columnMetas);
+            return buildSqlResponse(sqlRequest.getProject(), true, r.getFirst(), columnMetas);
 
         } catch (Exception e) {
             logger.info("pushdown engine failed to finish current non-select query");
@@ -295,6 +296,12 @@ public void logQuery(final String queryId, final SQLRequest request, final SQLRe
             }
         }
 
+        if (realizationNames.isEmpty()) {
+            if (!Strings.isNullOrEmpty(response.getCube())) {
+                CollectionUtils.addAll(realizationNames, response.getCube().split(","));
+            }
+        }
+
         int resultRowCount = 0;
         if (!response.getIsException() && response.getResults() != null) {
             resultRowCount = response.getResults().size();
@@ -489,8 +496,16 @@ private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCach
         Message msg = MsgPicker.getMsg();
         final QueryContext queryContext = QueryContextFacade.current();
 
+        boolean isLazyQueryEnabled = queryCacheEnabled && kylinConfig.isLazyQueryEnabled();
         SQLResponse sqlResponse = null;
         try {
+            // Add dummy response which will be updated or evicted when query finishes
+            if (isLazyQueryEnabled) {
+                SQLResponse dummyResponse = new SQLResponse();
+                dummyResponse.setLazyQueryStartTime(System.currentTimeMillis());
+                cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), dummyResponse);
+            }
+
             final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
             if (isSelect) {
                 sqlResponse = query(sqlRequest, queryContext.getQueryId());
@@ -511,22 +526,26 @@ private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCach
                     String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
                     String.valueOf(sqlResponse.getTotalScanCount()));
             if (checkCondition(queryCacheEnabled, "query cache is disabled") //
+                    && checkCondition(!Strings.isNullOrEmpty(sqlResponse.getCube()),
+                    "query does not hit cube nor hybrid") //
                     && checkCondition(!sqlResponse.getIsException(), "query has exception") //
                     && checkCondition(
-                            !(sqlResponse.isPushDown()
-                                    && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
-                            "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
+                    !(sqlResponse.isPushDown()
+                            && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
+                    "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
                     && checkCondition(
-                            sqlResponse.getDuration() > durationThreshold
-                                    || sqlResponse.getTotalScanCount() > scanCountThreshold
-                                    || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
-                            "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
-                            sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
-                            scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
+                    sqlResponse.getDuration() > durationThreshold
+                            || sqlResponse.getTotalScanCount() > scanCountThreshold
+                            || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
+                    "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
+                    sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
+                    scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
                     && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
-                            "query response is too large: {} ({})", sqlResponse.getResults().size(),
-                            kylinConfig.getLargeQueryThreshold())) {
-                cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest.getCacheKey(), sqlResponse));
+                    "query response is too large: {} ({})", sqlResponse.getResults().size(),
+                    kylinConfig.getLargeQueryThreshold())) {
+                cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
+            } else if (isLazyQueryEnabled) {
+                cacheManager.getCache(QUERY_CACHE).evict(sqlRequest.getCacheKey());
             }
             Trace.addTimelineAnnotation("response from execution");
 
@@ -536,15 +555,15 @@ private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCach
             logger.error("Exception while executing query", e);
             String errMsg = makeErrorMsgUserFriendly(e);
 
-            sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
-            sqlResponse.setTotalScanCount(queryContext.getScannedRows());
-            sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
-            sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
+            sqlResponse = buildSqlResponse(sqlRequest.getProject(), false, null, null, true, errMsg);
+            sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e));
 
             if (queryCacheEnabled && e.getCause() != null
                     && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
-                Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
-                exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
+                Cache exceptionCache = cacheManager.getCache(QUERY_CACHE);
+                exceptionCache.put(sqlRequest.getCacheKey(), sqlResponse);
+            } else if (isLazyQueryEnabled) {
+                cacheManager.getCache(QUERY_CACHE).evict(sqlRequest.getCacheKey());
             }
             Trace.addTimelineAnnotation("error response");
         }
@@ -593,21 +612,54 @@ private String getUserName() {
     }
 
     public SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
-        SQLResponse response = null;
-        Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
-        Cache successCache = cacheManager.getCache(SUCCESS_QUERY_CACHE);
-
-        Element element = null;
-        if ((element = exceptionCache.get(sqlRequest.getCacheKey())) != null) {
-            logger.info("The sqlResponse is found in EXCEPTION_QUERY_CACHE");
-            response = (SQLResponse) element.getObjectValue();
+        Cache queryCache = cacheManager.getCache(QUERY_CACHE);
+        Cache.ValueWrapper wrapper = queryCache.get(sqlRequest.getCacheKey());
+        if (wrapper == null) {
+            return null;
+        }
+        SQLResponse response = (SQLResponse) wrapper.get();
+        if (response == null) {
+            return null;
+        }
+
+        // Check whether a duplicate query is still running
+        while (response.isRunning()) {
+            // Wait at most the configured timeout (one minute by default)
+            if (System.currentTimeMillis() - response.getLazyQueryStartTime() >= getConfig()
+                    .getLazyQueryWaitingTimeoutMilliSeconds()) {
+                queryCache.evict(sqlRequest.getCacheKey());
+                return null;
+            }
+            logger.info("Duplicated SQL request is running, waiting...");
+            try {
+                Thread.sleep(100L);
+            } catch (InterruptedException e) {
+                logger.error("Thread interrupted due to " + e);
+                throw new RuntimeException(e);
+            }
+            wrapper = queryCache.get(sqlRequest.getCacheKey());
+            if (wrapper == null) {
+                return null;
+            }
+            response = (SQLResponse) wrapper.get();
+            if (response == null) {
+                return null;
+            }
+        }
+
+        logger.debug("The sqlResponse is found in QUERY_CACHE");
+        if (getConfig().isQueryCacheSignatureEnabled()
+                && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {
+            logger.info("The sql response signature is changed. Remove it from QUERY_CACHE.");
+            queryCache.evict(sqlRequest.getCacheKey());
+            return null;
+        }
+
+        if (response.getIsException()) {
             response.setHitExceptionCache(true);
-        } else if ((element = successCache.get(sqlRequest.getCacheKey())) != null) {
-            logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE");
-            response = (SQLResponse) element.getObjectValue();
+        } else {
             response.setStorageCacheUsed(true);
         }
-
         return response;
     }
 
@@ -913,7 +965,8 @@ private SQLResponse execute(String correctedSql, SQLRequest sqlRequest, Connecti
 
             // special case for prepare query.
             if (BackdoorToggles.getPrepareOnly()) {
-                return getPrepareOnlySqlResponse(correctedSql, conn, isPushDown, results, columnMetas);
+                return getPrepareOnlySqlResponse(sqlRequest.getProject(), correctedSql, conn, isPushDown, results,
+                        columnMetas);
             }
 
             if (isPrepareStatementWithParams(sqlRequest)) {
@@ -977,15 +1030,16 @@ private SQLResponse execute(String correctedSql, SQLRequest sqlRequest, Connecti
             close(resultSet, stat, null); //conn is passed in, not my duty to close
         }
 
-        return buildSqlResponse(isPushDown, results, columnMetas);
+        return buildSqlResponse(sqlRequest.getProject(), isPushDown, results, columnMetas);
     }
 
     protected String makeErrorMsgUserFriendly(Throwable e) {
         return QueryUtil.makeErrorMsgUserFriendly(e);
     }
 
-    private SQLResponse getPrepareOnlySqlResponse(String correctedSql, Connection conn, Boolean isPushDown,
-            List<List<String>> results, List<SelectedColumnMeta> columnMetas) throws SQLException {
+    private SQLResponse getPrepareOnlySqlResponse(String projectName, String correctedSql, Connection conn,
+                                                  Boolean isPushDown,
+                                                  List<List<String>> results, List<SelectedColumnMeta> columnMetas) throws SQLException {
 
         CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(true);
 
@@ -1030,7 +1084,7 @@ private SQLResponse getPrepareOnlySqlResponse(String correctedSql, Connection co
             DBUtils.closeQuietly(preparedStatement);
         }
 
-        return buildSqlResponse(isPushDown, results, columnMetas);
+        return buildSqlResponse(projectName, isPushDown, results, columnMetas);
     }
 
     private boolean isPrepareStatementWithParams(SQLRequest sqlRequest) {
@@ -1040,10 +1094,15 @@ private boolean isPrepareStatementWithParams(SQLRequest sqlRequest) {
         return false;
     }
 
-    private SQLResponse buildSqlResponse(Boolean isPushDown, List<List<String>> results,
-            List<SelectedColumnMeta> columnMetas) {
+    private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results,
+                                         List<SelectedColumnMeta> columnMetas) {
+        return buildSqlResponse(projectName, isPushDown, results, columnMetas, false, null);
+    }
 
+    private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results,
+                                         List<SelectedColumnMeta> columnMetas, boolean isException, String exceptionMessage) {
         boolean isPartialResult = false;
+
         StringBuilder cubeSb = new StringBuilder();
         StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
         QueryContext queryContext = QueryContextFacade.current();
@@ -1067,11 +1126,14 @@ private SQLResponse buildSqlResponse(Boolean isPushDown, List<List<String>> resu
         }
         logger.info(logSb.toString());
 
-        SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, false, null, isPartialResult,
-                isPushDown);
+        SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
+                exceptionMessage, isPartialResult, isPushDown);
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanBytes(queryContext.getScannedBytes());
         response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
+        if (getConfig().isQueryCacheSignatureEnabled()) {
+            response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName));
+        }
         return response;
     }
 
@@ -1094,51 +1156,51 @@ private void setParam(PreparedStatement preparedState, int index, PrepareSqlRequ
         Rep rep = Rep.of(clazz);
 
         switch (rep) {
-        case PRIMITIVE_CHAR:
-        case CHARACTER:
-        case STRING:
-            preparedState.setString(index, isNull ? null : String.valueOf(param.getValue()));
-            break;
-        case PRIMITIVE_INT:
-        case INTEGER:
-            preparedState.setInt(index, isNull ? 0 : Integer.valueOf(param.getValue()));
-            break;
-        case PRIMITIVE_SHORT:
-        case SHORT:
-            preparedState.setShort(index, isNull ? 0 : Short.valueOf(param.getValue()));
-            break;
-        case PRIMITIVE_LONG:
-        case LONG:
-            preparedState.setLong(index, isNull ? 0 : Long.valueOf(param.getValue()));
-            break;
-        case PRIMITIVE_FLOAT:
-        case FLOAT:
-            preparedState.setFloat(index, isNull ? 0 : Float.valueOf(param.getValue()));
-            break;
-        case PRIMITIVE_DOUBLE:
-        case DOUBLE:
-            preparedState.setDouble(index, isNull ? 0 : Double.valueOf(param.getValue()));
-            break;
-        case PRIMITIVE_BOOLEAN:
-        case BOOLEAN:
-            preparedState.setBoolean(index, !isNull && Boolean.parseBoolean(param.getValue()));
-            break;
-        case PRIMITIVE_BYTE:
-        case BYTE:
-            preparedState.setByte(index, isNull ? 0 : Byte.valueOf(param.getValue()));
-            break;
-        case JAVA_UTIL_DATE:
-        case JAVA_SQL_DATE:
-            preparedState.setDate(index, isNull ? null : java.sql.Date.valueOf(param.getValue()));
-            break;
-        case JAVA_SQL_TIME:
-            preparedState.setTime(index, isNull ? null : Time.valueOf(param.getValue()));
-            break;
-        case JAVA_SQL_TIMESTAMP:
-            preparedState.setTimestamp(index, isNull ? null : Timestamp.valueOf(param.getValue()));
-            break;
-        default:
-            preparedState.setObject(index, isNull ? null : param.getValue());
+            case PRIMITIVE_CHAR:
+            case CHARACTER:
+            case STRING:
+                preparedState.setString(index, isNull ? null : String.valueOf(param.getValue()));
+                break;
+            case PRIMITIVE_INT:
+            case INTEGER:
+                preparedState.setInt(index, isNull ? 0 : Integer.valueOf(param.getValue()));
+                break;
+            case PRIMITIVE_SHORT:
+            case SHORT:
+                preparedState.setShort(index, isNull ? 0 : Short.valueOf(param.getValue()));
+                break;
+            case PRIMITIVE_LONG:
+            case LONG:
+                preparedState.setLong(index, isNull ? 0 : Long.valueOf(param.getValue()));
+                break;
+            case PRIMITIVE_FLOAT:
+            case FLOAT:
+                preparedState.setFloat(index, isNull ? 0 : Float.valueOf(param.getValue()));
+                break;
+            case PRIMITIVE_DOUBLE:
+            case DOUBLE:
+                preparedState.setDouble(index, isNull ? 0 : Double.valueOf(param.getValue()));
+                break;
+            case PRIMITIVE_BOOLEAN:
+            case BOOLEAN:
+                preparedState.setBoolean(index, !isNull && Boolean.parseBoolean(param.getValue()));
+                break;
+            case PRIMITIVE_BYTE:
+            case BYTE:
+                preparedState.setByte(index, isNull ? 0 : Byte.valueOf(param.getValue()));
+                break;
+            case JAVA_UTIL_DATE:
+            case JAVA_SQL_DATE:
+                preparedState.setDate(index, isNull ? null : java.sql.Date.valueOf(param.getValue()));
+                break;
+            case JAVA_SQL_TIME:
+                preparedState.setTime(index, isNull ? null : Time.valueOf(param.getValue()));
+                break;
+            case JAVA_SQL_TIMESTAMP:
+                preparedState.setTimestamp(index, isNull ? null : Timestamp.valueOf(param.getValue()));
+                break;
+            default:
+                preparedState.setObject(index, isNull ? null : param.getValue());
         }
     }
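The lazy-query mechanism above boils down to a placeholder-and-poll protocol: the first query puts a running placeholder in the cache, and duplicates wait for it to be replaced or evicted. A self-contained sketch of the waiting side, using a plain map in place of the query cache (all names are stand-ins, not Kylin APIs):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class LazyWaitSketch {
        // key -> start time of the duplicate query currently running
        static final ConcurrentMap<String, Long> RUNNING = new ConcurrentHashMap<>();

        // Poll until the placeholder disappears (query finished; result cached or
        // evicted), or give up once the configured timeout has elapsed.
        static boolean waitForPeer(String key, long timeoutMs) throws InterruptedException {
            Long start;
            while ((start = RUNNING.get(key)) != null) {
                if (System.currentTimeMillis() - start >= timeoutMs) {
                    RUNNING.remove(key); // evict the stale placeholder, as QueryService does
                    return false;
                }
                Thread.sleep(100L);      // same polling interval as above
            }
            return true;
        }
    }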
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/SidInfo.java b/server-base/src/main/java/org/apache/kylin/rest/service/SidInfo.java
index 0a89449575..e933b83928 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/SidInfo.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/SidInfo.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.rest.service;
 
+import java.io.Serializable;
+
 import org.springframework.security.acls.domain.GrantedAuthoritySid;
 import org.springframework.security.acls.domain.PrincipalSid;
 import org.springframework.security.acls.model.Sid;
@@ -25,7 +27,7 @@
 /**
  * Created by xiefan on 17-5-2.
  */
-class SidInfo {
+class SidInfo implements Serializable {
     private String sid;
     private boolean isPrincipal;
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/ComponentSignature.java b/server-base/src/main/java/org/apache/kylin/rest/signature/ComponentSignature.java
new file mode 100644
index 0000000000..7b3a2e4b9f
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/ComponentSignature.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.io.Serializable;
+
+abstract class ComponentSignature<T extends ComponentSignature> implements Serializable, Comparable<T> {
+
+    public abstract String getKey();
+
+    @Override
+    public int compareTo(T o) {
+        return getKey().compareTo(o.getKey());
+    }
+}
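To illustrate the contract: a component signature is ordered by its key so a set of components serializes deterministically. A hypothetical concrete subclass, not part of this PR, placed in the same package so the package-private base class is visible:

    // Hypothetical example only; field names are illustrative.
    class TableSignature extends ComponentSignature<TableSignature> {
        private final String tableName;
        private final long lastModifiedTime;

        TableSignature(String tableName, long lastModifiedTime) {
            this.tableName = tableName;
            this.lastModifiedTime = lastModifiedTime;
        }

        @Override
        public String getKey() {
            return tableName; // signatures sort by table name
        }
    }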
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/FactTableRealizationSetCalculator.java b/server-base/src/main/java/org/apache/kylin/rest/signature/FactTableRealizationSetCalculator.java
new file mode 100644
index 0000000000..6a6c9866a2
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/FactTableRealizationSetCalculator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+
+import com.google.common.collect.Sets;
+
+public class FactTableRealizationSetCalculator extends RealizationSetCalculator {
+
+    /**
+     * If the cube selection result changes after a new cube's data becomes ready,
+     * the cached result should be invalidated, which requires the related signature to change.
+     * To achieve this, we need to consider all of the cubes that share the same fact table.
+     */
+    @Override
+    protected Set<String> getRealizations(KylinConfig config, String cubes, ProjectInstance project) {
+        Set<String> realizations = super.getRealizations(config, cubes, project);
+        if (realizations == null) {
+            return null;
+        }
+        Set<String> factTables = Sets.newHashSet();
+        for (String realization : realizations) {
+            String factTable = getFactTablesForRealization(config, realization);
+            if (factTable != null) {
+                factTables.add(factTable);
+            }
+        }
+        Set<String> ret = Sets.newHashSet(realizations);
+        for (RealizationEntry entry : project.getRealizationEntries()) {
+            String realization = entry.getRealization();
+            switch (entry.getType()) {
+            case CUBE:
+                CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(realization);
+                if (cubeInstance != null) {
+                    if (factTables.contains(cubeInstance.getRootFactTable())) {
+                        ret.add(realization);
+                    }
+                }
+                break;
+            case HYBRID:
+                HybridInstance hybridInstance = HybridManager.getInstance(config).getHybridInstance(realization);
+                if (hybridInstance != null && hybridInstance.getModel() != null) {
+                    if (factTables.contains(hybridInstance.getModel().getRootFactTable().getTableIdentity())) {
+                        ret.add(realization);
+                    }
+                }
+                break;
+            default:
+            }
+        }
+        return ret;
+    }
+
+    private String getFactTablesForRealization(KylinConfig config, String name) {
+        HybridInstance hybridInstance = HybridManager.getInstance(config).getHybridInstance(name);
+        if (hybridInstance != null) {
+            return hybridInstance.getModel().getRootFactTable().getTableIdentity();
+        }
+        CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(name);
+        if (cubeInstance != null) {
+            return cubeInstance.getRootFactTable();
+        }
+        return null;
+    }
+}
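
A simplified, dependency-free sketch of the expansion step above, under assumed names: collect the fact tables of the realizations that answered the query, then pull in every other realization built on one of those tables:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class FactTableExpansionDemo {
    public static void main(String[] args) {
        Map<String, String> factTableOf = new HashMap<>(); // realization -> fact table
        factTableOf.put("cube_sales_v1", "DB.SALES");
        factTableOf.put("cube_sales_v2", "DB.SALES");
        factTableOf.put("cube_users", "DB.USERS");

        Set<String> answering = new HashSet<>();
        answering.add("cube_sales_v1"); // the realization that served the query

        // fact tables touched by the answering realizations
        Set<String> factTables = new HashSet<>();
        for (String r : answering) {
            factTables.add(factTableOf.get(r));
        }

        // expand: any realization on one of those tables joins the signature set
        Set<String> expanded = new HashSet<>(answering);
        for (Map.Entry<String, String> e : factTableOf.entrySet()) {
            if (factTables.contains(e.getValue())) {
                expanded.add(e.getKey()); // cube_sales_v2 joins; cube_users does not
            }
        }
        System.out.println(expanded); // both sales cubes (iteration order may vary)
    }
}
```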
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSetCalculator.java b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSetCalculator.java
new file mode 100644
index 0000000000..e204cbb249
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSetCalculator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class RealizationSetCalculator implements SignatureCalculator {
+
+    public static final Logger logger = LoggerFactory.getLogger(RealizationSetCalculator.class);
+
+    @Override
+    public String calculateSignature(KylinConfig config, SQLResponse sqlResponse, ProjectInstance project) {
+        Set<String> realizations = getRealizations(config, sqlResponse.getCube(), project);
+        if (realizations == null) {
+            return null;
+        }
+        Set<RealizationSignature> signatureSet = Sets.newTreeSet();
+        for (String realization : realizations) {
+            RealizationSignature realizationSignature = getRealizationSignature(config, realization);
+            if (realizationSignature != null) {
+                signatureSet.add(realizationSignature);
+            }
+        }
+        if (signatureSet.isEmpty()) {
+            return null;
+        }
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] signature = md.digest(signatureSet.toString().getBytes());
+            return new String(Base64.encodeBase64(signature));
+        } catch (NoSuchAlgorithmException e) {
+            logger.warn("Failed to calculate signature due to " + e);
+            return null;
+        }
+    }
+
+    protected Set<String> getRealizations(KylinConfig config, String cubes, ProjectInstance project) {
+        if (Strings.isNullOrEmpty(cubes)) {
+            return null;
+        }
+        String[] realizations = parseNamesFromCanonicalNames(cubes.split(","));
+        return Sets.newHashSet(realizations);
+    }
+
+    protected static RealizationSignature getRealizationSignature(KylinConfig config, String realizationName) {
+        RealizationSignature result = RealizationSignature.HybridSignature.getHybridSignature(config, realizationName);
+        if (result == null) {
+            result = RealizationSignature.CubeSignature.getCubeSignature(config, realizationName);
+        }
+        return result;
+    }
+
+    private static String[] parseNamesFromCanonicalNames(String[] canonicalNames) {
+        String[] result = new String[canonicalNames.length];
+        for (int i = 0; i < canonicalNames.length; i++) {
+            result[i] = parseCanonicalName(canonicalNames[i]).getSecond();
+        }
+        return result;
+    }
+
+    /**
+     * @param canonicalName realization canonical name, e.g. CUBE[name=my_cube]
+     * @return type and name pair for the realization
+     */
+    private static Pair<String, String> parseCanonicalName(String canonicalName) {
+        Iterable<String> parts = Splitter.on(CharMatcher.anyOf("[]=,")).split(canonicalName);
+        String[] partsStr = Iterables.toArray(parts, String.class);
+        return new Pair<>(partsStr[0], partsStr[2]);
+    }
+}
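
The digest step can be sketched in isolation: the ordered component strings are rendered together (via TreeSet.toString()), hashed with MD5, and Base64-encoded, so any status or segment change yields a different signature. A minimal sketch, assuming plain strings in place of RealizationSignature objects:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.TreeSet;

public class SignatureDigestDemo {
    public static void main(String[] args) throws Exception {
        // stand-ins for CubeSignature.toString(): name-status:[segment:lastBuildTime]
        TreeSet<String> components = new TreeSet<>();
        components.add("cube_a-READY:[seg1:1540000000000]");
        components.add("cube_b-DISABLED:null");

        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] digest = md.digest(components.toString().getBytes(StandardCharsets.UTF_8));
        System.out.println(Base64.getEncoder().encodeToString(digest));
    }
}
```

For reference, parseCanonicalName splits a canonical name such as CUBE[name=my_cube] on the characters []=, so parts[0] is the realization type and parts[2] is its name.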
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSignature.java b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSignature.java
new file mode 100644
index 0000000000..9e54085250
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/RealizationSignature.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+
+import com.google.common.collect.Sets;
+
+public abstract class RealizationSignature extends ComponentSignature<RealizationSignature> {
+
+    static class CubeSignature extends RealizationSignature {
+        public final String name;
+        public final RealizationStatusEnum status;
+        public final Set<SegmentSignature> segmentSignatureSet;
+
+        private CubeSignature(String name, RealizationStatusEnum status, Set<SegmentSignature> segmentSignatureSet) {
+            this.name = name;
+            this.status = status;
+            this.segmentSignatureSet = segmentSignatureSet;
+        }
+
+        public String getKey() {
+            return name;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            CubeSignature that = (CubeSignature) o;
+
+            if (name != null ? !name.equals(that.name) : that.name != null)
+                return false;
+            if (status != that.status)
+                return false;
+            return segmentSignatureSet != null ? segmentSignatureSet.equals(that.segmentSignatureSet)
+                    : that.segmentSignatureSet == null;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = name != null ? name.hashCode() : 0;
+            result = 31 * result + (status != null ? status.hashCode() : 0);
+            result = 31 * result + (segmentSignatureSet != null ? segmentSignatureSet.hashCode() : 0);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return name + "-" + status + ":"
+                    + (segmentSignatureSet != null ? Sets.newTreeSet(segmentSignatureSet) : null);
+        }
+
+        static CubeSignature getCubeSignature(KylinConfig config, String realizationName) {
+            CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(realizationName);
+            if (cubeInstance == null) {
+                return null;
+            }
+            if (!cubeInstance.isReady()) {
+                return new CubeSignature(realizationName, RealizationStatusEnum.DISABLED, null);
+            }
+            List<CubeSegment> readySegments = cubeInstance.getSegments(SegmentStatusEnum.READY);
+            Set<SegmentSignature> segmentSignatureSet = Sets.newHashSetWithExpectedSize(readySegments.size());
+            for (CubeSegment cubeSeg : readySegments) {
+                segmentSignatureSet.add(new SegmentSignature(cubeSeg.getName(), cubeSeg.getLastBuildTime()));
+            }
+            return new CubeSignature(realizationName, RealizationStatusEnum.READY, segmentSignatureSet);
+        }
+    }
+
+    static class HybridSignature extends RealizationSignature {
+        public final String name;
+        public final Set<RealizationSignature> realizationSignatureSet;
+
+        private HybridSignature(String name, Set<RealizationSignature> realizationSignatureSet) {
+            this.name = name;
+            this.realizationSignatureSet = realizationSignatureSet;
+        }
+
+        public String getKey() {
+            return name;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            HybridSignature that = (HybridSignature) o;
+
+            if (name != null ? !name.equals(that.name) : that.name != null)
+                return false;
+            return realizationSignatureSet != null ? realizationSignatureSet.equals(that.realizationSignatureSet)
+                    : that.realizationSignatureSet == null;
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = name != null ? name.hashCode() : 0;
+            result = 31 * result + (realizationSignatureSet != null ? realizationSignatureSet.hashCode() : 0);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return name + ":" + (realizationSignatureSet != null ? Sets.newTreeSet(realizationSignatureSet) : null);
+        }
+
+        static HybridSignature getHybridSignature(KylinConfig config, String realizationName) {
+            HybridInstance hybridInstance = HybridManager.getInstance(config).getHybridInstance(realizationName);
+            if (hybridInstance == null) {
+                return null;
+            }
+            IRealization[] realizations = hybridInstance.getRealizations();
+            Set<RealizationSignature> realizationSignatureSet = Sets.newHashSetWithExpectedSize(realizations.length);
+            for (IRealization realization : realizations) {
+                RealizationSignature realizationSignature = null;
+                if (realization.getType() == RealizationType.CUBE) {
+                    realizationSignature = CubeSignature.getCubeSignature(config, realization.getName());
+                } else if (realization.getType() == RealizationType.HYBRID) {
+                    realizationSignature = getHybridSignature(config, realization.getName());
+                }
+                if (realizationSignature != null) {
+                    realizationSignatureSet.add(realizationSignature);
+                }
+            }
+            return new HybridSignature(realizationName, realizationSignatureSet);
+        }
+    }
+}
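
RealizationSignature is a recursive composite: a CubeSignature embeds per-segment (name, lastBuildTime) pairs, and a HybridSignature embeds the signatures of its child realizations, so a segment rebuild anywhere below bubbles up into the hybrid's rendered signature. A compact sketch of that propagation, using plain strings as stand-ins:

```java
import java.util.TreeSet;

public class CompositeSignatureDemo {
    public static void main(String[] args) {
        // stand-ins for child CubeSignature.toString() values
        TreeSet<String> children = new TreeSet<>();
        children.add("cube_2010-READY:[seg_2010:1540000000000]");
        children.add("cube_2011-READY:[seg_2011:1540000001000]");

        // HybridSignature.toString() embeds the ordered child set; rebuilding a
        // segment changes its lastBuildTime, which changes this string, which
        // changes the digest computed over it
        String hybridSignature = "hybrid_sales:" + children;
        System.out.println(hybridSignature);
    }
}
```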
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/SegmentSignature.java b/server-base/src/main/java/org/apache/kylin/rest/signature/SegmentSignature.java
new file mode 100644
index 0000000000..800dd99b46
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/SegmentSignature.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+class SegmentSignature extends ComponentSignature<SegmentSignature> {
+    public final String name;
+    public final long lastBuildTime;
+
+    public SegmentSignature(String name, long lastBuildTime) {
+        this.name = name;
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public String getKey() {
+        return name;
+    }
+
+    @Override
+    public int compareTo(SegmentSignature o) {
+        return name.compareTo(o.name);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        SegmentSignature that = (SegmentSignature) o;
+
+        if (lastBuildTime != that.lastBuildTime)
+            return false;
+        return name != null ? name.equals(that.name) : that.name == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + (int) (lastBuildTime ^ (lastBuildTime >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return name + ":" + lastBuildTime;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/signature/SignatureCalculator.java b/server-base/src/main/java/org/apache/kylin/rest/signature/SignatureCalculator.java
new file mode 100644
index 0000000000..1f94bebbb7
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/signature/SignatureCalculator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.response.SQLResponse;
+
+public interface SignatureCalculator {
+
+    String calculateSignature(KylinConfig config, SQLResponse sqlResponse, ProjectInstance project);
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
new file mode 100644
index 0000000000..2a575546a7
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/SQLResponseSignatureUtil.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.signature.FactTableRealizationSetCalculator;
+import org.apache.kylin.rest.signature.SignatureCalculator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SQLResponseSignatureUtil {
+
+    public static final Logger logger = LoggerFactory.getLogger(SQLResponseSignatureUtil.class);
+
+    public static boolean checkSignature(KylinConfig config, SQLResponse sqlResponse, String projectName) {
+        String old = sqlResponse.getSignature();
+        if (old == null) {
+            return false;
+        }
+        String latest = createSignature(config, sqlResponse, projectName);
+        return old.equals(latest);
+    }
+
+    public static String createSignature(KylinConfig config, SQLResponse sqlResponse, String projectName) {
+        ProjectInstance project = ProjectManager.getInstance(config).getProject(projectName);
+        Preconditions.checkNotNull(project);
+
+        SignatureCalculator signatureCalculator;
+        try {
+            Class<?> signatureClass = getSignatureClass(project.getConfig());
+            signatureCalculator = (SignatureCalculator) signatureClass.getConstructor().newInstance();
+        } catch (Exception e) {
+            logger.warn("Will use the default signature since the configured calculator failed to construct due to " + e);
+            signatureCalculator = new FactTableRealizationSetCalculator();
+        }
+        return signatureCalculator.calculateSignature(config, sqlResponse, project);
+    }
+
+    private static Class<?> getSignatureClass(KylinConfig config) {
+        try {
+            return Class.forName(config.getSQLResponseSignatureClass());
+        } catch (ClassNotFoundException e) {
+            logger.warn("Will use the default signature since class " + config.getSQLResponseSignatureClass() + " cannot be found");
+            return FactTableRealizationSetCalculator.class;
+        }
+    }
+}
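
The calculator is pluggable: getSignatureClass loads the class named by kylin.query.signature-class (exercised by the tests later in this diff) and falls back to FactTableRealizationSetCalculator. A hedged sketch of a custom implementation; the class below is hypothetical and only illustrates the required shape (a public no-arg constructor, since it is instantiated reflectively):

```java
package org.apache.kylin.rest.signature;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.response.SQLResponse;

// Hypothetical example, not part of this patch: a constant signature means
// checkSignature always passes, effectively disabling metadata-driven
// invalidation for the project that configures it.
public class ConstantSignatureCalculator implements SignatureCalculator {

    @Override
    public String calculateSignature(KylinConfig config, SQLResponse sqlResponse, ProjectInstance project) {
        return "constant";
    }
}
```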
diff --git a/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java b/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java
index e1a1228c54..09191e0cdd 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/bean/BeanTest.java
@@ -54,7 +54,7 @@ public void test() {
         } catch (IntrospectionException e) {
         }
 
-        new SQLResponse(null, null, null, 0, true, null, false, false);
+        new SQLResponse(null, null, 0, true, null);
 
         SelectedColumnMeta coulmnMeta = new SelectedColumnMeta(false, false, false, false, 0, false, 0, null, null,
                 null, null, null, 0, 0, 0, null, false, false, false);
diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
new file mode 100644
index 0000000000..9f6d48ef9f
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.response;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class SQLResponseTest {
+
+    @Test
+    public void testInterfaceConsistency() throws IOException {
+        String[] attrArray = new String[] { "columnMetas", "results", "cube", "affectedRowCount", "isException",
+                "exceptionMessage", "duration", "partial", "totalScanCount", "totalScanBytes", "hitExceptionCache",
+                "storageCacheUsed", "pushDown", "traceUrl" };
+
+        SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
+        String jsonStr = JsonUtil.writeValueAsString(sqlResponse);
+        System.out.println(jsonStr);
+
+        JsonNode jnode = JsonUtil.readValueAsTree(jsonStr);
+        assertEquals(jnode.size(), attrArray.length);
+        for (String attr : attrArray) {
+            Assert.assertTrue(attr + " doesn't exist", jnode.has(attr));
+        }
+    }
+}
diff --git a/server-base/src/test/java/org/apache/kylin/rest/signature/SignatureCalculatorTest.java b/server-base/src/test/java/org/apache/kylin/rest/signature/SignatureCalculatorTest.java
new file mode 100644
index 0000000000..3d3ae4a0d5
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/signature/SignatureCalculatorTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.signature;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.signature.RealizationSignature.CubeSignature;
+import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class SignatureCalculatorTest extends LocalFileMetadataTestCase {
+
+    private final String projectName = "default";
+    private KylinConfig config;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        this.config = getTestConfig();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetRealizationSignature() {
+        RealizationSignature signature1 = RealizationSetCalculator.getRealizationSignature(this.config,
+                "Test" + System.currentTimeMillis());
+        Assert.assertNull(signature1);
+
+        CubeSignature signature2 = (CubeSignature) RealizationSetCalculator.getRealizationSignature(this.config, "ssb");
+        Assert.assertEquals(RealizationStatusEnum.DISABLED, signature2.status);
+        Assert.assertNull(signature2.segmentSignatureSet);
+
+        CubeSignature signature3 = (CubeSignature) RealizationSetCalculator.getRealizationSignature(this.config,
+                "test_kylin_cube_with_slr_left_join_ready");
+        Assert.assertNotNull(signature3.segmentSignatureSet);
+    }
+
+    @Test
+    public void testRealizationSetCalculator() throws IOException {
+        KylinConfig config = KylinConfig.createKylinConfig(getTestConfig());
+        Map<String, String> overrides = Maps.newHashMap();
+        overrides.put("kylin.query.signature-class", "org.apache.kylin.rest.signature.RealizationSetCalculator");
+
+        ProjectInstance projectInstance = ProjectManager.getInstance(config).getProject(projectName);
+        projectInstance.setConfig(KylinConfigExt.createInstance(config, overrides));
+
+        HybridManager hybridManager = HybridManager.getInstance(config);
+        HybridInstance hybrid1 = hybridManager.getHybridInstance("test_kylin_hybrid_ready");
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube1 = cubeManager.getCube("test_kylin_cube_with_slr_ready_2_segments");
+        CubeInstance cube2 = cubeManager.getCube("test_kylin_cube_without_slr_ready");
+        CubeInstance cube2Clone = cloneCubeInstance(cubeManager, cube2, cube2.getName() + "_clone");
+
+        //Related cubes:
+        // - test_kylin_cube_with_slr_ready
+        // - test_kylin_cube_with_slr_ready_2_segments
+        // - test_kylin_cube_without_slr_ready
+        String cubes = hybrid1.getCanonicalName() + "," + cube2Clone.getCanonicalName();
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setCube(cubes);
+
+        String signature = SQLResponseSignatureUtil.createSignature(config, sqlResponse, projectName);
+        sqlResponse.setSignature(signature);
+
+        Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+        {//Test the influence of a related cube's status change
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.DISABLED);
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.READY);
+            Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+
+        {//Test the influence of segment changes
+            cube2Clone = cubeManager.updateCubeDropSegments(cube2Clone, cube2Clone.getSegments().get(0));
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+    }
+
+    @Test
+    public void testFactTableRealizationSetCalculator() throws IOException {
+        KylinConfig config = KylinConfig.createKylinConfig(getTestConfig());
+        Map<String, String> overrides = Maps.newHashMap();
+        overrides.put("kylin.query.signature-class",
+                "org.apache.kylin.rest.signature.FactTableRealizationSetCalculator");
+
+        ProjectInstance projectInstance = ProjectManager.getInstance(config).getProject(projectName);
+        projectInstance.setConfig(KylinConfigExt.createInstance(config, overrides));
+
+        HybridManager hybridManager = HybridManager.getInstance(config);
+        HybridInstance hybrid1 = hybridManager.getHybridInstance("test_kylin_hybrid_ready");
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube1 = cubeManager.getCube("test_kylin_cube_with_slr_ready_2_segments");
+        CubeInstance cube2 = cubeManager.getCube("test_kylin_cube_without_slr_ready");
+        CubeInstance cube2Clone = cloneCubeInstance(cubeManager, cube2, cube2.getName() + "_clone");
+        CubeInstance cube3 = cloneCubeInstance(cubeManager, cube2, cube2.getDescName());
+
+        //Related cubes:
+        // - test_kylin_cube_with_slr_ready
+        // - test_kylin_cube_with_slr_ready_2_segments
+        // - test_kylin_cube_without_slr_ready
+        String cubes = hybrid1.getCanonicalName() + "," + cube2Clone.getCanonicalName();
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setCube(cubes);
+
+        String signature = SQLResponseSignatureUtil.createSignature(config, sqlResponse, projectName);
+        sqlResponse.setSignature(signature);
+
+        Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+        {//Test the influence of a related cube's status change
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.DISABLED);
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+            cube1 = cubeManager.updateCubeStatus(cube1, RealizationStatusEnum.READY);
+            Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+
+        {//Test the influence of cubes that are not in ${cubes} but share the same fact table
+            cube3 = cubeManager.updateCubeStatus(cube3, RealizationStatusEnum.DISABLED);
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+
+            cube3 = cubeManager.updateCubeStatus(cube3, RealizationStatusEnum.READY);
+            Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+
+        {//Test the influence of segment changes
+            cube2Clone = cubeManager.updateCubeDropSegments(cube2Clone, cube2Clone.getSegments().get(0));
+            Assert.assertFalse(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+        }
+    }
+
+    private CubeInstance cloneCubeInstance(CubeManager cubeManager, CubeInstance cube, String name) throws IOException {
+        CubeInstance cubeClone = cubeManager.createCube(name, projectName, cube.getDescriptor(), cube.getOwner());
+        CubeUpdate cubeUpdate = new CubeUpdate(cubeClone.latestCopyForWrite());
+        cubeUpdate.setToAddSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+        cubeUpdate.setStatus(RealizationStatusEnum.READY);
+        return cubeManager.updateCube(cubeUpdate);
+    }
+}
diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/SQLResponseSignatureUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/SQLResponseSignatureUtilTest.java
new file mode 100644
index 0000000000..85a3eb79fb
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/util/SQLResponseSignatureUtilTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SQLResponseSignatureUtilTest extends LocalFileMetadataTestCase {
+
+    public static final Logger logger = LoggerFactory.getLogger(SQLResponseSignatureUtilTest.class);
+
+    private KylinConfig config;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        this.config = getTestConfig();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testCreateSignature() {
+        String projectName = "default";
+
+        HybridManager hybridManager = HybridManager.getInstance(config);
+        HybridInstance hybrid1 = hybridManager.getHybridInstance("test_kylin_hybrid_ready");
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube1 = cubeManager.getCube("test_kylin_cube_with_slr_left_join_ready");
+        CubeInstance cube2 = cubeManager.getCube("test_kylin_cube_without_slr_ready");
+
+        String cubes = hybrid1.getCanonicalName() + "," + cube1.getCanonicalName() + "," + cube2.getCanonicalName();
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setCube(cubes);
+
+        String signature = SQLResponseSignatureUtil.createSignature(config, sqlResponse, projectName);
+        sqlResponse.setSignature(signature);
+
+        Assert.assertTrue(SQLResponseSignatureUtil.checkSignature(config, sqlResponse, projectName));
+    }
+}
diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml
index c39ec5bfc8..17c870c1ac 100644
--- a/server/src/main/resources/applicationContext.xml
+++ b/server/src/main/resources/applicationContext.xml
@@ -16,7 +16,6 @@
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:mvc="http://www.springframework.org/schema/mvc"
-       xmlns:task="http://www.springframework.org/schema/task"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:cache="http://www.springframework.org/schema/cache"
        xmlns:p="http://www.springframework.org/schema/p"
@@ -24,8 +23,9 @@
     http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-4.3.xsd
-    http://www.springframework.org/schema/task
-    http://www.springframework.org/schema/task/spring-task-4.3.xsd
+
+
+
     http://www.springframework.org/schema/mvc
     http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
     http://www.springframework.org/schema/aop
@@ -38,7 +38,7 @@
     <mvc:annotation-driven/>
     <aop:aspectj-autoproxy/>
 
-    <bean class="org.apache.kylin.rest.init.InitialTaskManager" />
+    <bean class="org.apache.kylin.rest.init.InitialTaskManager"/>
 
     <context:component-scan base-package="org.apache.kylin.rest"/>
 
@@ -88,17 +88,22 @@
     <!-- Cache Config -->
     <cache:annotation-driven/>
 
-    <bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
-          p:cacheManager-ref="ehcache"/>
+
     <beans profile="ldap,saml">
         <bean id="ehcache"
               class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
               p:configLocation="classpath:ehcache.xml" p:shared="true"/>
+
+        <bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
+              p:cacheManager-ref="ehcache"/>
     </beans>
+
     <beans profile="testing">
         <bean id="ehcache"
               class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean"
               p:configLocation="classpath:ehcache-test.xml" p:shared="true"/>
+        <bean id="cacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager"
+              p:cacheManager-ref="ehcache"/>
     </beans>
 
 </beans>
\ No newline at end of file
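
The change above scopes the cacheManager bean to each profile instead of defining it globally, so the testing profile can wire ehcache-test.xml while ldap/saml use ehcache.xml. A hedged Java-config equivalent (not part of this patch) of the testing wiring:

```java
import org.springframework.cache.CacheManager;
import org.springframework.cache.ehcache.EhCacheCacheManager;
import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ClassPathResource;

@Configuration
@Profile("testing")
public class TestCacheConfig {

    // factory for the underlying ehcache manager, configured from the test XML
    @Bean
    public EhCacheManagerFactoryBean ehcache() {
        EhCacheManagerFactoryBean factory = new EhCacheManagerFactoryBean();
        factory.setConfigLocation(new ClassPathResource("ehcache-test.xml"));
        factory.setShared(true);
        return factory;
    }

    // Spring cache abstraction wrapping the ehcache manager above
    @Bean
    public CacheManager cacheManager(net.sf.ehcache.CacheManager ehcache) {
        return new EhCacheCacheManager(ehcache);
    }
}
```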
diff --git a/server/src/main/resources/ehcache-test.xml b/server/src/main/resources/ehcache-test.xml
index 5bd4d13940..b6d3d9677b 100644
--- a/server/src/main/resources/ehcache-test.xml
+++ b/server/src/main/resources/ehcache-test.xml
@@ -12,19 +12,26 @@
   limitations under the License. See accompanying LICENSE file.
 -->
 
-<ehcache maxBytesLocalHeap="256M">>
+<ehcache maxBytesLocalHeap="256M">
     <cache name="StorageCache"
            eternal="false"
            timeToIdleSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
-            >
+    >
         <persistence strategy="none"/>
     </cache>
-    <cache name="ExceptionQueryCache"
+    <cache name="UserCache"
            eternal="false"
-           timeToIdleSeconds="86400"
+           timeToLiveSeconds="10800"
+           memoryStoreEvictionPolicy="LRU"
+    >
+        <persistence strategy="none"/>
+    </cache>
+    <cache name="AclCache"
+           eternal="false"
+           timeToLiveSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
-            >
+    >
         <persistence strategy="none"/>
     </cache>
 </ehcache>
diff --git a/server/src/main/resources/ehcache.xml b/server/src/main/resources/ehcache.xml
index c9efc13134..d8fab1cb2f 100644
--- a/server/src/main/resources/ehcache.xml
+++ b/server/src/main/resources/ehcache.xml
@@ -17,14 +17,21 @@
            eternal="false"
            timeToIdleSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
-            >
+    >
         <persistence strategy="none"/>
     </cache>
-    <cache name="ExceptionQueryCache"
+    <cache name="UserCache"
            eternal="false"
            timeToIdleSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
-            >
+    >
+        <persistence strategy="none"/>
+    </cache>
+    <cache name="AclCache"
+           eternal="false"
+           timeToIdleSeconds="86400"
+           memoryStoreEvictionPolicy="LRU"
+    >
         <persistence strategy="none"/>
     </cache>
 </ehcache>
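
The new UserCache and AclCache replace ExceptionQueryCache; note the test config uses timeToLiveSeconds (absolute age) where the production config uses timeToIdleSeconds (time since last access) for eviction. A hedged sketch of how such a named cache is typically consumed through Spring's cache abstraction (service and method names are assumptions):

```java
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

@Component
public class UserLookupService {

    // entries exceeding the cache's TTL/TTI are evicted by ehcache's LRU policy
    @Cacheable(value = "UserCache", key = "#username")
    public String loadUserDetails(String username) {
        return expensiveLookup(username);
    }

    // invalidate the cached entry when the user record changes
    @CacheEvict(value = "UserCache", key = "#username")
    public void onUserChanged(String username) {
    }

    private String expensiveLookup(String username) {
        return "details-for-" + username;
    }
}
```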
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java
index 7c5f253a2f..2225096da5 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java
@@ -31,7 +31,7 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 
-import net.sf.ehcache.CacheManager;
+import org.springframework.cache.CacheManager;
 
 /**
  * @author xduo
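
Switching the test to org.springframework.cache.CacheManager codes against Spring's cache abstraction rather than ehcache's concrete manager, so the same code works whichever provider backs it. A small illustrative sketch (class name assumed):

```java
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;

public class CacheClearDemo {
    private final CacheManager cacheManager;

    public CacheClearDemo(CacheManager cacheManager) {
        this.cacheManager = cacheManager;
    }

    public void clear(String name) {
        Cache cache = cacheManager.getCache(name); // provider-agnostic handle
        if (cache != null) {
            cache.clear();
        }
    }
}
```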
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 15791d5d82..401fae3bd8 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -43,6 +43,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-cache</artifactId>
+        </dependency>
 
         <!-- Env & Test -->
         <dependency>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 16b8db2d08..b980cd800c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -18,14 +18,10 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.zip.DataFormatException;
-
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -36,8 +32,11 @@
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -47,7 +46,10 @@
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.storage.StorageContext;
@@ -62,9 +64,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.HBaseZeroCopyByteString;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.zip.DataFormatException;
 
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
@@ -90,14 +96,18 @@ public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo,
 
         if (shardNum == totalShards) {
             //all shards
-            return Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (shardNum - 1))));
+            return Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0),
+                    getByteArrayForShort((short) (shardNum - 1))));
         } else if (baseShard + shardNum <= totalShards) {
             //endpoint end key is inclusive, so no need to append 0 or anything
-            return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (baseShard + shardNum - 1))));
+            return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard),
+                    getByteArrayForShort((short) (baseShard + shardNum - 1))));
         } else {
             //0,1,2,3,4 wants 4,0
-            return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), //
-                    Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1))));
+            return Lists.newArrayList(
+                    Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), //
+                    Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum
+                            - totalShards - 1))));
         }
     }
 
@@ -106,6 +116,7 @@ public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo,
     }
 
     static Field channelRowField = null;
+
     static {
         try {
             channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField("row");
@@ -123,7 +134,6 @@ public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOExcepti
         short cuboidBaseShard = shardNumAndBaseShard.getSecond();
         int totalShards = cubeSeg.getTotalShards(cuboid.getId());
 
-        ByteString scanRequestByteString = null;
         ByteString rawScanByteString = null;
 
         // primary key (also the 0th column block) is always selected
@@ -145,23 +155,66 @@ public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOExcepti
         long coprocessorTimeout = getCoprocessorTimeoutMillis();
         scanRequest.setTimeout(coprocessorTimeout);
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
-        scanRequestByteString = serializeGTScanReq(scanRequest);
+        final ByteString scanRequestByteString = serializeGTScanReq(scanRequest);
 
         final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(queryContext, shardNum, coprocessorTimeout);
 
-        logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
+        logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(),
+                rawScanByteString.size());
 
-        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
+        logger.info(
+                "The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0",
+                Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
         for (RawScan rs : rawScans) {
             logScan(rs, cubeSeg.getStorageLocationIdentifier());
         }
-
-        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size());
-
         // KylinConfig: use the env instance instead of the CubeSegment's, because KylinConfig is shared among
         // queries for different cubes until the coprocessor jar is redeployed.
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final boolean compressionResult = kylinConfig.getCompressionResult();
+
+        final boolean querySegmentCacheEnabled = isSegmentLevelCacheEnabled();
+        final SegmentQueryResult.Builder segmentQueryResultBuilder = new SegmentQueryResult.Builder(shardNum, cubeSeg
+                .getConfig().getQuerySegmentCacheMaxMB() * 1024 * 1024);
+        String calculatedSegmentQueryCacheKey = null;
+        if (querySegmentCacheEnabled) {
+            try {
+                logger.info("Query-{}: try to get segment result from cache for segment:{}", queryContext.getQueryId(),
+                        cubeSeg);
+                calculatedSegmentQueryCacheKey = getSegmentQueryCacheKey(scanRequest);
+                long startTime = System.currentTimeMillis();
+                SegmentQueryResult segmentResult = SegmentQueryCache.getInstance().get(calculatedSegmentQueryCacheKey);
+                long spendTime = System.currentTimeMillis() - startTime;
+                if (segmentResult == null) {
+                    logger.info("Query-{}: no segment result is cached for segment:{}, take time:{}ms",
+                            queryContext.getQueryId(), cubeSeg, spendTime);
+                } else {
+                    logger.info("Query-{}: get segment result from cache for segment:{}, take time:{}ms",
+                            queryContext.getQueryId(), cubeSeg, spendTime);
+                    if (segmentResult.getCubeSegmentStatisticsBytes() != null) {
+                        queryContext.addCubeSegmentStatistics(storageContext.ctxId,
+                                (CubeSegmentStatistics) SerializationUtils.deserialize(segmentResult
+                                        .getCubeSegmentStatisticsBytes()));
+                    }
+                    for (byte[] regionResult : segmentResult.getRegionResults()) {
+                        if (compressionResult) {
+                            epResultItr.append(CompressionUtils.decompress(regionResult));
+                        } else {
+                            epResultItr.append(regionResult);
+                        }
+                    }
+                    return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr),
+                            storageContext);
+                }
+            } catch (Exception e) {
+                logger.error("Fail to handle cached segment result from cache", e);
+            }
+        }
+        final String segmentQueryCacheKey = calculatedSegmentQueryCacheKey;
+
+        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum,
+                cuboidBaseShard, rawScans.size());
+
         final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
         builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
         for (IntList intList : hbaseColumnsToGTIntList) {
@@ -184,7 +237,8 @@ public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOExcepti
                 @Override
                 public void run() {
                     runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
-                            epRange.getSecond(), epResultItr);
+                            epRange.getSecond(), epResultItr, querySegmentCacheEnabled, segmentQueryResultBuilder,
+                            segmentQueryCacheKey);
                 }
             });
         }
@@ -193,8 +247,9 @@ public void run() {
     }
 
     private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
-            final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
-            final ExpectedSizeIterator epResultItr) {
+                            final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
+                            final ExpectedSizeIterator epResultItr, final boolean querySegmentCacheEnabled,
+                            final SegmentQueryResult.Builder segmentQueryResultBuilder, final String segmentQueryCacheKey) {
 
         final String queryId = queryContext.getQueryId();
 
@@ -294,14 +349,21 @@ public void update(byte[] region, byte[] row, CubeVisitResponse result) {
                                 // record coprocessor error if happened
                                 rpcException = getCoprocessorException(result);
                             }
-                            queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
-                                    cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
-                                    cuboid.getId(), storageContext.getFilterMask(), rpcException,
-                                    stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
+                            queryContext.addRPCStatistics(
+                                    storageContext.ctxId,
+                                    stats.getHostname(),
+                                    cubeSeg.getCubeDesc().getName(),
+                                    cubeSeg.getName(),
+                                    cuboid.getInputID(),
+                                    cuboid.getId(),
+                                    storageContext.getFilterMask(),
+                                    rpcException,
+                                    stats.getServiceEndTime() - stats.getServiceStartTime(),
+                                    0,
                                     stats.getScannedRowCount(),
                                     stats.getScannedRowCount() - stats.getAggregatedRowCount()
-                                            - stats.getFilteredRowCount(),
-                                    stats.getAggregatedRowCount(), stats.getScannedBytes());
+                                            - stats.getFilteredRowCount(),
+                                    stats.getAggregatedRowCount(),
+                                    stats.getScannedBytes());
 
                             if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
                                 rpcException = new ResourceLimitExceededException(
@@ -319,12 +381,38 @@ public void update(byte[] region, byte[] row, CubeVisitResponse result) {
                             }
 
                             try {
+                                byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
                                 if (compressionResult) {
-                                    epResultItr.append(CompressionUtils.decompress(
-                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+                                    epResultItr.append(CompressionUtils.decompress(rawData));
                                 } else {
-                                    epResultItr.append(
-                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                                    epResultItr.append(rawData);
+                                }
+                                // put segment query result to cache if cache is enabled
+                                if (querySegmentCacheEnabled) {
+                                    try {
+                                        segmentQueryResultBuilder.putRegionResult(rawData);
+                                        if (segmentQueryResultBuilder.isComplete()) {
+                                            CubeSegmentStatistics cubeSegmentStatistics = queryContext
+                                                    .getCubeSegmentStatistics(storageContext.ctxId, cubeSeg
+                                                            .getCubeInstance().getName(), cubeSeg.getName());
+                                            if (cubeSegmentStatistics != null) {
+                                                segmentQueryResultBuilder
+                                                        .setCubeSegmentStatistics(cubeSegmentStatistics);
+                                                logger.info(
+                                                        "Query-{}: trying to put segment query result to cache for segment: {}",
+                                                        queryContext.getQueryId(), cubeSeg);
+                                                SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder
+                                                        .build();
+                                                SegmentQueryCache.getInstance().put(segmentQueryCacheKey,
+                                                        segmentQueryResult);
+                                                logger.info(
+                                                        "Query-{}: successfully put segment query result to cache for segment: {}",
+                                                        queryContext.getQueryId(), cubeSeg);
+                                            }
+                                        }
+                                    } catch (Throwable t) {
+                                        logger.error("Failed to put segment query result to cache", t);
+                                    }
                                 }
                             } catch (IOException | DataFormatException e) {
                                 throw new RuntimeException(logHeader + "Error when decompressing", e);
@@ -370,7 +458,8 @@ private ByteString serializeRawScans(List<RawScan> rawScans) {
                     RawScan.serializer.serialize(rs, rawScanBuffer);
                 }
                 rawScanBuffer.flip();
-                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
+                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(),
+                        rawScanBuffer.limit());
                 break;
             } catch (BufferOverflowException boe) {
                 logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times", rawScanBufferSize);
@@ -385,13 +474,18 @@ private String getStatsString(byte[] region, CubeVisitResponse result) {
         Stats stats = result.getStats();
         byte[] compressedRows = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
 
-        sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname()).append(".");
+        sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier())
+                .append(" Shard ").append(BytesUtil.toHex(region)).append(" on host: ").append(stats.getHostname())
+                .append(".");
         sb.append("Total scanned row: ").append(stats.getScannedRowCount()).append(". ");
         sb.append("Total scanned bytes: ").append(stats.getScannedBytes()).append(". ");
         sb.append("Total filtered row: ").append(stats.getFilteredRowCount()).append(". ");
         sb.append("Total aggred row: ").append(stats.getAggregatedRowCount()).append(". ");
-        sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
-        sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:").append(stats.getFreeSwapSpaceSize()).append(".");
+        sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime())
+                .append("(ms). ");
+        sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ")
+                .append(stats.getFreePhysicalMemorySize()).append(", server swap mem left:")
+                .append(stats.getFreeSwapSpaceSize()).append(".");
         sb.append("Etc message: ").append(stats.getEtcMsg()).append(".");
         sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
         sb.append("Compressed row size: ").append(compressedRows.length);
@@ -401,20 +495,87 @@ private String getStatsString(byte[] region, CubeVisitResponse result) {
 
     private RuntimeException getCoprocessorException(CubeVisitResponse response) {
         if (!response.hasErrorInfo()) {
-            return new RuntimeException("Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message");
+            return new RuntimeException(
+                    "Coprocessor aborts due to scan timeout or other reasons, please re-deploy coprocessor to see concrete error message");
         }
 
         CubeVisitResponse.ErrorInfo errorInfo = response.getErrorInfo();
 
         switch (errorInfo.getType()) {
-        case UNKNOWN_TYPE:
-            return new RuntimeException("Coprocessor aborts: " + errorInfo.getMessage());
-        case TIMEOUT:
-            return new KylinTimeoutException(errorInfo.getMessage());
-        case RESOURCE_LIMIT_EXCEEDED:
-            return new ResourceLimitExceededException("Coprocessor resource limit exceeded: " + errorInfo.getMessage());
-        default:
-            throw new AssertionError("Unknown error type: " + errorInfo.getType());
+            case UNKNOWN_TYPE:
+                return new RuntimeException("Coprocessor aborts: " + errorInfo.getMessage());
+            case TIMEOUT:
+                return new KylinTimeoutException(errorInfo.getMessage());
+            case RESOURCE_LIMIT_EXCEEDED:
+                return new ResourceLimitExceededException("Coprocessor resource limit exceeded: " + errorInfo.getMessage());
+            default:
+                throw new AssertionError("Unknown error type: " + errorInfo.getType());
         }
     }
+
+    private boolean isSegmentLevelCacheEnabled() {
+        if (BackdoorToggles.getDisableSegmentCache()) {
+            return false;
+        }
+        if (!cubeSeg.getConfig().isQuerySegmentCacheEnabled()) {
+            return false;
+        }
+        if (KylinConfig.getInstanceFromEnv().getMemCachedHosts() == null) {
+            return false;
+        }
+        return true;
+    }
+
+    private String getSegmentQueryCacheKey(GTScanRequest scanRequest) {
+        String scanReqStr = getScanRequestString(scanRequest);
+        return cubeSeg.getCubeInstance().getName() + "_" + cubeSeg.getUuid() + "_" + scanReqStr;
+    }
+
+    private String getScanRequestString(GTScanRequest scanRequest) {
+        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer out = ByteBuffer.allocate(scanRequestBufferSize);
+                GTInfo.serializer.serialize(scanRequest.getInfo(), out);
+
+                BytesUtil.writeVInt(scanRequest.getGTScanRanges().size(), out);
+                for (GTScanRange range : scanRequest.getGTScanRanges()) {
+                    serializeGTRecord(range.pkStart, out);
+                    serializeGTRecord(range.pkEnd, out);
+                    BytesUtil.writeVInt(range.fuzzyKeys.size(), out);
+                    for (GTRecord f : range.fuzzyKeys) {
+                        serializeGTRecord(f, out);
+                    }
+                }
+
+                ImmutableBitSet.serializer.serialize(scanRequest.getColumns(), out);
+                BytesUtil.writeByteArray(
+                        GTUtil.serializeGTFilter(scanRequest.getFilterPushDown(), scanRequest.getInfo()), out);
+
+                ImmutableBitSet.serializer.serialize(scanRequest.getAggrGroupBy(), out);
+                ImmutableBitSet.serializer.serialize(scanRequest.getAggrMetrics(), out);
+                BytesUtil.writeAsciiStringArray(scanRequest.getAggrMetricsFuncs(), out);
+                BytesUtil.writeVInt(scanRequest.isAllowStorageAggregation() ? 1 : 0, out);
+                BytesUtil.writeUTFString(scanRequest.getStorageLimitLevel().name(), out);
+                BytesUtil.writeVInt(scanRequest.getStorageScanRowNumThreshold(), out);
+                BytesUtil.writeVInt(scanRequest.getStoragePushDownLimit(), out);
+                BytesUtil.writeUTFString(scanRequest.getStorageBehavior(), out);
+                out.flip();
+                return Bytes.toStringBinary(out.array(), out.position(), out.limit());
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+                scanRequestBufferSize *= 4;
+            }
+        }
+    }
+
+    private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
+        ByteArray[] cols = gtRecord.getInternal();
+        BytesUtil.writeVInt(cols.length, out);
+        for (ByteArray col : cols) {
+            col.exportData(out);
+        }
+    }
+
 }
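
The serializeRawScans and getScanRequestString changes above share the same grow-on-overflow pattern: serialize into a fixed-size ByteBuffer and, on BufferOverflowException, quadruple the buffer and retry. A minimal standalone sketch of that pattern, with a hypothetical Writer interface standing in for Kylin's serializers:

    import java.nio.BufferOverflowException;
    import java.nio.ByteBuffer;

    public class GrowingBufferSketch {
        // Hypothetical stand-in for GTInfo.serializer, RawScan.serializer, etc.
        interface Writer {
            void writeTo(ByteBuffer out);
        }

        static ByteBuffer serializeWithRetry(Writer writer, int initialSize) {
            int size = initialSize;
            while (true) {
                try {
                    ByteBuffer out = ByteBuffer.allocate(size);
                    writer.writeTo(out);
                    out.flip();
                    return out;
                } catch (BufferOverflowException boe) {
                    size *= 4; // quadruple and retry, as the patch does
                }
            }
        }

        public static void main(String[] args) {
            // 100 bytes overflow a 16-byte and a 64-byte buffer; the 256-byte attempt fits.
            ByteBuffer buf = serializeWithRetry(out -> out.put(new byte[100]), 16);
            System.out.println(buf.limit()); // prints 100
        }
    }

The string that getScanRequestString produces is then embedded (via Bytes.toStringBinary) into the segment cache key together with the cube name and segment UUID, so an identical query against an unchanged segment maps to the same memcached key.
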
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java
new file mode 100755
index 0000000000..bea43876bd
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.cache.memcached.CacheStats;
+import org.apache.kylin.cache.memcached.MemcachedCache;
+import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentQueryCache {
+    public static final Logger logger = LoggerFactory.getLogger(SegmentQueryCache.class);
+    private static final String SEG_QUERY_CACHE_NAME = "query_segment_cache";
+    private static SegmentQueryCache segmentQueryCacheInstance = new SegmentQueryCache();
+
+    private MemcachedChunkingCache memcachedCache;
+
+    public static SegmentQueryCache getInstance() {
+        return segmentQueryCacheInstance;
+    }
+
+    private SegmentQueryCache() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig();
+        String configHosts = kylinConfig.getMemCachedHosts();
+        memcachedCacheConfig.setTimeout(kylinConfig.getQuerySegmentCacheTimeoutMilliSeconds());
+        // Set the max object size slightly below 1024 * 1024: the key of the segment result cache is long,
+        // and using the full 1024 * 1024 would make the memcached client fail with an exceed-max-size error.
+        memcachedCacheConfig.setMaxObjectSize(1040000);
+        memcachedCacheConfig.setHosts(configHosts);
+        // Invert the compression setting relative to the HBase coprocessor: if the HBase result is already compressed, memcached will not compress it again.
+        memcachedCacheConfig.setEnableCompression(!kylinConfig.getCompressionResult());
+        String cacheName = SEG_QUERY_CACHE_NAME;
+        memcachedCache = new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, cacheName));
+    }
+
+    public void put(String key, SegmentQueryResult segmentQueryResult) {
+        memcachedCache.put(key, segmentQueryResult);
+    }
+
+    public SegmentQueryResult get(String key) {
+        byte[] value = memcachedCache.get(key);
+        if (value == null) {
+            return null;
+        }
+        return (SegmentQueryResult) (SerializationUtils.deserialize(value));
+    }
+
+    public CacheStats getCacheStats() {
+        return memcachedCache.getStats();
+    }
+
+    /**
+     * Evict the cached segment query result for the given key.
+     *
+     * @param segmentQueryKey key of the cached segment query result to remove
+     */
+    public void evict(String segmentQueryKey) {
+        memcachedCache.evict(segmentQueryKey);
+    }
+}
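
SegmentQueryCache keeps java-serialized SegmentQueryResult objects as byte arrays behind MemcachedChunkingCache; put hands the object to the cache, and get deserializes the bytes coming back. A rough in-memory stand-in for that round-trip, with a ConcurrentHashMap in place of the memcached client so it runs without a cluster (a sketch of the pattern only, not the class's actual wiring):

    import java.io.Serializable;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.commons.lang3.SerializationUtils;

    public class InMemorySegmentCacheSketch {
        // The real class delegates chunking, compression and timeouts to MemcachedChunkingCache.
        private final Map<String, byte[]> store = new ConcurrentHashMap<>();

        public void put(String key, Serializable value) {
            store.put(key, SerializationUtils.serialize(value));
        }

        public <T extends Serializable> T get(String key) {
            byte[] bytes = store.get(key);
            if (bytes == null) {
                return null;
            }
            return SerializationUtils.deserialize(bytes);
        }
    }

As with SegmentQueryCache.get, a miss comes back as null, so callers fall through to the normal endpoint RPC path.
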
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java
new file mode 100755
index 0000000000..e208c02ef1
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import com.google.common.collect.Queues;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Query result for a single cube segment, composed of the byte[] results returned by each region.
+ */
+public class SegmentQueryResult implements Serializable {
+    private static final long serialVersionUID = 9047493994209284453L;
+
+    private Collection<byte[]> regionResults;
+
+    // store segment query stats for cube planner usage
+    private byte[] cubeSegmentStatisticsBytes;
+
+    public void setRegionResults(Collection<byte[]> regionResults) {
+        this.regionResults = regionResults;
+    }
+
+    public Collection<byte[]> getRegionResults() {
+        return regionResults;
+    }
+
+    public byte[] getCubeSegmentStatisticsBytes() {
+        return cubeSegmentStatisticsBytes;
+    }
+
+    public void setCubeSegmentStatisticsBytes(byte[] cubeSegmentStatisticsBytes) {
+        this.cubeSegmentStatisticsBytes = cubeSegmentStatisticsBytes;
+    }
+
+    public static class Builder {
+        private static final Logger logger = LoggerFactory.getLogger(Builder.class);
+
+        private volatile int regionsNum;
+        private ConcurrentLinkedQueue<byte[]> queue;
+        private AtomicInteger totalResultSize;
+        private volatile int maxSegmentCacheSize;
+        private byte[] cubeSegmentStatisticsBytes;
+
+        public Builder(int regionsNum, int maxCacheResultSize) {
+            this.regionsNum = regionsNum;
+            this.queue = Queues.newConcurrentLinkedQueue();
+            this.totalResultSize = new AtomicInteger();
+            this.maxSegmentCacheSize = maxCacheResultSize;
+        }
+
+        public void putRegionResult(byte[] result) {
+            totalResultSize.addAndGet(result.length);
+            if (totalResultSize.get() > maxSegmentCacheSize) {
+                logger.info("stop putting results into cache, since the result size {} is larger than the configured max size {}",
+                        totalResultSize.get(), maxSegmentCacheSize);
+                return;
+            }
+            queue.offer(result);
+        }
+
+        public void setCubeSegmentStatistics(CubeSegmentStatistics cubeSegmentStatistics) {
+            this.cubeSegmentStatisticsBytes = (cubeSegmentStatistics == null ? null
+                    : SerializationUtils.serialize(cubeSegmentStatistics));
+        }
+
+        public boolean isComplete() {
+            return queue.size() == regionsNum;
+        }
+
+        public SegmentQueryResult build() {
+            SegmentQueryResult result = new SegmentQueryResult();
+            result.setCubeSegmentStatisticsBytes(cubeSegmentStatisticsBytes);
+            result.setRegionResults(queue);
+            return result;
+        }
+    }
+}
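
The Builder's size cap explains the last assertion in the test file below: putRegionResult adds each result's length to the running total before checking it, and silently drops results once the cap is exceeded, so the queue can never reach regionsNum and isComplete() stays false. The arithmetic as a runnable sketch, mirroring the test's 8 regions of 1500 bytes against a 10 KiB cap:

    public class BuilderCapSketch {
        public static void main(String[] args) {
            int regions = 8, resultSize = 1500, cap = 10 * 1024;
            int total = 0, accepted = 0;
            for (int i = 0; i < regions; i++) {
                total += resultSize;   // mirrors totalResultSize.addAndGet(result.length)
                if (total > cap) {
                    continue;          // mirrors the early return: the result is not queued
                }
                accepted++;            // mirrors queue.offer(result)
            }
            // The running total passes 10240 on the 7th result, so only 6 of 8 are kept.
            System.out.println(accepted + " of " + regions + " region results kept");
        }
    }

Once any result is dropped the partial set stays uncached, which is the safe behavior: a SegmentQueryResult missing a region would produce wrong answers if it were ever served from the cache.
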
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java
new file mode 100644
index 0000000000..a944c8bfc7
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult;
+import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult.Builder;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentQueryResultTest {
+    private static final Logger logger = LoggerFactory.getLogger(SegmentQueryResultTest.class);
+
+    @Test
+    public void buildTest() {
+        int maxCacheResultSize = 10 * 1024;
+        ExecutorService rpcExecutor = Executors.newFixedThreadPool(4);
+        SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 4, builder, 1024);
+        assertFalse(builder.isComplete());
+        mockSendRPCTasks(rpcExecutor, 4, builder, 1024);
+        assertTrue(builder.isComplete());
+
+        builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 8, builder, 1500);
+        assertFalse(builder.isComplete());
+    }
+
+    @Test
+    public void resultValidateTest() {
+        long segmentBuildTime = System.currentTimeMillis() - 1000;
+        int maxCacheResultSize = 10 * 1024;
+        ExecutorService rpcExecutor = Executors.newFixedThreadPool(4);
+        SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 8, builder, 1024);
+        CubeSegmentStatistics statistics = new CubeSegmentStatistics();
+        statistics.setWrapper("cube1", "20171001000000-20171010000000", 3, 7, 1);
+        builder.setCubeSegmentStatistics(statistics);
+        SegmentQueryResult segmentQueryResult = builder.build();
+
+        CubeSegmentStatistics desStatistics = SerializationUtils
+                .deserialize(segmentQueryResult.getCubeSegmentStatisticsBytes());
+        assertEquals("cube1", desStatistics.getCubeName());
+    }
+
+    private void mockSendRPCTasks(ExecutorService rpcExecutor, int rpcNum, SegmentQueryResult.Builder builder,
+                                  int resultSize) {
+        List<Future> futures = Lists.newArrayList();
+        for (int i = 0; i < rpcNum; i++) {
+            Future future = rpcExecutor.submit(new MockRPCTask(resultSize, 10, builder));
+            futures.add(future);
+        }
+        for (Future future : futures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                logger.error("exception", e);
+            }
+        }
+    }
+
+    private static class MockRPCTask implements Runnable {
+        private int resultSize;
+        private long takeTime;
+        private SegmentQueryResult.Builder builder;
+
+        MockRPCTask(int resultSize, long takeTime, SegmentQueryResult.Builder builder) {
+            this.resultSize = resultSize;
+            this.takeTime = takeTime;
+            this.builder = builder;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(takeTime);
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
+            }
+            builder.putRegionResult(new byte[resultSize]);
+        }
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
