kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [1/2] incubator-kylin git commit: KYLIN-808 fix time str dict merge issue
Date Sun, 07 Jun 2015 02:36:55 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 ee53c1643 -> 98e1b2986


KYLIN-808 fix time str dict merge issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c2a469c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c2a469c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c2a469c4

Branch: refs/heads/0.8.0
Commit: c2a469c49da9d495bf648bd34afc3da4420536fc
Parents: dda27de
Author: honma <honma@ebay.com>
Authored: Fri Jun 5 17:19:02 2015 +0800
Committer: honma <honma@ebay.com>
Committed: Sun Jun 7 09:22:39 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |  2 +-
 .../apache/kylin/dict/DictionaryGenerator.java  | 17 +++----
 .../apache/kylin/dict/DictionaryManager.java    | 51 +++++++++++++------
 .../apache/kylin/dict/TimeStrDictionary.java    |  8 +++
 .../kylin/dict/TimeStrDictionaryTests.java      |  2 +
 .../cubev2/MergeCuboidFromHBaseMapper.java      | 15 +++---
 .../kylin/job/streaming/StreamingBootstrap.java |  2 +-
 .../job/hadoop/cube/MergeCuboidMapperTest.java  | 52 ++++++++++----------
 .../apache/kylin/rest/service/BasicService.java |  5 ++
 .../apache/kylin/rest/service/CacheService.java |  6 ++-
 .../apache/kylin/streaming/KafkaConsumer.java   |  3 +-
 11 files changed, 101 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index ff1cd49..73f708d 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -77,7 +77,7 @@ public class BasicTest {
     @Test
     @Ignore("convenient trial tool for dev")
     public void test1() throws Exception {
-        System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433423490000L));
+        System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433496233000L));
         System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L));
         System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-01 00:00:00"));
         System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-05-15 17:00:00"));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 7c1f808..a8e27b3 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -45,12 +45,6 @@ public class DictionaryGenerator {
 
     private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd" };
 
