kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [25/34] incubator-kylin git commit: KYLIN-663 pre-sort the hive data for BuildIIWithStreamTest
Date Fri, 10 Apr 2015 22:48:02 GMT
KYLIN-663 pre-sort the hive data for BuildIIWithStreamTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8444cf9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8444cf9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8444cf9d

Branch: refs/heads/streaming-localdict
Commit: 8444cf9d794d8b20cf18a1e61ca8ee63ea43555f
Parents: 5a46a15
Author: honma <honma@ebay.com>
Authored: Thu Apr 9 16:33:46 2015 +0800
Committer: honma <honma@ebay.com>
Committed: Thu Apr 9 16:33:46 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |  12 ++
 .../invertedindex/index/BatchSliceBuilder.java  |  95 --------------
 .../invertedindex/index/BatchSliceMaker.java    |  95 ++++++++++++++
 .../index/IncrementalSliceMaker.java            | 125 +++++++++++++++++++
 .../index/ShardingSliceBuilder.java             |   8 +-
 .../kylin/invertedindex/index/SliceBuilder.java | 125 -------------------
 .../invertedindex/InvertedIndexReducer.java     |   6 +-
 .../apache/kylin/job/BuildIIWithStreamTest.java |  68 ++++++----
 .../apache/kylin/query/test/KylinQueryTest.java |   2 +-
 .../hbase/coprocessor/endpoint/IIEndpoint.java  |  22 ++--
 .../streaming/invertedindex/SliceBuilder.java   |  34 ++---
 11 files changed, 315 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 1d6ac01..bbe6e8b 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -26,6 +26,7 @@ import java.util.*;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
