From: jihoonson@apache.org
To: commits@tajo.apache.org
Date: Fri, 12 Dec 2014 08:22:33 -0000
Subject: [19/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java new file mode 100644 index 0000000..c6149f7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.storage; + + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.util.BytesUtils; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestLazyTuple { + + Schema schema; + byte[][] textRow; + byte[] nullbytes; + SerializerDeserializer serde; + + @Before + public void setUp() { + nullbytes = "\\N".getBytes(); + + schema = new Schema(); + schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN); + schema.addColumn("col2", TajoDataTypes.Type.BIT); + schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7); + schema.addColumn("col4", TajoDataTypes.Type.INT2); + schema.addColumn("col5", TajoDataTypes.Type.INT4); + schema.addColumn("col6", TajoDataTypes.Type.INT8); + schema.addColumn("col7", TajoDataTypes.Type.FLOAT4); + schema.addColumn("col8", TajoDataTypes.Type.FLOAT8); + schema.addColumn("col9", TajoDataTypes.Type.TEXT); + schema.addColumn("col10", TajoDataTypes.Type.BLOB); + schema.addColumn("col11", TajoDataTypes.Type.INET4); + schema.addColumn("col12", TajoDataTypes.Type.INT4); + schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE); + + StringBuilder sb = new StringBuilder(); + sb.append(DatumFactory.createBool(true)).append('|'); + sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|'); + sb.append(DatumFactory.createChar("str")).append('|'); + sb.append(DatumFactory.createInt2((short) 17)).append('|'); + sb.append(DatumFactory.createInt4(59)).append('|'); + sb.append(DatumFactory.createInt8(23l)).append('|'); + sb.append(DatumFactory.createFloat4(77.9f)).append('|'); + sb.append(DatumFactory.createFloat8(271.9f)).append('|'); + sb.append(DatumFactory.createText("str2")).append('|'); + sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|'); + sb.append(DatumFactory.createInet4("192.168.0.1")).append('|'); + sb.append(new String(nullbytes)).append('|'); + sb.append(NullDatum.get()); + textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|'); + serde = new TextSerializerDeserializer(); + } + + @Test + public void testGetDatum() { + + LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde); + assertEquals(DatumFactory.createBool(true), t1.get(0)); + assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1)); + assertEquals(DatumFactory.createChar("str"), t1.get(2)); + assertEquals(DatumFactory.createInt2((short) 17), t1.get(3)); + assertEquals(DatumFactory.createInt4(59), t1.get(4)); + assertEquals(DatumFactory.createInt8(23l), t1.get(5)); + assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6)); + assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7)); + assertEquals(DatumFactory.createText("str2"), t1.get(8)); + assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9)); + assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10)); + assertEquals(NullDatum.get(), t1.get(11)); + assertEquals(NullDatum.get(), t1.get(12)); + } + + @Test + public void testContain() { + int colNum = schema.size(); + + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + t1.put(0, DatumFactory.createInt4(1)); + t1.put(3, DatumFactory.createInt4(1)); + t1.put(7, DatumFactory.createInt4(1)); + + assertTrue(t1.contains(0)); + assertFalse(t1.contains(1)); + assertFalse(t1.contains(2)); + assertTrue(t1.contains(3)); + assertFalse(t1.contains(4)); + assertFalse(t1.contains(5)); + 
assertFalse(t1.contains(6)); + assertTrue(t1.contains(7)); + assertFalse(t1.contains(8)); + assertFalse(t1.contains(9)); + assertFalse(t1.contains(10)); + assertFalse(t1.contains(11)); + assertFalse(t1.contains(12)); + } + + @Test + public void testPut() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + t1.put(0, DatumFactory.createText("str")); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(11, DatumFactory.createFloat4(0.76f)); + + assertTrue(t1.contains(0)); + assertTrue(t1.contains(1)); + + assertEquals(t1.getText(0), "str"); + assertEquals(t1.get(1).asInt4(), 2); + assertTrue(t1.get(11).asFloat4() == 0.76f); + } + + @Test + public void testEquals() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + + assertEquals(t1, t2); + + Tuple t3 = new VTuple(colNum); + t3.put(0, DatumFactory.createInt4(1)); + t3.put(1, DatumFactory.createInt4(2)); + t3.put(3, DatumFactory.createInt4(2)); + assertEquals(t1, t3); + assertEquals(t2, t3); + + LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1); + assertNotSame(t1, t4); + } + + @Test + public void testHashCode() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("str")); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + t2.put(4, DatumFactory.createText("str")); + + assertEquals(t1.hashCode(), t2.hashCode()); + + Tuple t3 = new VTuple(colNum); + t3.put(0, DatumFactory.createInt4(1)); + t3.put(1, DatumFactory.createInt4(2)); + t3.put(3, DatumFactory.createInt4(2)); + t3.put(4, DatumFactory.createText("str")); + assertEquals(t1.hashCode(), t3.hashCode()); + assertEquals(t2.hashCode(), t3.hashCode()); + + Tuple t4 = new VTuple(5); + t4.put(0, DatumFactory.createInt4(1)); + t4.put(1, DatumFactory.createInt4(2)); + t4.put(4, DatumFactory.createInt4(2)); + + assertNotSame(t1.hashCode(), t4.hashCode()); + } + + @Test + public void testPutTuple() { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(2, DatumFactory.createInt4(3)); + + + Schema schema2 = new Schema(); + schema2.addColumn("col1", TajoDataTypes.Type.INT8); + schema2.addColumn("col2", TajoDataTypes.Type.INT8); + + LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1); + t2.put(0, DatumFactory.createInt4(4)); + t2.put(1, DatumFactory.createInt4(5)); + + t1.put(3, t2); + + for (int i = 0; i < 5; i++) { + assertEquals(i + 1, t1.get(i).asInt4()); + } + } + + @Test + public void testInvalidNumber() { + byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|'); + Schema schema = new Schema(); + schema.addColumn("col1", TajoDataTypes.Type.INT2); + schema.addColumn("col2", TajoDataTypes.Type.INT4); + schema.addColumn("col3", TajoDataTypes.Type.INT8); + schema.addColumn("col4", 
TajoDataTypes.Type.FLOAT4); + schema.addColumn("col5", TajoDataTypes.Type.FLOAT8); + + LazyTuple tuple = new LazyTuple(schema, bytes, 0); + assertEquals(bytes.length, tuple.size()); + + for (int i = 0; i < tuple.size(); i++){ + assertEquals(NullDatum.get(), tuple.get(i)); + } + } + + @Test + public void testClone() throws CloneNotSupportedException { + int colNum = schema.size(); + LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("str")); + + LazyTuple t2 = (LazyTuple) t1.clone(); + assertNotSame(t1, t2); + assertEquals(t1, t2); + + assertSame(t1.get(4), t2.get(4)); + + t1.clear(); + assertFalse(t1.equals(t2)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java new file mode 100644 index 0000000..639ca04 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestTupleComparator { + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() throws Exception { + } + + @Test + public final void testCompare() { + Schema schema = new Schema(); + schema.addColumn("col1", Type.INT4); + schema.addColumn("col2", Type.INT4); + schema.addColumn("col3", Type.INT4); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.TEXT); + + Tuple tuple1 = new VTuple(5); + Tuple tuple2 = new VTuple(5); + + tuple1.put( + new Datum[] { + DatumFactory.createInt4(9), + DatumFactory.createInt4(3), + DatumFactory.createInt4(33), + DatumFactory.createInt4(4), + DatumFactory.createText("abc")}); + tuple2.put( + new Datum[] { + DatumFactory.createInt4(1), + DatumFactory.createInt4(25), + DatumFactory.createInt4(109), + DatumFactory.createInt4(4), + DatumFactory.createText("abd")}); + + SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false); + SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false); + + BaseTupleComparator tc = new BaseTupleComparator(schema, + new SortSpec[] {sortKey1, sortKey2}); + assertEquals(-1, tc.compare(tuple1, tuple2)); + assertEquals(1, tc.compare(tuple2, tuple1)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java new file mode 100644 index 0000000..1bbd9ec --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + + +import org.apache.tajo.datum.DatumFactory; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestVTuple { + + /** + * @throws Exception + */ + @Before + public void setUp() throws Exception { + + } + + @Test + public void testContain() { + VTuple t1 = new VTuple(260); + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(1)); + t1.put(27, DatumFactory.createInt4(1)); + t1.put(96, DatumFactory.createInt4(1)); + t1.put(257, DatumFactory.createInt4(1)); + + assertTrue(t1.contains(0)); + assertTrue(t1.contains(1)); + assertFalse(t1.contains(2)); + assertFalse(t1.contains(3)); + assertFalse(t1.contains(4)); + assertTrue(t1.contains(27)); + assertFalse(t1.contains(28)); + assertFalse(t1.contains(95)); + assertTrue(t1.contains(96)); + assertFalse(t1.contains(97)); + assertTrue(t1.contains(257)); + } + + @Test + public void testPut() { + VTuple t1 = new VTuple(260); + t1.put(0, DatumFactory.createText("str")); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(257, DatumFactory.createFloat4(0.76f)); + + assertTrue(t1.contains(0)); + assertTrue(t1.contains(1)); + + assertEquals(t1.getText(0),"str"); + assertEquals(t1.get(1).asInt4(),2); + assertTrue(t1.get(257).asFloat4() == 0.76f); + } + + @Test + public void testEquals() { + Tuple t1 = new VTuple(5); + Tuple t2 = new VTuple(5); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + + assertEquals(t1,t2); + + Tuple t3 = new VTuple(5); + t3.put(0, DatumFactory.createInt4(1)); + t3.put(1, DatumFactory.createInt4(2)); + t3.put(4, DatumFactory.createInt4(2)); + + assertNotSame(t1,t3); + } + + @Test + public void testHashCode() { + Tuple t1 = new VTuple(5); + Tuple t2 = new VTuple(5); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("hyunsik")); + + t2.put(0, DatumFactory.createInt4(1)); + t2.put(1, DatumFactory.createInt4(2)); + t2.put(3, DatumFactory.createInt4(2)); + t2.put(4, DatumFactory.createText("hyunsik")); + + assertEquals(t1.hashCode(),t2.hashCode()); + + Tuple t3 = new VTuple(5); + t3.put(0, DatumFactory.createInt4(1)); + t3.put(1, DatumFactory.createInt4(2)); + t3.put(4, DatumFactory.createInt4(2)); + + assertNotSame(t1.hashCode(),t3.hashCode()); + } + + @Test + public void testPutTuple() { + Tuple t1 = new VTuple(5); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(2, DatumFactory.createInt4(3)); + + Tuple t2 = new VTuple(2); + t2.put(0, DatumFactory.createInt4(4)); + t2.put(1, DatumFactory.createInt4(5)); + + t1.put(3, t2); + + for (int i = 0; i < 5; i++) { + assertEquals(i+1, t1.get(i).asInt4()); + } + } + + @Test + public void testClone() throws CloneNotSupportedException { + Tuple t1 = new VTuple(5); + + t1.put(0, DatumFactory.createInt4(1)); + t1.put(1, DatumFactory.createInt4(2)); + t1.put(3, DatumFactory.createInt4(2)); + t1.put(4, DatumFactory.createText("str")); + + VTuple t2 = (VTuple) t1.clone(); + assertNotSame(t1, t2); + assertEquals(t1, t2); + + assertSame(t1.get(4), t2.get(4)); + + t1.clear(); + assertFalse(t1.equals(t2)); + } +}
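Taken together, TestLazyTuple and TestVTuple pin down one contract: two Tuple implementations holding the same datums must compare equal and hash identically, regardless of whether the values are held eagerly (VTuple) or deserialized lazily (LazyTuple). A minimal sketch of that contract, reusing only the classes and factory methods that appear in the tests above (illustrative only, not part of the patch):

  Schema schema = new Schema();
  schema.addColumn("id", TajoDataTypes.Type.INT4);
  schema.addColumn("name", TajoDataTypes.Type.TEXT);

  Tuple lazy = new LazyTuple(schema, new byte[schema.size()][], -1);
  lazy.put(0, DatumFactory.createInt4(7));
  lazy.put(1, DatumFactory.createText("tajo"));

  Tuple eager = new VTuple(schema.size());
  eager.put(0, DatumFactory.createInt4(7));
  eager.put(1, DatumFactory.createText("tajo"));

  // Same contents imply equal tuples and equal hash codes across implementations.
  assertEquals(lazy, eager);
  assertEquals(lazy.hashCode(), eager.hashCode());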
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java new file mode 100644 index 0000000..b332364 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple; + +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.tuple.offheap.*; +import org.junit.Test; + +public class TestBaseTupleBuilder { + + @Test + public void testBuild() { + BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248); + OffHeapRowBlockReader reader = rowBlock.getReader(); + + ZeroCopyTuple inputTuple = new ZeroCopyTuple(); + + HeapTuple heapTuple = null; + ZeroCopyTuple zcTuple = null; + int i = 0; + while(reader.next(inputTuple)) { + RowStoreUtil.convert(inputTuple, builder); + + heapTuple = builder.buildToHeapTuple(); + TestOffHeapRowBlock.validateTupleResult(i, heapTuple); + + zcTuple = builder.buildToZeroCopyTuple(); + TestOffHeapRowBlock.validateTupleResult(i, zcTuple); + + i++; + } + } + + @Test + public void testBuildWithNull() { + BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema); + + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248); + OffHeapRowBlockReader reader = rowBlock.getReader(); + + ZeroCopyTuple inputTuple = new ZeroCopyTuple(); + + HeapTuple heapTuple = null; + ZeroCopyTuple zcTuple = null; + int i = 0; + while(reader.next(inputTuple)) { + RowStoreUtil.convert(inputTuple, builder); + + heapTuple = builder.buildToHeapTuple(); + TestOffHeapRowBlock.validateNullity(i, heapTuple); + + zcTuple = builder.buildToZeroCopyTuple(); + TestOffHeapRowBlock.validateNullity(i, zcTuple); + + i++; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java new file mode 100644 index 0000000..96f465a --- /dev/null +++ 
b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java @@ -0,0 +1,45 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.catalog.SchemaUtil; +import org.junit.Test; + +public class TestHeapTuple { + + @Test + public void testHeapTuple() { + OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + ZeroCopyTuple zcTuple = new ZeroCopyTuple(); + int i = 0; + while (reader.next(zcTuple)) { + byte [] bytes = new byte[zcTuple.nioBuffer().limit()]; + zcTuple.nioBuffer().get(bytes); + + HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema)); + TestOffHeapRowBlock.validateTupleResult(i, heapTuple); + i++; + } + + rowBlock.release(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java new file mode 100644 index 0000000..c43ba38 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java @@ -0,0 +1,577 @@ +/*** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.tuple.offheap; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.*; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.storage.BaseTupleComparator; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.ProtoUtil; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +import static org.apache.tajo.common.TajoDataTypes.Type; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestOffHeapRowBlock { + private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class); + public static String UNICODE_FIELD_PREFIX = "abc_가나다_"; + public static Schema schema; + + static { + schema = new Schema(); + schema.addColumn("col0", Type.BOOLEAN); + schema.addColumn("col1", Type.INT2); + schema.addColumn("col2", Type.INT4); + schema.addColumn("col3", Type.INT8); + schema.addColumn("col4", Type.FLOAT4); + schema.addColumn("col5", Type.FLOAT8); + schema.addColumn("col6", Type.TEXT); + schema.addColumn("col7", Type.TIMESTAMP); + schema.addColumn("col8", Type.DATE); + schema.addColumn("col9", Type.TIME); + schema.addColumn("col10", Type.INTERVAL); + schema.addColumn("col11", Type.INET4); + schema.addColumn("col12", + CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); + } + + private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) { + LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " + + (endTime - startTime) + " msec"); + } + + @Test + public void testPutAndReadValidation() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + ZeroCopyTuple tuple = new ZeroCopyTuple(); + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRow(i, rowBlock.getWriter()); + + reader.reset(); + int j = 0; + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + + j++; + } + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + tuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + @Test + public void testNullityValidation() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + ZeroCopyTuple tuple = new 
ZeroCopyTuple(); + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRowBlockWithNull(i, rowBlock.getWriter()); + + reader.reset(); + int j = 0; + while(reader.next(tuple)) { + validateNullity(j, tuple); + + j++; + } + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and nullity validating take " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + tuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + validateNullity(j, tuple); + + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + @Test + public void testEmptyRow() { + int rowNum = 1000; + + long allocStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10); + long allocEnd = System.currentTimeMillis(); + explainRowBlockAllocation(rowBlock, allocStart, allocEnd); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + rowBlock.getWriter().startRow(); + // empty columns + rowBlock.getWriter().endRow(); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); + + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + rowBlock.release(); + + assertEquals(rowNum, j); + assertEquals(rowNum, rowBlock.rows()); + } + + @Test + public void testSortBenchmark() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); + + List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList(); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + reader.reset(); + while(reader.next(tuple)) { + unSafeTuples.add(tuple); + tuple = new ZeroCopyTuple(); + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4)); + BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec}); + + long sortStart = System.currentTimeMillis(); + Collections.sort(unSafeTuples, comparator); + long sortEnd = System.currentTimeMillis(); + LOG.info("sorting took " + (sortEnd - sortStart) + " msec"); + rowBlock.release(); + } + + @Test + public void testVTuplePutAndGetBenchmark() { + int rowNum = 1000; + + List<VTuple> rowBlock = Lists.newArrayList(); + long writeStart = System.currentTimeMillis(); + VTuple tuple; + for (int i = 0; i < rowNum; i++) { + tuple = new VTuple(schema.size()); + fillVTuple(i, tuple); + rowBlock.add(tuple); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); + + long readStart = System.currentTimeMillis(); + int j = 0; + for (VTuple t : rowBlock) { + validateTupleResult(j, t); + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + int count = 0; + for (int l = 0; l < rowBlock.size(); l++) { + for (int m = 0; m < schema.size(); m++) { + if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) { + count++;
+ } + } + } + // For preventing unnecessary code elimination optimization. + LOG.info("The number of INT4 values is " + count + "."); + } + + @Test + public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100); + + long writeStart = System.currentTimeMillis(); + VTuple tuple = new VTuple(schema.size()); + for (int i = 0; i < rowNum; i++) { + fillVTuple(i, tuple); + + RowStoreUtil.convert(tuple, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); + + validateResults(rowBlock); + rowBlock.release(); + } + + @Test + public void testSerDerOfRowBlock() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + + ByteBuffer bb = rowBlock.nioBuffer(); + OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); + validateResults(restoredRowBlock); + rowBlock.release(); + } + + @Test + public void testSerDerOfZeroCopyTuple() { + int rowNum = 1000; + + OffHeapRowBlock rowBlock = createRowBlock(rowNum); + + ByteBuffer bb = rowBlock.nioBuffer(); + OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); + OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock); + + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + ZeroCopyTuple copyTuple = new ZeroCopyTuple(); + int j = 0; + reader.reset(); + while(reader.next(tuple)) { + ByteBuffer copy = tuple.nioBuffer(); + copyTuple.set(copy, SchemaUtil.toDataTypes(schema)); + + validateTupleResult(j, copyTuple); + + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("reading takes " + (readEnd - readStart) + " msec"); + + rowBlock.release(); + } + + public static OffHeapRowBlock createRowBlock(int rowNum) { + long allocateStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); + long allocatedEnd = System.currentTimeMillis(); + LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " + + (allocatedEnd - allocateStart) + " msec"); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRow(i, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); + + return rowBlock; + } + + public static OffHeapRowBlock createRowBlockWithNull(int rowNum) { + long allocateStart = System.currentTimeMillis(); + OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); + long allocatedEnd = System.currentTimeMillis(); + LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " + + (allocatedEnd - allocateStart) + " msec"); + + long writeStart = System.currentTimeMillis(); + for (int i = 0; i < rowNum; i++) { + fillRowBlockWithNull(i, rowBlock.getWriter()); + } + long writeEnd = System.currentTimeMillis(); + LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); + + return rowBlock; + } + + public static void fillRow(int i, RowWriter builder) { + builder.startRow(); + builder.putBool(i % 1 == 0 ? 
true : false); // 0 + builder.putInt2((short) 1); // 1 + builder.putInt4(i); // 2 + builder.putInt8(i); // 3 + builder.putFloat4(i); // 4 + builder.putFloat8(i); // 5 + builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 + builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + builder.endRow(); + } + + public static void fillRowBlockWithNull(int i, RowWriter writer) { + writer.startRow(); + + if (i == 0) { + writer.skipField(); + } else { + writer.putBool(i % 1 == 0 ? true : false); // 0 + } + if (i % 1 == 0) { + writer.skipField(); + } else { + writer.putInt2((short) 1); // 1 + } + + if (i % 2 == 0) { + writer.skipField(); + } else { + writer.putInt4(i); // 2 + } + + if (i % 3 == 0) { + writer.skipField(); + } else { + writer.putInt8(i); // 3 + } + + if (i % 4 == 0) { + writer.skipField(); + } else { + writer.putFloat4(i); // 4 + } + + if (i % 5 == 0) { + writer.skipField(); + } else { + writer.putFloat8(i); // 5 + } + + if (i % 6 == 0) { + writer.skipField(); + } else { + writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 + } + + if (i % 7 == 0) { + writer.skipField(); + } else { + writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + } + + if (i % 8 == 0) { + writer.skipField(); + } else { + writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + } + + if (i % 9 == 0) { + writer.skipField(); + } else { + writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + } + + if (i % 10 == 0) { + writer.skipField(); + } else { + writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + } + + if (i % 11 == 0) { + writer.skipField(); + } else { + writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + } + + if (i % 12 == 0) { + writer.skipField(); + } else { + writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + } + + writer.endRow(); + } + + public static void fillVTuple(int i, VTuple tuple) { + tuple.put(0, DatumFactory.createBool(i % 1 == 0)); + tuple.put(1, DatumFactory.createInt2((short) 1)); + tuple.put(2, DatumFactory.createInt4(i)); + tuple.put(3, DatumFactory.createInt8(i)); + tuple.put(4, DatumFactory.createFloat4(i)); + tuple.put(5, DatumFactory.createFloat8(i)); + tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes())); + tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 + tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 + tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 + tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 + tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 + tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; + } + + public static void validateResults(OffHeapRowBlock rowBlock) { + long readStart = System.currentTimeMillis(); + ZeroCopyTuple tuple = new ZeroCopyTuple(); + int j = 0; + OffHeapRowBlockReader reader = new 
OffHeapRowBlockReader(rowBlock); + reader.reset(); + while(reader.next(tuple)) { + validateTupleResult(j, tuple); + j++; + } + long readEnd = System.currentTimeMillis(); + LOG.info("Reading takes " + (readEnd - readStart) + " msec"); + } + + public static void validateTupleResult(int j, Tuple t) { + assertTrue((j % 1 == 0) == t.getBool(0)); + assertTrue(1 == t.getInt2(1)); + assertEquals(j, t.getInt4(2)); + assertEquals(j, t.getInt8(3)); + assertTrue(j == t.getFloat4(4)); + assertTrue(j == t.getFloat8(5)); + assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); + } + + public static void validateNullity(int j, Tuple tuple) { + if (j == 0) { + assertTrue(tuple.isNull(0)); + } else { + assertTrue((j % 1 == 0) == tuple.getBool(0)); + } + + if (j % 1 == 0) { + assertTrue(tuple.isNull(1)); + } else { + assertTrue(1 == tuple.getInt2(1)); + } + + if (j % 2 == 0) { + assertTrue(tuple.isNull(2)); + } else { + assertEquals(j, tuple.getInt4(2)); + } + + if (j % 3 == 0) { + assertTrue(tuple.isNull(3)); + } else { + assertEquals(j, tuple.getInt8(3)); + } + + if (j % 4 == 0) { + assertTrue(tuple.isNull(4)); + } else { + assertTrue(j == tuple.getFloat4(4)); + } + + if (j % 5 == 0) { + assertTrue(tuple.isNull(5)); + } else { + assertTrue(j == tuple.getFloat8(5)); + } + + if (j % 6 == 0) { + assertTrue(tuple.isNull(6)); + } else { + assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); + } + + if (j % 7 == 0) { + assertTrue(tuple.isNull(7)); + } else { + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); + } + + if (j % 8 == 0) { + assertTrue(tuple.isNull(8)); + } else { + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); + } + + if (j % 9 == 0) { + assertTrue(tuple.isNull(9)); + } else { + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); + } + + if (j % 10 == 0) { + assertTrue(tuple.isNull(10)); + } else { + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); + } + + if (j % 11 == 0) { + assertTrue(tuple.isNull(11)); + } else { + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); + } + + if (j % 12 == 0) { + assertTrue(tuple.isNull(12)); + } else { + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java new file mode 100644 index 0000000..1eb9c17 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.tuple.offheap; + +import org.apache.tajo.unit.StorageUnit; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestResizableSpec { + + @Test + public void testResizableLimit() { + ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f); + + long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); + + assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); + + assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB)); + + assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB)); + + assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1); + + assertFalse(limit.canIncrease(limit.limit())); + } + + @Test + public void testFixedLimit() { + FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f); + + assertEquals(limit.limit(), 100 * StorageUnit.MB); + + assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000)); + + assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB)); + + assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB)); + + assertFalse(limit.canIncrease(limit.limit())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml new file mode 100644 index 0000000..d1c561b --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -0,0 +1,164 @@ + + + + + + + + fs.s3.impl + org.apache.tajo.storage.s3.SmallBlockS3FileSystem + + + + + tajo.storage.manager.hdfs.class + org.apache.tajo.storage.FileStorageManager + + + tajo.storage.manager.hbase.class + org.apache.tajo.storage.hbase.HBaseStorageManager + + + + + tajo.storage.scanner-handler + csv,raw,rcfile,row,trevni,parquet,sequencefile,avro + + + + + tajo.storage.fragment.csv.class + org.apache.tajo.storage.fragment.FileFragment + + + tajo.storage.fragment.raw.class + org.apache.tajo.storage.fragment.FileFragment + + + tajo.storage.fragment.rcfile.class + org.apache.tajo.storage.fragment.FileFragment + + + tajo.storage.fragment.row.class + org.apache.tajo.storage.fragment.FileFragment + + + tajo.storage.fragment.trevni.class + org.apache.tajo.storage.fragment.FileFragment + + + tajo.storage.fragment.parquet.class + org.apache.tajo.storage.FileFragment + + + tajo.storage.fragment.sequencefile.class + org.apache.tajo.storage.fragment.FileFragment + + + tajo.storage.fragment.avro.class + org.apache.tajo.storage.fragment.FileFragment + + + 
+ + tajo.storage.scanner-handler.csv.class + org.apache.tajo.storage.CSVFile$CSVScanner + + + + tajo.storage.scanner-handler.raw.class + org.apache.tajo.storage.RawFile$RawFileScanner + + + + tajo.storage.scanner-handler.rcfile.class + org.apache.tajo.storage.rcfile.RCFile$RCFileScanner + + + + tajo.storage.scanner-handler.rowfile.class + org.apache.tajo.storage.RowFile$RowFileScanner + + + + tajo.storage.scanner-handler.trevni.class + org.apache.tajo.storage.trevni.TrevniScanner + + + + tajo.storage.scanner-handler.parquet.class + org.apache.tajo.storage.parquet.ParquetScanner + + + + tajo.storage.scanner-handler.sequencefile.class + org.apache.tajo.storage.sequencefile.SequenceFileScanner + + + + tajo.storage.scanner-handler.avro.class + org.apache.tajo.storage.avro.AvroScanner + + + + + tajo.storage.appender-handler + csv,raw,rcfile,row,trevni,parquet,sequencefile,avro + + + + tajo.storage.appender-handler.csv.class + org.apache.tajo.storage.CSVFile$CSVAppender + + + + tajo.storage.appender-handler.raw.class + org.apache.tajo.storage.RawFile$RawFileAppender + + + + tajo.storage.appender-handler.rcfile.class + org.apache.tajo.storage.rcfile.RCFile$RCFileAppender + + + + tajo.storage.appender-handler.rowfile.class + org.apache.tajo.storage.RowFile$RowFileAppender + + + + tajo.storage.appender-handler.trevni.class + org.apache.tajo.storage.trevni.TrevniAppender + + + + tajo.storage.appender-handler.parquet.class + org.apache.tajo.storage.parquet.ParquetAppender + + + + tajo.storage.appender-handler.sequencefile.class + org.apache.tajo.storage.sequencefile.SequenceFileAppender + + + + tajo.storage.appender-handler.avro.class + org.apache.tajo.storage.avro.AvroAppender + + http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml new file mode 100644 index 0000000..e37149d --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/pom.xml @@ -0,0 +1,349 @@ + + + + + + tajo-project + org.apache.tajo + 0.9.1-SNAPSHOT + ../../tajo-project + + 4.0.0 + + tajo-storage-hbase + jar + Tajo HBase Storage + + UTF-8 + UTF-8 + + + + + repository.jboss.org + https://repository.jboss.org/nexus/content/repositories/releases/ + + + false + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + ${project.build.sourceEncoding} + + + + org.apache.rat + apache-rat-plugin + + + verify + + check + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + TRUE + + -Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8 + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + test-jar + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + create-protobuf-generated-sources-directory + initialize + + + + + + + run + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2 + + + generate-sources + generate-sources + + protoc + + -Isrc/main/proto/ + --proto_path=../../tajo-common/src/main/proto + --proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto + --java_out=target/generated-sources/proto + src/main/proto/StorageFragmentProtos.proto + + + + exec + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.5 + + + add-source + generate-sources + + add-source + + + + target/generated-sources/proto + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + + + + org.apache.tajo + tajo-common + provided + + + org.apache.tajo + 
tajo-catalog-common + provided + + + org.apache.tajo + tajo-plan + provided + + + org.apache.tajo + tajo-storage-common + provided + + + + org.apache.hadoop + hadoop-common + provided + + + zookeeper + org.apache.zookeeper + + + slf4j-api + org.slf4j + + + jersey-json + com.sun.jersey + + + + + org.apache.hadoop + hadoop-hdfs + provided + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.mortbay.jetty + jsp-2.1-jetty + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-grizzly2 + + + + + org.apache.hadoop + hadoop-minicluster + test + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.mortbay.jetty + jsp-2.1-jetty + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-grizzly2 + + + hadoop-yarn-server-tests + org.apache.hadoop + + + hadoop-mapreduce-client-jobclient + org.apache.hadoop + + + hadoop-mapreduce-client-app + org.apache.hadoop + + + hadoop-yarn-api + org.apache.hadoop + + + hadoop-mapreduce-client-hs + org.apache.hadoop + + + hadoop-mapreduce-client-core + org.apache.hadoop + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + provided + + + com.google.protobuf + protobuf-java + + + junit + junit + test + + + org.apache.hbase + hbase-server + ${hbase.version} + provided + + + org.apache.hbase + hbase-client + ${hbase.version} + provided + + + + + + docs + + false + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + module-javadocs + package + + jar + + + ${project.build.directory} + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java new file mode 100644 index 0000000..8615235 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.TUtil; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An abstract base class for HBase appenders. + */ +public abstract class AbstractHBaseAppender implements Appender { + protected Configuration conf; + protected Schema schema; + protected TableMeta meta; + protected QueryUnitAttemptId taskAttemptId; + protected Path stagingDir; + protected boolean inited = false; + + protected ColumnMapping columnMapping; + protected TableStatistics stats; + protected boolean enabledStats; + + protected int columnNum; + + protected byte[][][] mappingColumnFamilies; + protected boolean[] isBinaryColumns; + protected boolean[] isRowKeyMappings; + protected boolean[] isColumnKeys; + protected boolean[] isColumnValues; + protected int[] rowKeyFieldIndexes; + protected int[] rowkeyColumnIndexes; + protected char rowKeyDelimiter; + + // the following four variables are used for ':key:' or ':value:' mappings + protected int[] columnKeyValueDataIndexes; + protected byte[][] columnKeyDatas; + protected byte[][] columnValueDatas; + protected byte[][] columnKeyCfNames; + + protected KeyValue[] keyValues; + + public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + Schema schema, TableMeta meta, Path stagingDir) { + this.conf = conf; + this.schema = schema; + this.meta = meta; + this.stagingDir = stagingDir; + this.taskAttemptId = taskAttemptId; + } + + @Override + public void init() throws IOException { + if (inited) { + throw new IllegalStateException("Appender is already initialized."); + } + inited = true; + if (enabledStats) { + stats = new TableStatistics(this.schema); + } + columnMapping = new ColumnMapping(schema, meta); + + mappingColumnFamilies = columnMapping.getMappingColumns(); + + isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>(); + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + rowkeyColumnIndexList.add(i); + } + } + rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList); + + isBinaryColumns = columnMapping.getIsBinaryColumns(); + isColumnKeys = columnMapping.getIsColumnKeys(); + isColumnValues = columnMapping.getIsColumnValues(); + rowKeyDelimiter = columnMapping.getRowKeyDelimiter(); + rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes(); + + this.columnNum = schema.size(); + + // In the case of a ':key:' or ':value:' mapping, the KeyValue object should be set with the qualifier and value + // which are mapped to the same column family.
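+ // For example (a hypothetical mapping, for illustration only): if col1 is mapped to 'cf1:key:' and + // col2 to 'cf1:value:', both columns share one slot in cfNameIndexMap under family 'cf1'; col1 then + // supplies the qualifier bytes and col2 the cell value, and readKeyValues() below emits the pair as a + // single KeyValue(rowkey, cf1, col1-bytes, col2-bytes).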
+ columnKeyValueDataIndexes = new int[isColumnKeys.length]; + int index = 0; + int numKeyValues = 0; + Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>(); + for (int i = 0; i < isColumnKeys.length; i++) { + if (isRowKeyMappings[i]) { + continue; + } + if (isColumnKeys[i] || isColumnValues[i]) { + String cfName = new String(mappingColumnFamilies[i][0]); + if (!cfNameIndexMap.containsKey(cfName)) { + cfNameIndexMap.put(cfName, index); + columnKeyValueDataIndexes[i] = index; + index++; + numKeyValues++; + } else { + columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName); + } + } else { + numKeyValues++; + } + } + columnKeyCfNames = new byte[cfNameIndexMap.size()][]; + for (Map.Entry<String, Integer> entry : cfNameIndexMap.entrySet()) { + columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes(); + } + columnKeyDatas = new byte[cfNameIndexMap.size()][]; + columnValueDatas = new byte[cfNameIndexMap.size()][]; + + keyValues = new KeyValue[numKeyValues]; + } + + private ByteArrayOutputStream bout = new ByteArrayOutputStream(); + + protected byte[] getRowKeyBytes(Tuple tuple) throws IOException { + Datum datum; + byte[] rowkey; + if (rowkeyColumnIndexes.length > 1) { + bout.reset(); + for (int i = 0; i < rowkeyColumnIndexes.length; i++) { + datum = tuple.get(rowkeyColumnIndexes[i]); + if (isBinaryColumns[rowkeyColumnIndexes[i]]) { + rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); + } else { + rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); + } + bout.write(rowkey); + if (i < rowkeyColumnIndexes.length - 1) { + bout.write(rowKeyDelimiter); + } + } + rowkey = bout.toByteArray(); + } else { + int index = rowkeyColumnIndexes[0]; + datum = tuple.get(index); + if (isBinaryColumns[index]) { + rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum); + } else { + rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum); + } + } + + return rowkey; + } + + protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException { + int keyValIndex = 0; + for (int i = 0; i < columnNum; i++) { + if (isRowKeyMappings[i]) { + continue; + } + Datum datum = tuple.get(i); + byte[] value; + if (isBinaryColumns[i]) { + value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum); + } else { + value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum); + } + + if (isColumnKeys[i]) { + columnKeyDatas[columnKeyValueDataIndexes[i]] = value; + } else if (isColumnValues[i]) { + columnValueDatas[columnKeyValueDataIndexes[i]] = value; + } else { + keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value); + keyValIndex++; + } + } + + for (int i = 0; i < columnKeyDatas.length; i++) { + keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]); + } + } + + @Override + public void enableStats() { + enabledStats = true; + } + + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
new file mode 100644
index 0000000..79161cc
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.SortNode;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.logical.UnaryNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+
+public class AddSortForInsertRewriter implements RewriteRule {
+  private int[] sortColumnIndexes;
+  private Column[] sortColumns;
+
+  public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
+    this.sortColumns = sortColumns;
+    this.sortColumnIndexes = new int[sortColumns.length];
+
+    Schema tableSchema = tableDesc.getSchema();
+    for (int i = 0; i < sortColumns.length; i++) {
+      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "AddSortForInsertRewriter";
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    // Applied whenever the plan stores its result via some store type.
+    StoreType storeType = PlannerUtil.getStoreType(plan);
+    return storeType != null;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    UnaryNode insertNode = rootNode.getChild();
+    LogicalNode childNode = insertNode.getChild();
+
+    Schema sortSchema = childNode.getOutSchema();
+    SortNode sortNode = plan.createNode(SortNode.class);
+    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+    sortNode.setInSchema(sortSchema);
+    sortNode.setOutSchema(sortSchema);
+
+    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+
+    for (int i = 0; i < sortColumnIndexes.length; i++) {
+      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+      if (sortColumn == null) {
+        throw new PlanningException("Can't find a proper sort column: " + sortColumns[i]);
+      }
+      sortSpecs[i] = new SortSpec(sortColumn, true, true); // ascending, nulls first
+    }
+    sortNode.setSortSpecs(sortSpecs);
+
+    sortNode.setChild(insertNode.getChild());
+    insertNode.setChild(sortNode);
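+    // Illustration (assumed plan shape, e.g. for a simple INSERT ... SELECT):
+    //   before: rootNode -> insertNode -> scan
+    //   after : rootNode -> insertNode -> sortNode(STORAGE_SPECIFIED) -> scan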
+    plan.getRootBlock().registerNode(sortNode);
+
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
new file mode 100644
index 0000000..7ddf09a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ColumnMapping {
+  private TableMeta tableMeta;
+  private Schema schema;
+  private char rowKeyDelimiter;
+
+  private String hbaseTableName;
+
+  private int[] rowKeyFieldIndexes;
+  private boolean[] isRowKeyMappings;
+  private boolean[] isBinaryColumns;
+  private boolean[] isColumnKeys;
+  private boolean[] isColumnValues;
+
+  // Indexed by schema position: [i][0] holds the column family name bytes,
+  // [i][1] the qualifier name bytes.
+  private byte[][][] mappingColumns;
+
+  private int numRowKeys;
+
+  public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
+    this.schema = schema;
+    this.tableMeta = tableMeta;
+
+    init();
+  }
+
+  public void init() throws IOException {
+    hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
+    String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
+    if (delim.length() > 0) {
+      rowKeyDelimiter = delim.charAt(0);
+    }
+    isRowKeyMappings = new boolean[schema.size()];
+    rowKeyFieldIndexes = new int[schema.size()];
+    isBinaryColumns = new boolean[schema.size()];
+    isColumnKeys = new boolean[schema.size()];
+    isColumnValues = new boolean[schema.size()];
+
+    mappingColumns = new byte[schema.size()][][];
+
+    for (int i = 0; i < schema.size(); i++) {
+      rowKeyFieldIndexes[i] = -1;
+    }
+
+    String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
+    if (columnMapping == null || columnMapping.isEmpty()) {
+      throw new IOException("'columns' property is required.");
+    }
+
+    String[] columnMappingTokens = columnMapping.split(",");
+
+    if (columnMappingTokens.length != schema.getColumns().size()) {
+      throw new IOException("The number of mapped HBase columns does not match the number of Tajo table columns.");
+    }
+
+    int index = 0;
+    for (String eachToken : columnMappingTokens) {
+      mappingColumns[index] = new byte[2][];
+
+      byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':');
+
+      if (mappingTokens.length == 3) {
+        if (mappingTokens[0].length == 0) {
+          // The <cfname> part is missing.
+          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' "
+              + "or '<cfname>:value:' or '<cfname>:value:#b'");
+        }
+        // '<cfname>:key:' or '<cfname>:value:', optionally with a '#b' binary suffix.
+        if (mappingTokens[2].length != 0) {
+          String binaryOption = new String(mappingTokens[2]);
+          if ("#b".equals(binaryOption)) {
+            isBinaryColumns[index] = true;
+          } else {
+            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' "
+                + "or '<cfname>:value:' or '<cfname>:value:#b'");
+          }
+        }
+        mappingColumns[index][0] = mappingTokens[0];
+        String keyOrValue = new String(mappingTokens[1]);
+        if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+          isColumnKeys[index] = true;
+        } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+          isColumnValues[index] = true;
+        } else {
+          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
+        }
+      } else if (mappingTokens.length == 2) {
+        // '<cfname>:', '<cfname>:<qualifier>', or ':key'.
+        String cfName = new String(mappingTokens[0]);
+        String columnName = new String(mappingTokens[1]);
+        RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName);
+        if (rowKeyMapping != null) {
+          isRowKeyMappings[index] = true;
+          numRowKeys++;
+          isBinaryColumns[index] = rowKeyMapping.isBinary();
+          if (!cfName.isEmpty()) {
+            // '<field index>:key' maps this column to one field of a composite row key.
+            if (rowKeyDelimiter == 0) {
+              throw new IOException("hbase.rowkey.delimiter is required.");
+            }
+            rowKeyFieldIndexes[index] = Integer.parseInt(cfName);
+          } else {
+            rowKeyFieldIndexes[index] = -1; // The row key is mapped to a single column.
+          }
+        } else {
+          if (cfName.isEmpty()) {
+            throw new IOException(eachToken + " 'column' attribute should be ':key:' or ':value:'");
+          }
+          mappingColumns[index][0] = Bytes.toBytes(cfName);
+
+          if (!columnName.isEmpty()) {
+            String[] columnNameTokens = columnName.split("#");
+            if (columnNameTokens[0].isEmpty()) {
+              mappingColumns[index][1] = null;
+            } else {
+              mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]);
+            }
+            if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) {
+              isBinaryColumns[index] = true;
+            }
+          }
+        }
+      } else {
+        throw new IOException(eachToken + " 'column' attribute should be of the form '<cfname>:<qualifier>:'");
+      }
+
+      index++;
+    } // for loop
+  }
+
+  public List<String> getColumnFamilyNames() {
+    List<String> cfNames = new ArrayList<String>();
+
+    for (byte[][] eachCfName : mappingColumns) {
+      if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) {
+        String cfName = new String(eachCfName[0]);
+        if (!cfNames.contains(cfName)) {
+          cfNames.add(cfName);
+        }
+      }
+    }
+
+    return cfNames;
+  }
+
+  private RowKeyMapping getRowKeyMapping(String cfName, String columnName) {
+    if (columnName == null || columnName.isEmpty()) {
+      return null;
+    }
+
+    String[] tokens = columnName.split("#");
+    if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) {
+      return null;
+    }
+
+    RowKeyMapping rowKeyMapping = new RowKeyMapping();
+
+    if (tokens.length == 2 && "b".equals(tokens[1])) {
+      rowKeyMapping.setBinary(true);
+    }
+
+    if (cfName != null && !cfName.isEmpty()) {
+      rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName));
+    }
+    return rowKeyMapping;
+  }
+
+  public char getRowKeyDelimiter() {
+    return rowKeyDelimiter;
+  }
+
+  public int[] getRowKeyFieldIndexes() {
+    return rowKeyFieldIndexes;
+  }
+
+  public boolean[] getIsRowKeyMappings() {
+    return isRowKeyMappings;
+  }
+
+  public byte[][][] getMappingColumns() {
+    return mappingColumns;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public boolean[] getIsBinaryColumns() {
+    return isBinaryColumns;
+  }
+
+  public String getHbaseTableName() {
+    return hbaseTableName;
+  }
+
+  public boolean[] getIsColumnKeys() {
+    return isColumnKeys;
+  }
+
+  public int getNumRowKeys() {
+    return numRowKeys;
+  }
+
+  public boolean[] getIsColumnValues() {
+    return isColumnValues;
+  }
+}
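[Editor's note] To make the mapping grammar concrete, a hypothetical example (the table and column names are made up, not from this commit): a Tajo table with columns (id, host, attrs) and the option 'columns'=':key,f1:host,f1:attrs#b' would mark schema position 0 as the row key, map position 1 to family f1 / qualifier host, and map position 2 to family f1 / qualifier attrs with binary (de)serialization. A composite row key would instead use indexed forms such as '0:key,1:key' together with the 'hbase.rowkey.delimiter' option.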
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
new file mode 100644
index 0000000..c05c5bb
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+
+public class HBaseBinarySerializerDeserializer {
+
+  public static Datum deserialize(Column col, byte[] bytes) throws IOException {
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes));
+        break;
+      case INT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes));
+        break;
+      case INT8:
+        // Check for null before touching bytes.length; a 4-byte value is widened from int.
+        if (bytes == null || bytes.length == 0) {
+          datum = NullDatum.get();
+        } else if (bytes.length == 4) {
+          datum = DatumFactory.createInt8(Bytes.toInt(bytes));
+        } else {
+          datum = DatumFactory.createInt8(Bytes.toLong(bytes));
+        }
+        break;
+      case FLOAT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes));
+        break;
+      case FLOAT8:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes));
+        break;
+      case TEXT:
+        datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
+        break;
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
+  public static byte[] serialize(Column col, Datum datum) throws IOException {
+    if (datum == null || datum instanceof NullDatum) {
+      return null;
+    }
+
+    byte[] bytes;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        bytes = Bytes.toBytes(datum.asInt2());
+        break;
+      case INT4:
+        bytes = Bytes.toBytes(datum.asInt4());
+        break;
+      case INT8:
+        bytes = Bytes.toBytes(datum.asInt8());
+        break;
+      case FLOAT4:
+        bytes = Bytes.toBytes(datum.asFloat4());
+        break;
+      case FLOAT8:
+        bytes = Bytes.toBytes(datum.asFloat8());
+        break;
+      case TEXT:
+        bytes = Bytes.toBytes(datum.asChars());
+        break;
+      default:
+        bytes = null;
+        break;
+    }
+
+    return bytes;
+  }
+}
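[Editor's note] A quick round-trip sketch of the binary serializer (illustrative, not part of the commit; the column name is made up, and the Column(String, Type) constructor is assumed):

    import org.apache.tajo.catalog.Column;
    import org.apache.tajo.common.TajoDataTypes;
    import org.apache.tajo.datum.Datum;
    import org.apache.tajo.datum.DatumFactory;

    Column col = new Column("score", TajoDataTypes.Type.FLOAT8);
    byte[] bytes = HBaseBinarySerializerDeserializer.serialize(col, DatumFactory.createFloat8(3.14d));
    Datum restored = HBaseBinarySerializerDeserializer.deserialize(col, bytes);
    // restored.asFloat8() == 3.14; a null or empty byte[] deserializes to NullDatum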