-    public static Dictionary<?> buildDictionaryFromValueList(DictionaryInfo info, Collection<byte[]> values) {
-        Dictionary<?> dict = buildDictionaryFromValueList(DataType.getInstance(info.getDataType()), values);
-        info.setCardinality(dict.getSize());
-        return dict;
-    }
-
     public static Dictionary<?> buildDictionaryFromValueList(DataType dataType, Collection<byte[]> values) {
         Preconditions.checkNotNull(dataType, "dataType cannot be null");
         Dictionary dict;
@@ -83,7 +77,7 @@ public class DictionaryGenerator {
         return dict;
     }
 
-    public static Dictionary mergeDictionaries(DictionaryInfo targetInfo, List<DictionaryInfo> sourceDicts) {
+    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) {
 
         HashSet<byte[]> dedup = new HashSet<byte[]>();
 
@@ -101,7 +95,8 @@ public class DictionaryGenerator {
         List<byte[]> valueList = new ArrayList<byte[]>();
         valueList.addAll(dedup);
 
-        return buildDictionaryFromValueList(targetInfo, valueList);
+        Dictionary<?> dict = buildDictionaryFromValueList(dataType, valueList);
+        return dict;
     }
 
     public static Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
@@ -109,11 +104,13 @@ public class DictionaryGenerator {
         // currently all data types are casted to string to build dictionary
         // String dataType = info.getDataType();
 
-        logger.debug("Building dictionary " + JsonUtil.writeValueAsString(info));
+        logger.debug("Building dictionary object " + JsonUtil.writeValueAsString(info));
 
         ArrayList<byte[]> values = loadColumnValues(inpTable, info.getSourceColumnIndex());
 
-        return buildDictionaryFromValueList(info, values);
+        Dictionary<?> dict = buildDictionaryFromValueList(DataType.getInstance(info.getDataType()), values);
+
+        return dict;
     }
 
     private static Dictionary buildDateTimeDict(Collection<byte[]> values, int baseId, int nSamples, ArrayList samples) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index be45899..4b61903 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,31 +18,32 @@
 
 package org.apache.kylin.dict;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.google.common.base.Preconditions;
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.TableSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.dict.lookup.HiveTable;
 import org.apache.kylin.dict.lookup.ReadableTable;
+import org.apache.kylin.dict.lookup.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class DictionaryManager {
 
@@ -116,6 +117,8 @@ public class DictionaryManager {
     }
 
     public DictionaryInfo mergeDictionary(List<DictionaryInfo> dicts) throws IOException {
+        Preconditions.checkArgument(dicts.size() >= 2, "dicts size not valid");
+
         DictionaryInfo firstDictInfo = null;
         int totalSize = 0;
         for (DictionaryInfo info : dicts) {
@@ -134,6 +137,7 @@ public class DictionaryManager {
             throw new IllegalArgumentException("DictionaryManager.mergeDictionary input cannot be null");
         }
 
+        //identical
         DictionaryInfo newDictInfo = new DictionaryInfo(firstDictInfo);
         TableSignature signature = newDictInfo.getInput();
         signature.setSize(totalSize);
@@ -146,9 +150,23 @@ public class DictionaryManager {
             return getDictionaryInfo(dupDict);
         }
 
-        Dictionary<?> newDict = DictionaryGenerator.mergeDictionaries(newDictInfo, dicts);
+        //check for cases where merging dicts are actually same
+        boolean identicalSourceDicts = true;
+        for (int i = 1; i < dicts.size(); ++i) {
+            if (!dicts.get(0).getDictionaryObject().equals(dicts.get(i).getDictionaryObject())) {
+                identicalSourceDicts = false;
+                break;
+            }
+        }
 
-        return trySaveNewDict(newDict, newDictInfo);
+        if (identicalSourceDicts) {
+            logger.info("Use one of the merging dictionaries directly");
+            return dicts.get(0);
+        } else {
+            Dictionary<?> newDict = DictionaryGenerator.mergeDictionaries(DataType.getInstance(newDictInfo.getDataType()), dicts);
+            newDictInfo.setCardinality(newDict.getSize());
+            return trySaveNewDict(newDict, newDictInfo);
+        }
     }
 
     public DictionaryInfo buildDictionary(DataModelDesc model, String dict, TblColRef col, String factColumnsPath) throws IOException {
@@ -170,6 +188,7 @@ public class DictionaryManager {
         }
 
         Dictionary<?> dictionary = DictionaryGenerator.buildDictionary(dictInfo, inpTable);
+        dictInfo.setCardinality(dictionary.getSize());
 
         return trySaveNewDict(dictionary, dictInfo);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
index fde17a8..3759d23 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
@@ -105,6 +105,14 @@ public class TimeStrDictionary extends Dictionary<String> {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (o == null)
+            return false;
+
+        return o instanceof TimeStrDictionary;
+    }
+
+    @Override
     public void write(DataOutput out) throws IOException {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java b/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
index 92f62fb..93b2dae 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
@@ -57,4 +57,6 @@ public class TimeStrDictionaryTests {
         Assert.assertEquals(origin, back);
     }
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
index ef59d9b..a8283aa 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
@@ -24,9 +24,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.SplittedBytes;
@@ -44,7 +44,6 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.cube.KeyValueCreator;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -71,7 +70,7 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
     private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
     // life cycle
 
-//    private Text outputKey = new Text();
+    //    private Text outputKey = new Text();
     private ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
 
     private byte[] newKeyBuf;
@@ -102,7 +101,6 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
         }
     }
 
-
     private CubeSegment findSegmentWithHTable(String htable, CubeInstance cubeInstance) {
         for (CubeSegment segment : cubeInstance.getSegments()) {
             String segmentHtable = segment.getStorageLocationIdentifier();
@@ -189,8 +187,14 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
                 }
 
                 int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
+
                 int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
-                int idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+                int idInMergedDict;
+                if (size < 0) {
+                    idInMergedDict = mergedDict.nullId();
+                } else {
+                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
+                }
                 BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
 
                 bufOffset += mergedDict.getSizeOfId();
@@ -232,5 +236,4 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritab
         context.write(outputKey, outputValue);
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index fd590d2..a3036e2 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -172,7 +172,7 @@ public class StreamingBootstrap {
 
         int batchInterval = 5 * 60 * 1000;
         MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
-        long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
+        long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis() - 30 * 60 * 1000, (long) batchInterval) : cubeInstance.getDateRangeEnd();
         logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
         StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 83b995a..b1d37bb 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -22,13 +22,14 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeBuilder;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.dict.*;
 import org.apache.kylin.dict.lookup.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.junit.After;
@@ -73,7 +74,8 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
         List<byte[]> values = new ArrayList<byte[]>();
         values.add(new byte[] { 101, 101, 101 });
         values.add(new byte[] { 102, 102, 102 });
-        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(newDictInfo, values);
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+        newDictInfo.setCardinality(dict.getSize());
         dictionaryManager.trySaveNewDict(dict, newDictInfo);
         ((TrieDictionary) dict).dump(System.out);
 
@@ -125,7 +127,8 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
                 values.add(new byte[] { 99, 99, 99 });
             else
                 values.add(new byte[] { 98, 98, 98 });
-            Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(newDictInfo, values);
+            Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+            newDictInfo.setCardinality(dict.getSize());
             dictionaryManager.trySaveNewDict(dict, newDictInfo);
             ((TrieDictionary) dict).dump(System.out);
 
@@ -139,7 +142,6 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
             isFirstSegment = false;
         }
 
-
         CubeBuilder cubeBuilder = new CubeBuilder(cube);
         cubeBuilder.setToUpdateSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
         cube = cubeManager.updateCube(cubeBuilder);
@@ -155,33 +157,33 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
     @Test
     public void test() throws IOException, ParseException {
 
-//        String cubeName = "test_kylin_cube_without_slr_left_join_ready_2_segments";
+        //        String cubeName = "test_kylin_cube_without_slr_left_join_ready_2_segments";
 
         CubeSegment newSeg = cubeManager.mergeSegments(cube, 0L, 1386835200000L);
-//        String segmentName = newSeg.getName();
+        //        String segmentName = newSeg.getName();
 
         final Dictionary<?> dictionary = cubeManager.getDictionary(newSeg, lfn);
         assertTrue(dictionary == null);
-//        ((TrieDictionary) dictionary).dump(System.out);
+        //        ((TrieDictionary) dictionary).dump(System.out);
 
         // hack for distributed cache
-//        File metaDir = new File("../job/meta");
-//        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), metaDir);
-//
-//        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-//        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-//        // mapDriver.getConfiguration().set(KylinConfig.KYLIN_METADATA_URL,
-//        // "../job/meta");
-//
-//        byte[] key = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 1 };
-//        byte[] value = new byte[] { 1, 2, 3 };
-//        byte[] newkey = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 2 };
-//        byte[] newvalue = new byte[] { 1, 2, 3 };
-//
-//        mapDriver.withInput(new Text(key), new Text(value));
-//        mapDriver.withOutput(new Text(newkey), new Text(newvalue));
-//        mapDriver.setMapInputPath(new Path("/apps/hdmi-prod/b_kylin/prod/kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa/vac_sw_cube_v4/cuboid/15d_cuboid"));
-//
-//        mapDriver.runTest();
+        //        File metaDir = new File("../job/meta");
+        //        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), metaDir);
+        //
+        //        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        //        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+        //        // mapDriver.getConfiguration().set(KylinConfig.KYLIN_METADATA_URL,
+        //        // "../job/meta");
+        //
+        //        byte[] key = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 1 };
+        //        byte[] value = new byte[] { 1, 2, 3 };
+        //        byte[] newkey = new byte[] { 0, 0, 0, 0, 0, 0, 0, -92, 1, 1, 2 };
+        //        byte[] newvalue = new byte[] { 1, 2, 3 };
+        //
+        //        mapDriver.withInput(new Text(key), new Text(value));
+        //        mapDriver.withOutput(new Text(newkey), new Text(newvalue));
+        //        mapDriver.setMapInputPath(new Path("/apps/hdmi-prod/b_kylin/prod/kylin-f24668f6-dcff-4cb6-a89b-77f1119df8fa/vac_sw_cube_v4/cuboid/15d_cuboid"));
+        //
+        //        mapDriver.runTest();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index c9eb150..0a8c808 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -83,6 +83,11 @@ public abstract class BasicService {
         }
     }
 
+    protected void cleanAllDataCache() {
+        if (cacheManager != null)
+            cacheManager.clearAll();
+    }
+
     public void removeOLAPDataSource(String project) {
         logger.info("removeOLAPDataSource is called for project " + project);
         if (StringUtils.isEmpty(project))

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 56f9d4b..6cc0de3 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -27,11 +27,13 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.job.cube.CubingJob;
 import org.apache.kylin.job.cube.CubingJobBuilder;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.rest.constant.Constant;
@@ -101,12 +103,14 @@ public class CacheService extends BasicService {
                 CubeDescManager.clearCache();
                 break;
             case ALL:
-                getMetadataManager().reload();
+                MetadataManager.clearCache();
+                DictionaryManager.clearCache();
                 CubeDescManager.clearCache();
                 CubeManager.clearCache();
                 IIDescManager.clearCache();
                 IIManager.clearCache();
                 ProjectManager.clearCache();
+                cleanAllDataCache();
                 removeAllOLAPDataSources();
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c2a469c4/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 488f6e3..6f5e279 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -111,7 +111,6 @@ public class KafkaConsumer implements Runnable {
             while (isRunning) {
                 int consumeMsgCountAtBeginning = consumeMsgCount;
                 fetchRound++;
-                logger.info("start " + fetchRound + "th round of fetching");
 
                 if (leadBroker == null) {
                     leadBroker = getLeadBroker();
@@ -141,7 +140,7 @@ public class KafkaConsumer implements Runnable {
                     offset++;
                     consumeMsgCount++;
                 }
-                logger.info("Number of messages consumed: " + consumeMsgCount + " offset is " + offset);
+                logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
 
                 if (consumeMsgCount == consumeMsgCountAtBeginning)//nothing this round
                 {


Mime
View raw message