kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [34/50] [abbrv] incubator-kylin git commit: fix compile issue
Date Sat, 14 Mar 2015 00:01:09 GMT
fix compile issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b36d1e87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b36d1e87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b36d1e87

Branch: refs/heads/streaming-localdict
Commit: b36d1e87543cd9e4b55ad56dde4ed410656e4c39
Parents: bb67362
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Thu Mar 12 18:50:45 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Thu Mar 12 18:50:45 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/model/IIKeyValueCodec.java    | 61 +++++++++++++-------
 .../storage/hbase/InvertedIndexHBaseTest.java   |  4 +-
 .../invertedindex/IIStreamBuilder.java          |  8 ---
 3 files changed, 42 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b36d1e87/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 6955229..31fa48a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -51,23 +51,47 @@ public class IIKeyValueCodec implements KeyValueCodec {
 		ColumnValueContainer[] containers = slice.getColumnValueContainers();
 		for (int col = 0; col < containers.length; col++) {
 			if (containers[col] instanceof CompressedValueContainer) {
-				result.add(collectKeyValues(slice, col, (CompressedValueContainer) containers[col]));
-			} else {
-				throw new IllegalArgumentException("Unknown container class "
+                final IIRow row = collectKeyValues(slice, col, (CompressedValueContainer)
containers[col]);
+                result.add(row);
+            } else {
+                throw new IllegalArgumentException("Unknown container class "
 						+ containers[col].getClass());
-			}
-		}
+            }
+        }
 		return result;
 	}
 
-	private IIRow collectKeyValues(Slice slice,
-			int col,
-			CompressedValueContainer container) {
+	private IIRow collectKeyValues(Slice slice, int col, CompressedValueContainer container)
{
 		ImmutableBytesWritable key = encodeKey(slice.getShard(), slice.getTimestamp(), col);
 		ImmutableBytesWritable value = container.toBytes();
-        return new IIRow(key, value, null);
+        final Dictionary<?> dictionary = slice.getLocalDictionaries().get(col);
+        return new IIRow(key, value, serialize(dictionary));
 	}
 
+    private static Dictionary<?> deserialize(ImmutableBytesWritable dictBytes) {
+        try {
+            final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(dictBytes.get(),
dictBytes.getOffset(), dictBytes.getLength()));
+            final String type = dataInputStream.readUTF();
+            final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance();
+            dictionary.readFields(dataInputStream);
+            return dictionary;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static ImmutableBytesWritable serialize(Dictionary<?> dict) {
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream(baos);
+            out.writeUTF(dict.getClass().getName());
+            dict.write(out);
+            return new ImmutableBytesWritable(baos.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 	ImmutableBytesWritable encodeKey(short shard, long timestamp, int col) {
 		byte[] bytes = new byte[20];
 		int len = encodeKey(shard, timestamp, col, bytes, 0);
@@ -139,18 +163,6 @@ public class IIKeyValueCodec implements KeyValueCodec {
 			this.iterator = kvs.iterator();
 		}
 
-        private Dictionary<?> deserialize(ImmutableBytesWritable dictBytes) {
-            try {
-                final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(dictBytes.get(),
dictBytes.getOffset(), dictBytes.getLength()));
-                final String type = dataInputStream.readUTF();
-                final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance();
-                dictionary.readFields(dataInputStream);
-                return dictionary;
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
 		private void goToNext() {
 			if (slice != null) { // was not fetched
 				return;
@@ -164,6 +176,13 @@ public class IIKeyValueCodec implements KeyValueCodec {
 				decodeKey(k);
                 final Dictionary<?> dictionary = deserialize(kv.getDictionary());
                 addContainer(curCol, new CompressedValueContainer(dictionary.getSizeOfId(),
(dictionary.getMaxId() - dictionary.getMinId() + 1), (dictionary.getMaxId() - dictionary.getMinId()
+ 1)));
+                byte[] bytes = new byte[dictionary.getSizeOfValue()];
+                ImmutableBytesWritable buffer = new ImmutableBytesWritable(bytes);
+                for (int i = dictionary.getMinId(); i <= dictionary.getMaxId(); ++i) {
+                    final int length = dictionary.getValueBytesFromId(i, bytes, 0);
+                    buffer.set(bytes, 0, length);
+                    containers[curCol].append(buffer);
+                }
                 localDictionaries.put(curCol, dictionary);
 
 				if (curShard != lastShard

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b36d1e87/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
index 3ac832a..96c363c 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
@@ -69,7 +69,7 @@ public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
         Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
         hconn = HConnectionManager.createConnection(hconf);
 
-        this.info = new TableRecordInfo(seg.getIIDesc(), Collections.<TblColRef, Dictionary<?>>emptyMap(),
Collections.<TblColRef, FixedLenMeasureCodec<?>>emptyMap());
+        this.info = new TableRecordInfo(seg.getIIDesc(), Collections.<Integer, Dictionary<?>>emptyMap(),
Collections.<TblColRef, FixedLenMeasureCodec<?>>emptyMap());
     }
 
     @After
@@ -81,7 +81,7 @@ public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
     public void testLoad() throws Exception {
 
         String tableName = seg.getStorageLocationIdentifier();
-        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+        IIKeyValueCodec codec = new IIKeyValueCodec();
 
         List<Slice> slices = Lists.newArrayList();
         HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES,
IIDesc.HBASE_QUALIFIER_BYTES);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b36d1e87/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 10d212a..80ff21b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -165,14 +165,6 @@ public class IIStreamBuilder extends StreamBuilder {
         return slice;
     }
 
-    private byte[] getDictBytes(Dictionary<?> dict) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(baos);
-        out.writeUTF(dict.getClass().getName());
-        dict.write(out);
-        return baos.toByteArray();
-    }
-
     private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec)
throws IOException {
         try {
             List<Put> data = Lists.newArrayList();


Mime
View raw message