kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [2/2] incubator-kylin git commit: KYLIN-972 half way
Date Sun, 30 Aug 2015 22:35:28 GMT
KYLIN-972 half way


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9e2f94c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9e2f94c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9e2f94c1

Branch: refs/heads/KYLIN-972
Commit: 9e2f94c1e591dba7515e6f98020c69ac5721e93a
Parents: bca588b
Author: Li, Yang <yangli9@ebay.com>
Authored: Sat Aug 29 07:57:09 2015 +0800
Committer: Yang Li <liyang@apache.org>
Committed: Mon Aug 31 06:33:43 2015 +0800

----------------------------------------------------------------------
 .../persistence/RootPersistentEntity.java       |   1 +
 .../org/apache/kylin/common/util/BytesUtil.java |   2 +-
 .../kylin/common/util/CompressionUtils.java     |  17 +++
 .../kylin/common/util/DaemonThreadFactory.java  |  17 +++
 .../apache/kylin/common/util/DateFormat.java    |  17 +++
 .../apache/kylin/common/util/FIFOIterable.java  |  17 +++
 .../apache/kylin/common/util/FIFOIterator.java  |  17 +++
 .../apache/kylin/common/util/IdentityUtils.java |  17 +++
 .../kylin/common/util/ImmutableBitSet.java      |  17 +++
 .../kylin/common/util/ImplementationSwitch.java |  70 +++++++++
 .../common/util/ImplementationSwitchTest.java   |  58 ++++++++
 .../org/apache/kylin/cube/CubeInstance.java     |  27 +++-
 .../java/org/apache/kylin/cube/CubeManager.java |   4 +-
 .../java/org/apache/kylin/cube/CubeSegment.java |   7 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |  41 ++---
 .../apache/kylin/dict/DictionaryManager.java    |   4 +-
 .../apache/kylin/dict/lookup/SnapshotCLI.java   |   4 +-
 .../apache/kylin/engine/BuildEngineFactory.java |  52 -------
 .../org/apache/kylin/engine/EngineFactory.java  |  66 ++++++++
 .../kylin/metadata/model/DataModelDesc.java     |  21 ++-
 .../apache/kylin/metadata/model/IBuildable.java |   7 +-
 .../kylin/metadata/model/IEngineAware.java      |  28 ++++
 .../kylin/metadata/model/ISourceAware.java      |  27 ++++
 .../kylin/metadata/model/IStorageAware.java     |  27 ++++
 .../apache/kylin/metadata/model/TableDesc.java  |   9 +-
 .../metadata/realization/IRealization.java      |   3 +-
 .../java/org/apache/kylin/source/ISource.java   |  28 ++++
 .../org/apache/kylin/source/ITableSource.java   |  28 ----
 .../org/apache/kylin/source/SourceFactory.java  |  50 +++++++
 .../apache/kylin/source/TableSourceFactory.java |  40 -----
 .../java/org/apache/kylin/storage/IStorage.java |   2 +-
 .../apache/kylin/storage/StorageFactory.java    |  32 +++-
 .../kylin/storage/StorageQueryFactory.java      |  97 ------------
 .../cache/AbstractCacheFledgedQuery.java        |  84 +++++++++++
 .../AbstractCacheFledgedStorageEngine.java      |  84 -----------
 .../storage/cache/CacheFledgedDynamicQuery.java | 149 +++++++++++++++++++
 .../cache/CacheFledgedDynamicStorageEngine.java | 149 -------------------
 .../storage/cache/CacheFledgedStaticQuery.java  |  88 +++++++++++
 .../cache/CacheFledgedStaticStorageEngine.java  |  88 -----------
 .../exception/ScanOutOfLimitException.java      |   2 +-
 .../kylin/storage/hybrid/HybridInstance.java    |  24 ++-
 .../kylin/storage/hybrid/HybridManager.java     |  17 +++
 .../kylin/storage/hybrid/HybridStorage.java     |  36 +++++
 .../storage/hybrid/HybridStorageEngine.java     |  44 ------
 .../storage/hybrid/HybridStorageQuery.java      |  61 ++++++++
 .../kylin/storage/cache/DynamicCacheTest.java   |   2 +-
 .../kylin/storage/cache/StaticCacheTest.java    |   2 +-
 .../java/org/apache/kylin/engine/mr/MRUtil.java |   6 +-
 .../apache/kylin/invertedindex/IIInstance.java  |   7 +
 .../job/hadoop/cube/NewBaseCuboidMapper.java    |   6 +-
 .../kylin/job/BuildCubeWithEngineTest.java      |   4 +-
 .../java/org/apache/kylin/job/DeployUtil.java   |   5 +-
 .../source/hive/ITSnapshotManagerTest.java      |   4 +-
 .../kylin/query/enumerator/OLAPEnumerator.java  |   4 +-
 .../AdjustForWeaklyMatchedRealization.java      |   6 +-
 .../kylin/rest/controller/QueryController.java  |   4 +-
 .../apache/kylin/rest/service/CubeService.java  |   4 +-
 .../apache/kylin/rest/service/JobService.java   |   8 +-
 .../kylin/source/hive/HiveTableSource.java      |   4 +-
 .../kylin/storage/hbase/HBaseStorage.java       |  70 ++++++++-
 .../storage/hbase/common/ITStorageTest.java     |   8 +-
 61 files changed, 1140 insertions(+), 684 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