@@ -61,6 +62,17 @@ public class BasicTest {
     @Test
     @Ignore("convenient trial tool for dev")
     public void test1() throws Exception {
+        String bb = "\\x00\\x00\\x00\\x00\\x01\\x3F\\xD0\\x2D\\x58\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00";//2013/07/12 07:59:37
+        String cc = "\\x00\\x00\\x00\\x00\\x01\\x41\\xBE\\x8F\\xD8\\x00\\x00\\x00\\x00\\x00\\x00\\x00";//2013/10/16 08:00:00
+        String dd = "\\x00\\x00\\x00\\x00\\x01\\x41\\xBE\\x8F\\xD8\\x07\\x00\\x18\\x00\\x00\\x00";
+        byte[] bytes = BytesUtil.fromReadableText(dd);
+        long ttt = BytesUtil.readLong(bytes,2,8);
+        System.out.println(time(ttt));
+
+        System.out.println("\\");
+        System.out.println("\\\\");
+
+        System.out.println("The start key is set to " + null);
         System.out.println(time(946684800000L));
         long current = System.currentTimeMillis();
         System.out.println(time(current));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
deleted file mode 100644
index 8b2578f..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.invertedindex.index;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-import java.util.List;
-
-/**
- * Created by qianzhou on 3/20/15.
- */
-public class BatchSliceBuilder {
-
-    private final int nColumns;
-    private final int nRecordsCap;
-    private final short shard;
-    private final IIDesc desc;
-
-    private long sliceTimestamp;
-
-    transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
-
-    public BatchSliceBuilder(IIDesc desc, short shard) {
-        this.desc = desc;
-        this.nColumns = desc.listAllColumns().size();
-        this.nRecordsCap = Math.max(1, desc.getSliceSize());
-
-        this.shard = shard;
-        this.sliceTimestamp = Long.MIN_VALUE;
-    }
-
-    public Slice build(TableRecordInfoDigest digest, List<TableRecord> records) {
-        Preconditions.checkArgument(records != null && !records.isEmpty(), "records cannot be empty");
-        Preconditions.checkArgument(records.size() <= nRecordsCap, "batch count cannot exceed " + nRecordsCap);
-        sliceTimestamp = increaseSliceTimestamp(records.get(0).getTimestamp());
-        ColumnValueContainer[] containers = new ColumnValueContainer[nColumns];
-        for (int i : desc.getValueColumns()) {
-            containers[i] = new CompressedValueContainer(digest, i, nRecordsCap);
-        }
-        for (int i : desc.getMetricsColumns()) {
-            containers[i] = new CompressedValueContainer(digest, i, nRecordsCap);
-        }
-        for (TableRecord record : records) {
-            for (int i = 0; i < nColumns; i++) {
-                record.getValueBytes(i, temp);
-                containers[i].append(temp);
-            }
-        }
-        return new Slice(digest, shard, sliceTimestamp, containers);
-
-    }
-
-    private long increaseSliceTimestamp(long timestamp) {
-        if (timestamp <= sliceTimestamp) {
-            return sliceTimestamp+1; // ensure slice timestamp increases
-        } else {
-            return timestamp;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceMaker.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceMaker.java
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceMaker.java
new file mode 100644
index 0000000..2c3e593
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceMaker.java
@@ -0,0 +1,95 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.invertedindex.index;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+import java.util.List;
+
+/**
+ * Created by qianzhou on 3/20/15.
+ */
+public class BatchSliceMaker {
+
+    private final int nColumns;
+    private final int nRecordsCap;
+    private final short shard;
+    private final IIDesc desc;
+
+    private long sliceTimestamp;
+
+    transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+    public BatchSliceMaker(IIDesc desc, short shard) {
+        this.desc = desc;
+        this.nColumns = desc.listAllColumns().size();
+        this.nRecordsCap = Math.max(1, desc.getSliceSize());
+
+        this.shard = shard;
+        this.sliceTimestamp = Long.MIN_VALUE;
+    }
+
+    public Slice makeSlice(TableRecordInfoDigest digest, List<TableRecord> records) {
+        Preconditions.checkArgument(records != null && !records.isEmpty(), "records cannot be empty");
+        Preconditions.checkArgument(records.size() <= nRecordsCap, "batch count cannot exceed " + nRecordsCap);
+        sliceTimestamp = increaseSliceTimestamp(records.get(0).getTimestamp());
+        ColumnValueContainer[] containers = new ColumnValueContainer[nColumns];
+        for (int i : desc.getValueColumns()) {
+            containers[i] = new CompressedValueContainer(digest, i, nRecordsCap);
+        }
+        for (int i : desc.getMetricsColumns()) {
+            containers[i] = new CompressedValueContainer(digest, i, nRecordsCap);
+        }
+        for (TableRecord record : records) {
+            for (int i = 0; i < nColumns; i++) {
+                record.getValueBytes(i, temp);
+                containers[i].append(temp);
+            }
+        }
+        return new Slice(digest, shard, sliceTimestamp, containers);
+
+    }
+
+    private long increaseSliceTimestamp(long timestamp) {
+        if (timestamp <= sliceTimestamp) {
+            return sliceTimestamp + 1; // ensure slice timestamp increases
+        } else {
+            return timestamp;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/IncrementalSliceMaker.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/IncrementalSliceMaker.java
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/IncrementalSliceMaker.java
new file mode 100644
index 0000000..97120c3
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/IncrementalSliceMaker.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.invertedindex.index;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * @author yangli9
+ */
+public class IncrementalSliceMaker {
+
+	TableRecordInfo info;
+	private int nColumns;
+	int nRecordsCap;
+
+	short shard;
+	long sliceTimestamp;
+	int nRecords;
+	private ColumnValueContainer[] containers;
+
+	transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
+
+	public IncrementalSliceMaker(TableRecordInfo info, short shard) {
+		this.info = info;
+		this.nColumns = info.getDigest().getColumnCount();
+		this.nRecordsCap = Math.max(1, info.getDescriptor().getSliceSize());
+
+		this.shard = shard;
+		this.sliceTimestamp = Long.MIN_VALUE;
+		this.nRecords = 0;
+		this.containers = null;
+
+		doneSlice(); // init containers
+	}
+
+	private Slice doneSlice() {
+		Slice r = null;
+		if (nRecords > 0) {
+			for (int i = 0; i < nColumns; i++) {
+				containers[i].closeForChange();
+			}
+			r = new Slice(info.getDigest(), shard, sliceTimestamp, containers);
+		}
+
+		// reset for next slice
+		nRecords = 0;
+		containers = new ColumnValueContainer[nColumns];
+//        for (int i : info.getDescriptor().getBitmapColumns()) {
+//            containers[i] = new CompressedValueContainer(info.getDigest(), i,
+//                    nRecordsCap);
+//        }
+		for (int i : info.getDescriptor().getValueColumns()) {
+			containers[i] = new CompressedValueContainer(info.getDigest(), i,
+					nRecordsCap);
+		}
+		for (int i : info.getDescriptor().getMetricsColumns()) {
+			containers[i] = new CompressedValueContainer(info.getDigest(), i,
+					nRecordsCap);
+		}
+
+		return r;
+
+	}
+
+	// NOTE: record must be appended in time order
+	public Slice append(TableRecord rec) {
+		if (rec.getShard() != shard)
+			throw new IllegalStateException();
+
+		Slice doneSlice = null;
+
+		if (isFull()) {
+			doneSlice = doneSlice();
+		}
+
+		if (nRecords == 0) {
+			sliceTimestamp = increaseSliceTimestamp(rec.getTimestamp());
+		}
+
+		nRecords++;
+		for (int i = 0; i < nColumns; i++) {
+			rec.getValueBytes(i, temp);
+			containers[i].append(temp);
+		}
+
+		return doneSlice;
+	}
+
+	private long increaseSliceTimestamp(long timestamp) {
+		if (timestamp < sliceTimestamp)
+			throw new IllegalStateException();
+
+		if (timestamp == sliceTimestamp)
+			return ++timestamp; // ensure slice timestamp increases
+		else
+			return timestamp;
+	}
+
+	public Slice close() {
+		Slice doneSlice = doneSlice();
+		this.sliceTimestamp = Long.MIN_VALUE;
+		this.nRecords = 0;
+		return doneSlice;
+	}
+
+	private boolean isFull() {
+		return nRecords >= nRecordsCap;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingSliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingSliceBuilder.java
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingSliceBuilder.java
index ef2db19..19746a2 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingSliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/ShardingSliceBuilder.java
@@ -24,13 +24,13 @@ import com.google.common.collect.Lists;
 
 public class ShardingSliceBuilder {
 
-	SliceBuilder[] builders;
+	IncrementalSliceMaker[] builders;
 
 	public ShardingSliceBuilder(TableRecordInfo info) {
 		int sharding = info.getDescriptor().getSharding();
-		builders = new SliceBuilder[sharding];
+		builders = new IncrementalSliceMaker[sharding];
 		for (short i = 0; i < sharding; i++) {
-			builders[i] = new SliceBuilder(info, i);
+			builders[i] = new IncrementalSliceMaker(info, i);
 		}
 	}
 
@@ -42,7 +42,7 @@ public class ShardingSliceBuilder {
 
 	public List<Slice> close() {
 		List<Slice> result = Lists.newArrayList();
-		for (SliceBuilder builder : builders) {
+		for (IncrementalSliceMaker builder : builders) {
 			Slice slice = builder.close();
 			if (slice != null)
 				result.add(slice);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
deleted file mode 100644
index a7c79d1..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.invertedindex.index;
-
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-/**
- * @author yangli9
- */
-public class SliceBuilder {
-
-	TableRecordInfo info;
-	private int nColumns;
-	int nRecordsCap;
-
-	short shard;
-	long sliceTimestamp;
-	int nRecords;
-	private ColumnValueContainer[] containers;
-
-	transient ImmutableBytesWritable temp = new ImmutableBytesWritable();
-
-	public SliceBuilder(TableRecordInfo info, short shard) {
-		this.info = info;
-		this.nColumns = info.getDigest().getColumnCount();
-		this.nRecordsCap = Math.max(1, info.getDescriptor().getSliceSize());
-
-		this.shard = shard;
-		this.sliceTimestamp = Long.MIN_VALUE;
-		this.nRecords = 0;
-		this.containers = null;
-
-		doneSlice(); // init containers
-	}
-
-	private Slice doneSlice() {
-		Slice r = null;
-		if (nRecords > 0) {
-			for (int i = 0; i < nColumns; i++) {
-				containers[i].closeForChange();
-			}
-			r = new Slice(info.getDigest(), shard, sliceTimestamp, containers);
-		}
-
-		// reset for next slice
-		nRecords = 0;
-		containers = new ColumnValueContainer[nColumns];
-//        for (int i : info.getDescriptor().getBitmapColumns()) {
-//            containers[i] = new CompressedValueContainer(info.getDigest(), i,
-//                    nRecordsCap);
-//        }
-		for (int i : info.getDescriptor().getValueColumns()) {
-			containers[i] = new CompressedValueContainer(info.getDigest(), i,
-					nRecordsCap);
-		}
-		for (int i : info.getDescriptor().getMetricsColumns()) {
-			containers[i] = new CompressedValueContainer(info.getDigest(), i,
-					nRecordsCap);
-		}
-
-		return r;
-
-	}
-
-	// NOTE: record must be appended in time order
-	public Slice append(TableRecord rec) {
-		if (rec.getShard() != shard)
-			throw new IllegalStateException();
-
-		Slice doneSlice = null;
-
-		if (isFull()) {
-			doneSlice = doneSlice();
-		}
-
-		if (nRecords == 0) {
-			sliceTimestamp = increaseSliceTimestamp(rec.getTimestamp());
-		}
-
-		nRecords++;
-		for (int i = 0; i < nColumns; i++) {
-			rec.getValueBytes(i, temp);
-			containers[i].append(temp);
-		}
-
-		return doneSlice;
-	}
-
-	private long increaseSliceTimestamp(long timestamp) {
-		if (timestamp < sliceTimestamp)
-			throw new IllegalStateException();
-
-		if (timestamp == sliceTimestamp)
-			return ++timestamp; // ensure slice timestamp increases
-		else
-			return timestamp;
-	}
-
-	public Slice close() {
-		Slice doneSlice = doneSlice();
-		this.sliceTimestamp = Long.MIN_VALUE;
-		this.nRecords = 0;
-		return doneSlice;
-	}
-
-	private boolean isFull() {
-		return nRecords >= nRecordsCap;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
index 486e69e..36c048a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -27,7 +27,7 @@ import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.SliceBuilder;
+import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
@@ -45,7 +45,7 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
 
     private TableRecordInfo info;
     private TableRecord rec;
-    private SliceBuilder builder;
+    private IncrementalSliceMaker builder;
     private IIKeyValueCodec kv;
 
     @Override
@@ -70,7 +70,7 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
             rec.setBytes(v.get(), v.getOffset(), v.getLength());
 
             if (builder == null) {
-                builder = new SliceBuilder(info, rec.getShard());
+                builder = new IncrementalSliceMaker(info, rec.getShard());
             }
 
             //TODO: to delete this log

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index c618c0e..c49de5e 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -34,6 +34,17 @@
 
 package org.apache.kylin.job;
 
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
@@ -52,10 +63,10 @@ import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.util.DateFormat;
 import org.apache.kylin.streaming.Stream;
 import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
 import org.junit.AfterClass;
@@ -65,18 +76,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
 
 /**
  * Created by qianzhou on 3/9/15.
@@ -85,7 +85,7 @@ public class BuildIIWithStreamTest {
 
     private static final Logger logger = LoggerFactory.getLogger(BuildIIWithStreamTest.class);
 
-    private static final String[] II_NAME = new String[]{"test_kylin_ii_left_join", "test_kylin_ii_inner_join"};
+    private static final String[] II_NAME = new String[] { "test_kylin_ii_left_join", "test_kylin_ii_inner_join" };
     private IIManager iiManager;
     private KylinConfig kylinConfig;
 
@@ -120,10 +120,14 @@ public class BuildIIWithStreamTest {
     }
 
     private static int cleanupOldStorage() throws Exception {
-        String[] args = {"--delete", "true"};
+        return 0;
 
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        return exitCode;
+        //do not delete intermediate files for debug purpose
+
+        //        String[] args = {"--delete", "true"};
+        //
+        //        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
+        //        return exitCode;
     }
 
     private static void backup() throws Exception {
@@ -210,17 +214,19 @@ public class BuildIIWithStreamTest {
         }
         LinkedBlockingDeque<Stream> queue = new LinkedBlockingDeque<Stream>();
         final IISegment segment = createSegment(iiName);
-        String[] args = new String[]{"-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier()};
+        String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() };
         ToolRunner.run(new IICreateHTableJob(), args);
 
-
         ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0);
-        int count = 0;
-        while (reader.next()) {
-            queue.put(parse(reader.getRow()));
-            count++;
+
+        List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
+        int count = sorted.size();
+        for (String[] row : sorted) {
+            logger.info("another row: " + StringUtils.join(row, ","));
+            queue.put(parse(row));
         }
+
         reader.close();
         logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
         queue.put(Stream.EOF);
@@ -251,4 +257,20 @@ public class BuildIIWithStreamTest {
         return new Stream(System.currentTimeMillis(), StringUtils.join(row, ",").getBytes());
     }
 
+    private List<String[]> getSortedRows(HiveTableReader reader, final int tsCol) throws IOException {
+        List<String[]> unsorted = Lists.newArrayList();
+        while (reader.next()) {
+            unsorted.add(reader.getRow());
+        }
+        Collections.sort(unsorted, new Comparator<String[]>() {
+            @Override
+            public int compare(String[] o1, String[] o2) {
+                long t1 = DateFormat.stringToMillis(o1[tsCol]);
+                long t2 = DateFormat.stringToMillis(o2[tsCol]);
+                return Long.compare(t1, t2);
+            }
+        });
+        return unsorted;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
index fa0ef5b..f5198e8 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
@@ -108,7 +108,7 @@ public class KylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query62.sql";
+        String queryFileName = "src/test/resources/query/sql/query30.sql";
 
         File sqlFile = new File(queryFileName);
         runSQL(sqlFile, true, true);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index 2068fe2..0c8222f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.dict.Dictionary;
@@ -45,13 +43,15 @@ import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.KeyValueCodec;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.filter.BitMapFilterEvaluator;
 import org.apache.kylin.storage.hbase.coprocessor.*;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -63,6 +63,7 @@ import it.uniroma3.mat.extendedset.intset.ConciseSet;
  */
public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, CoprocessorService {
 
+    private static final Logger logger = LoggerFactory.getLogger(IIEndpoint.class);
     private static final int MEMORY_LIMIT = 500 * 1024 * 1024;
 
     private RegionCoprocessorEnvironment env;
@@ -83,30 +84,35 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
             } else {
                 shard = 0;
             }
-            System.out.println("Start key of the region is: " + BytesUtil.toReadableText(regionStartKey) + ", making shard to be :" + shard);
+            logger.info("Start key of the region is: " + BytesUtil.toReadableText(regionStartKey) + ", making shard to be :" + shard);
         }
 
         if (request.hasTsStart()) {
             Preconditions.checkArgument(shard != -1, "Shard is -1!");
             long tsStart = request.getTsStart();
+            logger.info("ts start is " + tsStart);
 
             byte[] idealStartKey = new byte[IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN];
             BytesUtil.writeUnsigned(shard, idealStartKey, 0, IIKeyValueCodec.SHARD_LEN);
             BytesUtil.writeLong(tsStart, idealStartKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN);
+            logger.info("ideaStartKey is(readable) :" + BytesUtil.toReadableText(idealStartKey));
             Result result = region.getClosestRowBefore(idealStartKey, IIDesc.HBASE_FAMILY_BYTES);
+            Preconditions.checkArgument(result != null, "No row before " + BytesUtil.toReadableText(idealStartKey));
             byte[] actualStartKey = Arrays.copyOf(result.getRow(), IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN);
             scan.setStartRow(actualStartKey);
-            System.out.println("The start key is set to " + BytesUtil.toReadableText(actualStartKey));
+            logger.info("The start key is set to " + BytesUtil.toReadableText(actualStartKey));
         }
 
         if (request.hasTsEnd()) {
             Preconditions.checkArgument(shard != -1, "Shard is -1");
             long tsEnd = request.getTsEnd();
+            logger.info("ts end is " + tsEnd);
+
             byte[] actualEndKey = new byte[IIKeyValueCodec.SHARD_LEN + IIKeyValueCodec.TIMEPART_LEN];
             BytesUtil.writeUnsigned(shard, actualEndKey, 0, IIKeyValueCodec.SHARD_LEN);
             BytesUtil.writeLong(tsEnd + 1, actualEndKey, IIKeyValueCodec.SHARD_LEN, IIKeyValueCodec.TIMEPART_LEN);//notice +1 here
             scan.setStopRow(actualEndKey);
-            System.out.println("The stop key is set to " + BytesUtil.toReadableText(actualEndKey));
+            logger.info("The stop key is set to " + BytesUtil.toReadableText(actualEndKey));
         }
         return scan;
     }
@@ -115,8 +121,6 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
     @Override
     public void getRows(RpcController controller, IIProtos.IIRequest request, RpcCallback<IIProtos.IIResponse> done) {
 
-        System.out.println("Entry of IIEndpoint");
-
         RegionScanner innerScanner = null;
         HRegion region = null;
 
@@ -134,7 +138,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
             IIProtos.IIResponse response = getResponse(innerScanner, type, projector, aggregators, filter);
             done.run(response);
         } catch (IOException ioe) {
-            System.out.println(ioe.toString());
+            logger.error(ioe.toString());
             ResponseConverter.setControllerException(controller, ioe);
         } finally {
             IOUtils.closeQuietly(innerScanner);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8444cf9d/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
index b9b2159..22bfa42 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
@@ -34,14 +34,14 @@
 
 package org.apache.kylin.streaming.invertedindex;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.Collection;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.invertedindex.index.BatchSliceBuilder;
+import org.apache.kylin.invertedindex.index.BatchSliceMaker;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
@@ -52,11 +52,10 @@ import org.apache.kylin.streaming.StreamParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
 
 /**
  * Created by qianzhou on 3/27/15.
@@ -65,13 +64,14 @@ public final class SliceBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
 
+    private final BatchSliceMaker sliceMaker;
+    private final IIDesc iiDesc;
+
     public SliceBuilder(IIDesc desc, short shard) {
         this.iiDesc = desc;
-        this.sliceBuilder = new BatchSliceBuilder(desc, shard);
+        this.sliceMaker = new BatchSliceMaker(desc, shard);
     }
 
-    private final BatchSliceBuilder sliceBuilder;
-    private final IIDesc iiDesc;
 
     public Slice buildSlice(List<Stream> streams, final StreamParser streamParser) {
         List<List<String>> table = Lists.transform(streams, new Function<Stream, List<String>>() {
@@ -83,7 +83,7 @@ public final class SliceBuilder {
         });
         final Dictionary<?>[] dictionaryMap = buildDictionary(table, iiDesc);
         TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaryMap);
-        return build(table, sliceBuilder, tableRecordInfo, dictionaryMap);
+        return build(table,  tableRecordInfo, dictionaryMap);
     }
 
     private Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) {
@@ -114,8 +114,8 @@ public final class SliceBuilder {
         return result;
     }
 
-    private Slice build(List<List<String>> table, BatchSliceBuilder sliceBuilder, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
-        final Slice slice = sliceBuilder.build(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+    private Slice build(List<List<String>> table,  final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
             @Nullable
             @Override
             public TableRecord apply(@Nullable List<String> input) {


Mime
View raw message