index 0cbf9c2..bc72c1e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/RootPersistentEntity.java
@@ -40,6 +40,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  * 
  * @author yangli9
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 abstract public class RootPersistentEntity implements AclEntity, Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0503ad6..0880da1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.common.util;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
index 3ed279a..13abab5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/CompressionUtils.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kylin.common.util;
 
 import java.io.ByteArrayOutputStream;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java b/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
index bc4502c..56f4a36 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DaemonThreadFactory.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kylin.common.util;
 
 import java.util.concurrent.Executors;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index f74debd..f46edae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kylin.common.util;
 
 import java.text.ParseException;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
index 4c4bc6b..7204e33 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterable.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kylin.common.util;
 
 import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
index f734143..ccea37c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FIFOIterator.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kylin.common.util;
 
 import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
index d873959..35ade60 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/IdentityUtils.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kylin.common.util;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index 2ee7d4f..f5a22d2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kylin.common.util;
 
 import java.util.BitSet;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
new file mode 100644
index 0000000..3101c81
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImplementationSwitch.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.common.util;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a switch between different implementations of the same interface.
+ * Each implementation is identified by an integer ID.
+ */
+public class ImplementationSwitch {
+
+    private static final Logger logger = LoggerFactory.getLogger(ImplementationSwitch.class);
+
+    final private Object[] instances;
+
+    public ImplementationSwitch(Map<Integer, String> impls) {
+        instances = initInstances(impls);
+    }
+
+    private Object[] initInstances(Map<Integer, String> impls) {
+        int maxId = 0;
+        for (Integer id : impls.keySet()) {
+            maxId = Math.max(maxId, id);
+        }
+        if (maxId > 100)
+            throw new IllegalArgumentException("you have more than 100 implementations?");
+
+        Object[] result = new Object[maxId + 1];
+
+        for (Integer id : impls.keySet()) {
+            String clzName = impls.get(id);
+            try {
+                result[id] = ClassUtil.newInstance(clzName);
+            } catch (Exception ex) {
+                logger.warn("Implementation missing " + clzName + " - " + ex);
+            }
+        }
+
+        return result;
+    }
+
+    public <I> I get(int id, Class<I> interfaceClz) {
+        @SuppressWarnings("unchecked")
+        I result = (I) instances[id];
+
+        if (result == null)
+            throw new IllegalArgumentException("Implementations missing, ID " + id + ", interface " + interfaceClz.getName());
+        
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
new file mode 100644
index 0000000..4c69eeb
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/ImplementationSwitchTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class ImplementationSwitchTest {
+    
+    ImplementationSwitch sw;
+
+    public ImplementationSwitchTest() {
+        Map<Integer, String> impls = new HashMap<>();
+        impls.put(0, "non.exist.class");
+        impls.put(1, Impl1.class.getName());
+        impls.put(2, Impl2.class.getName());
+        sw = new ImplementationSwitch(impls);
+    }
+    
+    public static interface I {
+    }
+    
+    public static class Impl1 implements I {
+    }
+    
+    public static class Impl2 implements I {
+    }
+    
+    @Test
+    public void test() {
+        assertTrue(sw.get(1, I.class) instanceof Impl1);
+        assertTrue(sw.get(2, I.class) instanceof Impl2);
+    }
+    
+    @Test(expected = IllegalArgumentException.class)  
+    public void testException() {
+        sw.get(0, I.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index eb3b3e2..371221f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -26,9 +26,11 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.LookupDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -43,8 +45,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeInstance extends RootPersistentEntity implements IRealization {
+public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable {
 
     public static CubeInstance create(String cubeName, String projectName, CubeDesc cubeDesc) {
         CubeInstance cubeInstance = new CubeInstance();
@@ -205,7 +208,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
 
     @Override
     public String getFactTable() {
-        return this.getDescriptor().getFactTable();
+        return getDescriptor().getFactTable();
+    }
+
+    public TableDesc getFactTableDesc() {
+        return getDescriptor().getFactTableDesc();
     }
 
     @Override
@@ -441,4 +448,20 @@ public class CubeInstance extends RootPersistentEntity implements IRealization {
     public void setRetentionRange(long retentionRange) {
         this.retentionRange = retentionRange;
     }
+
+    @Override
+    public int getSourceType() {
+        return getFactTableDesc().getSourceType();
+    }
+
+    @Override
+    public int getStorageType() {
+        return 0;
+    }
+
+    @Override
+    public int getEngineType() {
+        return 0;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 038c7cb..5cfecf1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -59,7 +59,7 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -207,7 +207,7 @@ public class CubeManager implements IRealizationProvider {
         SnapshotManager snapshotMgr = getSnapshotManager();
 
         TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
-        ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
+        ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
         SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
         cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index a80bbd2..636217c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -347,16 +347,17 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
 
     @Override
     public int getSourceType() {
-        return 0;
+        return cubeInstance.getSourceType();
     }
 
     @Override
     public int getEngineType() {
-        return 0;
+        return cubeInstance.getEngineType();
     }
 
     @Override
     public int getStorageType() {
-        return 0;
+        return cubeInstance.getStorageType();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index f8d71b2..ae49eb0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -26,7 +26,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -63,6 +62,7 @@ import com.google.common.collect.Maps;
 
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeDesc extends RootPersistentEntity {
 
@@ -217,29 +217,6 @@ public class CubeDesc extends RootPersistentEntity {
         return functions;
     }
 
-    /**
-     * @return
-     * @deprecated use getModel().getAllTables() instead
-     */
-    public List<TableDesc> listTables() {
-        MetadataManager metaMgr = MetadataManager.getInstance(config);
-        HashSet<String> tableNames = new HashSet<String>();
-        List<TableDesc> result = new ArrayList<TableDesc>();
-
-        tableNames.add(this.getFactTable().toUpperCase());
-        for (DimensionDesc dim : dimensions) {
-            String table = dim.getTable();
-            if (table != null)
-                tableNames.add(table.toUpperCase());
-        }
-
-        for (String tableName : tableNames) {
-            result.add(metaMgr.getTableDesc(tableName));
-        }
-
-        return result;
-    }
-
     public boolean isDerived(TblColRef col) {
         return derivedToHostMap.containsKey(col);
     }
@@ -330,7 +307,11 @@ public class CubeDesc extends RootPersistentEntity {
     }
 
     public String getFactTable() {
-        return model.getFactTable().toUpperCase();
+        return model.getFactTable();
+    }
+    
+    public TableDesc getFactTableDesc() {
+        return model.getFactTableDesc();
     }
 
     public String[] getNullStrings() {
@@ -456,8 +437,8 @@ public class CubeDesc extends RootPersistentEntity {
         }
 
         sortDimAndMeasure();
-        initDimensionColumns(tables);
-        initMeasureColumns(tables);
+        initDimensionColumns();
+        initMeasureColumns();
 
         rowkey.init(this);
         if (hbaseMapping != null) {
@@ -473,7 +454,7 @@ public class CubeDesc extends RootPersistentEntity {
         }
     }
 
-    private void initDimensionColumns(Map<String, TableDesc> tables) {
+    private void initDimensionColumns() {
         for (DimensionDesc dim : dimensions) {
             JoinDesc join = dim.getJoin();
 
@@ -622,12 +603,12 @@ public class CubeDesc extends RootPersistentEntity {
         return ref;
     }
 
-    private void initMeasureColumns(Map<String, TableDesc> tables) {
+    private void initMeasureColumns() {
         if (measures == null || measures.isEmpty()) {
             return;
         }
 
-        TableDesc factTable = tables.get(getFactTable());
+        TableDesc factTable = getFactTableDesc();
         for (MeasureDesc m : measures) {
             m.setName(m.getName().toUpperCase());
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index f6d76dc..c4b6ef0 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -35,7 +35,7 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.ReadableTable.TableSignature;
-import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -220,7 +220,7 @@ public class DictionaryManager {
             inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
         } else {
             TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(srcTable);
-            inpTable = TableSourceFactory.createReadableTable(tableDesc);
+            inpTable = SourceFactory.createReadableTable(tableDesc);
         }
 
         TableSignature inputSig = inpTable.getSignature();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index d3e2c7d..149badc 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -5,7 +5,7 @@ import java.io.IOException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.source.SourceFactory;
 
 public class SnapshotCLI {
 
@@ -23,7 +23,7 @@ public class SnapshotCLI {
         if (tableDesc == null)
             throw new IllegalArgumentException("Not table found by " + table);
 
-        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(TableSourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
+        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
         System.out.println("resource path updated: " + snapshot.getResourcePath());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
deleted file mode 100644
index 7c21e69..0000000
--- a/core-job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class BuildEngineFactory {
-
-    private static IBatchCubingEngine defaultBatchEngine;
-
-    public static IBatchCubingEngine defaultBatchEngine() {
-        if (defaultBatchEngine == null) {
-            KylinConfig conf = KylinConfig.getInstanceFromEnv();
-            if (conf.isCubingInMem()) {
-                defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine2");
-            } else {
-                defaultBatchEngine = (IBatchCubingEngine) ClassUtil.newInstance("org.apache.kylin.engine.mr.MRBatchCubingEngine");
-            }
-        }
-        return defaultBatchEngine;
-    }
-
-    /** Build a new cube segment, typically its time range appends to the end of current cube. */
-    public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
-    }
-
-    /** Merge multiple small segments into a big one. */
-    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
new file mode 100644
index 0000000..3880839
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ImplementationSwitch;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IEngineAware;
+import static org.apache.kylin.metadata.model.IEngineAware.*;
+
+public class EngineFactory {
+    
+    private static ImplementationSwitch batchEngines;
+    private static ImplementationSwitch streamingEngines;
+    static {
+        Map<Integer, String> impls = new HashMap<>();
+        impls.put(ID_MR, "org.apache.kylin.engine.mr.MRBatchCubingEngine");
+        impls.put(ID_MR_V2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+        batchEngines = new ImplementationSwitch(impls);
+        
+        impls.clear();
+        streamingEngines = new ImplementationSwitch(impls); // TODO
+    }
+    
+    public static IBatchCubingEngine batchEngine(IEngineAware aware) {
+        return batchEngines.get(aware.getEngineType(), IBatchCubingEngine.class);
+    }
+    
+    public static IStreamingCubingEngine streamingEngine(IEngineAware aware) {
+        return streamingEngines.get(aware.getEngineType(), IStreamingCubingEngine.class);
+    }
+    
+    /** Build a new cube segment, typically its time range appends to the end of current cube. */
+    public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter);
+    }
+
+    /** Merge multiple small segments into a big one. */
+    public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+        return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
+    }
+    
+    public static Runnable createStreamingCubingBuilder(CubeSegment seg) {
+        return streamingEngine(seg).createStreamingCubingBuilder(seg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index 31d7d6c..1c6ef62 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -36,6 +36,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Sets;
 
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class DataModelDesc extends RootPersistentEntity {
 
@@ -69,6 +70,8 @@ public class DataModelDesc extends RootPersistentEntity {
     @JsonProperty("capacity")
     private RealizationCapacity capacity = RealizationCapacity.MEDIUM;
 
+    private TableDesc factTableDesc;
+
     /**
      * Error messages during resolving json metadata
      */
@@ -101,6 +104,10 @@ public class DataModelDesc extends RootPersistentEntity {
     public String getFactTable() {
         return factTable;
     }
+    
+    public TableDesc getFactTableDesc() {
+        return factTableDesc;
+    }
 
     public void setFactTable(String factTable) {
         this.factTable = factTable.toUpperCase();
@@ -168,6 +175,11 @@ public class DataModelDesc extends RootPersistentEntity {
     }
 
     public void init(Map<String, TableDesc> tables) {
+        this.factTableDesc = tables.get(this.factTable.toUpperCase());
+        if (factTableDesc == null) {
+            throw new IllegalStateException("Fact table does not exist:" + this.factTable);
+        }
+
         initJoinColumns(tables);
         DimensionDesc.capicalizeStrings(dimensions);
         initPartitionDesc(tables);
@@ -195,6 +207,7 @@ public class DataModelDesc extends RootPersistentEntity {
 
             StringUtil.toUpperCaseArray(join.getForeignKey(), join.getForeignKey());
             StringUtil.toUpperCaseArray(join.getPrimaryKey(), join.getPrimaryKey());
+            
             // primary key
             String[] pks = join.getPrimaryKey();
             TblColRef[] pkCols = new TblColRef[pks.length];
@@ -208,15 +221,12 @@ public class DataModelDesc extends RootPersistentEntity {
                 pkCols[i] = colRef;
             }
             join.setPrimaryKeyColumns(pkCols);
+            
             // foreign key
-            TableDesc factTable = tables.get(this.factTable.toUpperCase());
-            if (factTable == null) {
-                throw new IllegalStateException("Fact table does not exist:" + this.getFactTable());
-            }
             String[] fks = join.getForeignKey();
             TblColRef[] fkCols = new TblColRef[fks.length];
             for (int i = 0; i < fks.length; i++) {
-                ColumnDesc col = factTable.findColumnByName(fks[i]);
+                ColumnDesc col = factTableDesc.findColumnByName(fks[i]);
                 if (col == null) {
                     throw new IllegalStateException("Can't find column " + fks[i] + " in table " + this.getFactTable());
                 }
@@ -225,6 +235,7 @@ public class DataModelDesc extends RootPersistentEntity {
                 fkCols[i] = colRef;
             }
             join.setForeignKeyColumns(fkCols);
+            
             // Validate join in dimension
             if (pkCols.length != fkCols.length) {
                 throw new IllegalStateException("Primary keys(" + lookup.getTable() + ")" + Arrays.toString(pks) + " are not consistent with Foreign keys(" + this.getFactTable() + ") " + Arrays.toString(fks));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
index 3090de0..39129f8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IBuildable.java
@@ -18,11 +18,6 @@
 
 package org.apache.kylin.metadata.model;
 
-public interface IBuildable {
+public interface IBuildable extends ISourceAware, IEngineAware, IStorageAware {
 
-    int getSourceType();
-
-    int getEngineType();
-
-    int getStorageType();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
new file mode 100644
index 0000000..607dded
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IEngineAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+public interface IEngineAware {
+
+    public static final int ID_MR = 0;
+    public static final int ID_MR_V2 = 2;
+    public static final int ID_SPARK = 5;
+
+    int getEngineType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
new file mode 100644
index 0000000..3d89f40
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+public interface ISourceAware {
+
+    public static final int ID_HIVE = 0;
+    public static final int ID_SPARKSQL = 5;
+
+    int getSourceType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
new file mode 100644
index 0000000..ea1aae9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+public interface IStorageAware {
+
+    public static final int ID_HBASE = 0;
+    public static final int ID_HYBRID = 1;
+
+    int getStorageType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 785e9d4..ad3382d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -32,8 +32,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  * Table Metadata from Source. All name should be uppercase.
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class TableDesc extends RootPersistentEntity {
+public class TableDesc extends RootPersistentEntity implements ISourceAware {
+    
     @JsonProperty("name")
     private String name;
     @JsonProperty("columns")
@@ -171,4 +173,9 @@ public class TableDesc extends RootPersistentEntity {
         mockup.setName(tableName);
         return mockup;
     }
+
+    @Override
+    public int getSourceType() {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index 6f90e14..8c9258a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -21,10 +21,11 @@ package org.apache.kylin.metadata.realization;
 import java.util.List;
 
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-public interface IRealization {
+public interface IRealization extends IStorageAware {
 
     public boolean isCapable(SQLDigest digest);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
new file mode 100644
index 0000000..3cd8a02
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface ISource {
+
+    public <I> I adaptToBuildEngine(Class<I> engineInterface);
+
+    public ReadableTable createReadableTable(TableDesc tableDesc);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java b/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java
deleted file mode 100644
index 83ae8b3..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/ITableSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import org.apache.kylin.metadata.model.TableDesc;
-
-public interface ITableSource {
-
-    public <I> I adaptToBuildEngine(Class<I> engineInterface);
-
-    public ReadableTable createReadableTable(TableDesc tableDesc);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
new file mode 100644
index 0000000..2fbf847
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ImplementationSwitch;
+import org.apache.kylin.metadata.model.ISourceAware;
+import static org.apache.kylin.metadata.model.ISourceAware.*;
+import org.apache.kylin.metadata.model.TableDesc;
+
+public class SourceFactory {
+
+    private static ImplementationSwitch sources;
+    static {
+        Map<Integer, String> impls = new HashMap<>();
+        impls.put(ID_HIVE, "org.apache.kylin.source.hive.HiveTableSource");
+        sources = new ImplementationSwitch(impls);
+    }
+
+    public static ISource tableSource(ISourceAware aware) {
+        return sources.get(aware.getSourceType(), ISource.class);
+    }
+
+    public static ReadableTable createReadableTable(TableDesc table) {
+        return tableSource(table).createReadableTable(table);
+    }
+
+    public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
+        return tableSource(table).adaptToBuildEngine(engineInterface);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
deleted file mode 100644
index 67191e3..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TableDesc;
-
-public class TableSourceFactory {
-
-    private static ITableSource dft = (ITableSource) ClassUtil.newInstance("org.apache.kylin.source.hive.HiveTableSource");
-
-    public static ReadableTable createReadableTable(TableDesc table) {
-        return dft.createReadableTable(table);
-    }
-
-    public static <T> T createEngineAdapter(IBuildable buildable, Class<T> engineInterface) {
-        return dft.adaptToBuildEngine(engineInterface);
-    }
-
-    public static <T> T createEngineAdapter(TableDesc tableDesc, Class<T> engineInterface) {
-        return dft.adaptToBuildEngine(engineInterface);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java b/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
index 6506a4f..e229e14 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/IStorage.java
@@ -22,7 +22,7 @@ import org.apache.kylin.metadata.realization.IRealization;
 
 public interface IStorage {
 
-    public IStorageQuery createStorageQuery(IRealization realization);
+    public IStorageQuery createQuery(IRealization realization);
 
     public <I> I adaptToBuildEngine(Class<I> engineInterface);
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index b8e3e91..b26dfdb 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -18,17 +18,37 @@
 
 package org.apache.kylin.storage;
 
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.metadata.model.IBuildable;
+import static org.apache.kylin.metadata.model.IStorageAware.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.ImplementationSwitch;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.realization.IRealization;
 
 /**
  */
 public class StorageFactory {
 
-    private static final IStorage dft = (IStorage) ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStorage");
-
-    public static <T> T createEngineAdapter(IBuildable buildable, Class<T> engineInterface) {
-        return dft.adaptToBuildEngine(engineInterface);
+    private static ImplementationSwitch storages;
+    static {
+        Map<Integer, String> impls = new HashMap<>();
+        impls.put(ID_HBASE, "org.apache.kylin.storage.hbase.HBaseStorage");
+        impls.put(ID_HYBRID, "org.apache.kylin.storage.hybrid.HybridStorage");
+        storages = new ImplementationSwitch(impls);
+    }
+    
+    public static IStorage storage(IStorageAware aware) {
+        return storages.get(aware.getStorageType(), IStorage.class);
+    }
+    
+    public static IStorageQuery createQuery(IRealization realization) {
+        return storage(realization).createQuery(realization);
+    }
+    
+    public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) {
+        return storage(aware).adaptToBuildEngine(engineInterface);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
deleted file mode 100644
index eb6e6b1..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageQueryFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
-import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridStorageEngine;
-
-import com.google.common.base.Preconditions;
-
-/**
- * @author xjiang
- */
-public class StorageQueryFactory {
-
-    private final static boolean allowStorageLayerCache = true;
-    private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
-    private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery";
-
-    public static IStorageQuery createQuery(IRealization realization) {
-
-        if (realization.getType() == RealizationType.INVERTED_INDEX) {
-            ICachableStorageQuery ret;
-            try {
-                ret = (ICachableStorageQuery) Class.forName(defaultIIStorageQuery).getConstructor(IIInstance.class).newInstance((IIInstance) realization);
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to initialize storage query for " + defaultIIStorageQuery, e);
-            }
-
-            if (allowStorageLayerCache) {
-                return wrapWithCache(ret, realization);
-            } else {
-                return ret;
-            }
-        } else if (realization.getType() == RealizationType.CUBE) {
-            ICachableStorageQuery ret;
-            try {
-                ret = (ICachableStorageQuery) Class.forName(defaultCubeStorageQuery).getConstructor(CubeInstance.class).newInstance((CubeInstance) realization);
-            } catch (Exception e) {
-                throw new RuntimeException("Failed to initialize storage query for " + defaultCubeStorageQuery, e);
-            }
-
-            if (allowStorageLayerCache) {
-                return wrapWithCache(ret, realization);
-            } else {
-                return ret;
-            }
-        } else {
-            return new HybridStorageEngine((HybridInstance) realization);
-        }
-    }
-
-    private static IStorageQuery wrapWithCache(ICachableStorageQuery underlyingStorageEngine, IRealization realization) {
-        if (underlyingStorageEngine.isDynamic()) {
-            return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
-        } else {
-            return new CacheFledgedStaticStorageEngine(underlyingStorageEngine);
-        }
-    }
-
-    private static TblColRef getPartitionCol(IRealization realization) {
-        String modelName = realization.getModelName();
-        DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
-        PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
-        Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
-        TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
-        Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
-        return partitionColRef;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
new file mode 100644
index 0000000..5ffdf91
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedQuery.java
@@ -0,0 +1,84 @@
+package org.apache.kylin.storage.cache;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.config.Configuration;
+import net.sf.ehcache.config.MemoryUnit;
+import net.sf.ehcache.config.PersistenceConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
+import org.apache.kylin.metadata.realization.StreamSQLDigest;
+import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.IStorageQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public abstract class AbstractCacheFledgedQuery implements IStorageQuery, TeeTupleItrListener {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedQuery.class);
+    private static final String storageCacheTemplate = "StorageCache";
+
+    protected static CacheManager CACHE_MANAGER;
+
+    protected boolean queryCacheExists;
+    protected ICachableStorageQuery underlyingStorage;
+    protected StreamSQLDigest streamSQLDigest;
+
+    public AbstractCacheFledgedQuery(ICachableStorageQuery underlyingStorage) {
+        this.underlyingStorage = underlyingStorage;
+        this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
+    }
+
+    public static void setCacheManager(CacheManager cacheManager) {
+        CACHE_MANAGER = cacheManager;
+    }
+
+    private static void initCacheManger() {
+        Configuration conf = new Configuration();
+        conf.setMaxBytesLocalHeap("128M");
+        CACHE_MANAGER = CacheManager.create(conf);
+
+        //a fake template for test cases
+        Cache storageCache = new Cache(new CacheConfiguration(storageCacheTemplate, 0).//
+                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
+                eternal(false).//
+                timeToIdleSeconds(86400).//
+                diskExpiryThreadIntervalSeconds(0).//
+                maxBytesLocalHeap(10, MemoryUnit.MEGABYTES).//
+                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
+
+        CACHE_MANAGER.addCache(storageCache);
+    }
+
+    private void makeCacheIfNecessary(String storageUUID) {
+        if (CACHE_MANAGER == null) {
+            logger.warn("CACHE_MANAGER is not provided");
+            initCacheManger();
+        }
+
+        if (CACHE_MANAGER.getCache(storageUUID) == null) {
+            logger.info("Cache for {} initting...", storageUUID);
+
+            //Create a Cache specifying its configuration.
+            CacheConfiguration templateConf = CACHE_MANAGER.getCache(storageCacheTemplate).getCacheConfiguration();
+            PersistenceConfiguration pconf = templateConf.getPersistenceConfiguration();
+            if (pconf != null) {
+                logger.info("PersistenceConfiguration strategy: " + pconf.getStrategy());
+            } else {
+                logger.warn("PersistenceConfiguration is null");
+            }
+
+            Cache storageCache = new Cache(new CacheConfiguration(storageUUID, (int) templateConf.getMaxEntriesLocalHeap()).//
+                    memoryStoreEvictionPolicy(templateConf.getMemoryStoreEvictionPolicy()).//
+                    eternal(templateConf.isEternal()).//
+                    timeToIdleSeconds(templateConf.getTimeToIdleSeconds()).//
+                    maxBytesLocalHeap(templateConf.getMaxBytesLocalHeap(), MemoryUnit.BYTES).persistence(pconf));
+            //TODO: deal with failed queries, and only cache long-running queries
+
+            CACHE_MANAGER.addCache(storageCache);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
deleted file mode 100644
index 61e008f..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.kylin.storage.cache;
-
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.config.Configuration;
-import net.sf.ehcache.config.MemoryUnit;
-import net.sf.ehcache.config.PersistenceConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
-
-import org.apache.kylin.metadata.realization.StreamSQLDigest;
-import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
-import org.apache.kylin.storage.ICachableStorageQuery;
-import org.apache.kylin.storage.IStorageQuery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public abstract class AbstractCacheFledgedStorageEngine implements IStorageQuery, TeeTupleItrListener {
-    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheFledgedStorageEngine.class);
-    private static final String storageCacheTemplate = "StorageCache";
-
-    protected static CacheManager CACHE_MANAGER;
-
-    protected boolean queryCacheExists;
-    protected ICachableStorageQuery underlyingStorage;
-    protected StreamSQLDigest streamSQLDigest;
-
-    public AbstractCacheFledgedStorageEngine(ICachableStorageQuery underlyingStorage) {
-        this.underlyingStorage = underlyingStorage;
-        this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
-    }
-
-    public static void setCacheManager(CacheManager cacheManager) {
-        CACHE_MANAGER = cacheManager;
-    }
-
-    private static void initCacheManger() {
-        Configuration conf = new Configuration();
-        conf.setMaxBytesLocalHeap("128M");
-        CACHE_MANAGER = CacheManager.create(conf);
-
-        //a fake template for test cases
-        Cache storageCache = new Cache(new CacheConfiguration(storageCacheTemplate, 0).//
-                memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).//
-                eternal(false).//
-                timeToIdleSeconds(86400).//
-                diskExpiryThreadIntervalSeconds(0).//
-                maxBytesLocalHeap(10, MemoryUnit.MEGABYTES).//
-                persistence(new PersistenceConfiguration().strategy(PersistenceConfiguration.Strategy.NONE)));
-
-        CACHE_MANAGER.addCache(storageCache);
-    }
-
-    private void makeCacheIfNecessary(String storageUUID) {
-        if (CACHE_MANAGER == null) {
-            logger.warn("CACHE_MANAGER is not provided");
-            initCacheManger();
-        }
-
-        if (CACHE_MANAGER.getCache(storageUUID) == null) {
-            logger.info("Cache for {} initting...", storageUUID);
-
-            //Create a Cache specifying its configuration.
-            CacheConfiguration templateConf = CACHE_MANAGER.getCache(storageCacheTemplate).getCacheConfiguration();
-            PersistenceConfiguration pconf = templateConf.getPersistenceConfiguration();
-            if (pconf != null) {
-                logger.info("PersistenceConfiguration strategy: " + pconf.getStrategy());
-            } else {
-                logger.warn("PersistenceConfiguration is null");
-            }
-
-            Cache storageCache = new Cache(new CacheConfiguration(storageUUID, (int) templateConf.getMaxEntriesLocalHeap()).//
-                    memoryStoreEvictionPolicy(templateConf.getMemoryStoreEvictionPolicy()).//
-                    eternal(templateConf.isEternal()).//
-                    timeToIdleSeconds(templateConf.getTimeToIdleSeconds()).//
-                    maxBytesLocalHeap(templateConf.getMaxBytesLocalHeap(), MemoryUnit.BYTES).persistence(pconf));
-            //TODO: deal with failed queries, and only cache too long query
-
-            CACHE_MANAGER.addCache(storageCache);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9e2f94c1/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
new file mode 100644
index 0000000..febe1a9
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicQuery.java
@@ -0,0 +1,149 @@
+package org.apache.kylin.storage.cache;
+
+import java.util.List;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.Element;
+
+import org.apache.kylin.common.util.RangeUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigestUtil;
+import org.apache.kylin.metadata.realization.StreamSQLDigest;
+import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
+import org.apache.kylin.metadata.tuple.TeeTupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+
+/**
+ */
+public class CacheFledgedDynamicQuery extends AbstractCacheFledgedQuery {
+    private static final Logger logger = LoggerFactory.getLogger(CacheFledgedDynamicQuery.class);
+
+    private final TblColRef partitionColRef;
+
+    private Range<Long> ts;
+
+    public CacheFledgedDynamicQuery(ICachableStorageQuery underlyingStorage, TblColRef partitionColRef) {
+        super(underlyingStorage);
+        this.partitionColRef = partitionColRef;
+
+        Preconditions.checkArgument(this.partitionColRef != null, "For dynamic columns like " + //
+                this.underlyingStorage.getStorageUUID() + ", partition column must be provided");
+    }
+
+    @Override
+    public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
+        //check whether the ts condition in sqlDigest is valid
+        ts = TsConditionExtractor.extractTsCondition(partitionColRef, sqlDigest.filter);
+        if (ts == null || ts.isEmpty()) {
+            logger.info("ts range in the query conflicts,return empty directly");
+            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+        }
+
+        //enable dynamic cache iff group by columns contains partition col
+        //because cache extraction requires partition col value as selection key
+        boolean needUpdateCache = sqlDigest.groupbyColumns.contains(partitionColRef);
+
+        streamSQLDigest = new StreamSQLDigest(sqlDigest, partitionColRef);
+        StreamSQLResult cachedResult = null;
+        Cache cache = CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID());
+        Element element = cache.get(streamSQLDigest.hashCode());
+        if (element != null) {
+            this.queryCacheExists = true;
+            cachedResult = (StreamSQLResult) element.getObjectValue();
+        }
+
+        ITupleIterator ret = null;
+        if (cachedResult != null) {
+            Range<Long> reusePeriod = cachedResult.getReusableResults(ts);
+
+            logger.info("existing cache    : " + cachedResult);
+            logger.info("ts Range in query: " + RangeUtil.formatTsRange(ts));
+            logger.info("potential reusable range   : " + RangeUtil.formatTsRange(reusePeriod));
+
+            if (reusePeriod != null) {
+                List<Range<Long>> remainings = RangeUtil.remove(ts, reusePeriod);
+                if (remainings.size() == 1) {//if using cache causes two underlyingStorage searches, we'd rather not use the cache
+
+                    SimpleTupleIterator reusedTuples = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+                    List<ITupleIterator> iTupleIteratorList = Lists.newArrayList();
+                    iTupleIteratorList.add(reusedTuples);
+
+                    for (Range<Long> remaining : remainings) {//actually there will be only one loop
+                        logger.info("Appending ts " + RangeUtil.formatTsRange(remaining) + " as additional filter");
+
+                        ITupleIterator freshTuples = SQLDigestUtil.appendTsFilterToExecute(sqlDigest, partitionColRef, remaining, new Function<Void, ITupleIterator>() {
+                            @Override
+                            public ITupleIterator apply(Void input) {
+                                return underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+                            }
+                        });
+                        iTupleIteratorList.add(freshTuples);
+                    }
+
+                    ret = new CompoundTupleIterator(iTupleIteratorList);
+                } else if (remainings.size() == 0) {
+                    logger.info("The ts range in new query was fully cached");
+                    needUpdateCache = false;
+                    ret = new SimpleTupleIterator(cachedResult.reuse(reusePeriod));
+                } else {
+                    //if using the cache would require more than one underlyingStorage search,
+                    //the incurred overhead might outweigh the benefit of the cache
+                    logger.info("Give up using cache to avoid complexity");
+                }
+            }
+        } else {
+            logger.info("no cache entry for this query");
+        }
+
+        if (ret == null) {
+            ret = underlyingStorage.search(context, sqlDigest, returnTupleInfo);
+            logger.info("No Cache being used");
+        } else {
+            logger.info("Cache being used");
+        }
+
+        if (needUpdateCache) {
+            //use another nested ITupleIterator to deal with cache
+            final TeeTupleIterator tee = new TeeTupleIterator(ret);
+            tee.addCloseListener(this);
+            return tee;
+        } else {
+            return ret;
+        }
+    }
+
+    @Override
+    public void notify(List<ITuple> duplicated, long createTime) {
+
+        Range<Long> cacheExclude = this.underlyingStorage.getVolatilePeriod();
+        if (cacheExclude != null) {
+            List<Range<Long>> cachablePeriods = RangeUtil.remove(ts, cacheExclude);
+            if (cachablePeriods.size() == 1) {
+                if (!ts.equals(cachablePeriods.get(0))) {
+                    logger.info("With respect to growing storage, the cacheable tsRange shrinks from " + RangeUtil.formatTsRange(ts) + " to " + RangeUtil.formatTsRange(cachablePeriods.get(0)));
+                }
+                ts = cachablePeriods.get(0);
+            } else {
+                //NOTE(review): meant to give up updating the cache to keep it simple, but nothing here stops the put below
+                logger.info("Skip updating cache to avoid complexity");
+            }
+        }
+
+        StreamSQLResult newCacheEntry = new StreamSQLResult(duplicated, ts, partitionColRef);
+        CACHE_MANAGER.getCache(this.underlyingStorage.getStorageUUID()).put(new Element(streamSQLDigest.hashCode(), newCacheEntry));
+        logger.info("cache after the query: " + newCacheEntry);
+    }
+}


Mime
View raw message