From: hjkim@apache.org
To: commits@tajo.apache.org
Reply-To: dev@tajo.apache.org
Date: Wed, 03 Dec 2014 05:30:30 -0000
Message-Id: <4a110e041d78467aa3b266f4a8e0f0c5@git.apache.org>
In-Reply-To: <5912497907094a69ad14fc86c9c0cea3@git.apache.org>
References: <5912497907094a69ad14fc86c9c0cea3@git.apache.org>
Subject: [15/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
deleted file mode 100644
index c43ba38..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.*; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.ProtoUtil; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import static org.apache.tajo.common.TajoDataTypes.Type; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestOffHeapRowBlock { - private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class); - public static String UNICODE_FIELD_PREFIX = "abc_가나다_"; - public static Schema schema; - - static { - schema = new Schema(); - schema.addColumn("col0", Type.BOOLEAN); - schema.addColumn("col1", Type.INT2); - schema.addColumn("col2", Type.INT4); - schema.addColumn("col3", Type.INT8); - schema.addColumn("col4", Type.FLOAT4); - schema.addColumn("col5", Type.FLOAT8); - schema.addColumn("col6", Type.TEXT); - schema.addColumn("col7", Type.TIMESTAMP); - schema.addColumn("col8", Type.DATE); - schema.addColumn("col9", Type.TIME); - schema.addColumn("col10", Type.INTERVAL); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", - CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); - } - - private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) { - LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (endTime - startTime) + " msec"); - } - - @Test - public void testPutAndReadValidation() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - ZeroCopyTuple tuple = new ZeroCopyTuple(); - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRow(i, rowBlock.getWriter()); - - reader.reset(); - int j = 0; - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - - j++; - } - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); - - long readStart = System.currentTimeMillis(); - tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - @Test - public void testNullityValidation() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - ZeroCopyTuple tuple = new 
ZeroCopyTuple(); - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRowBlockWithNull(i, rowBlock.getWriter()); - - reader.reset(); - int j = 0; - while(reader.next(tuple)) { - validateNullity(j, tuple); - - j++; - } - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec"); - - long readStart = System.currentTimeMillis(); - tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - validateNullity(j, tuple); - - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - @Test - public void testEmptyRow() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - rowBlock.getWriter().startRow(); - // empty columns - rowBlock.getWriter().endRow(); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing tooks " + (writeEnd - writeStart) + " msec"); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - rowBlock.release(); - - assertEquals(rowNum, j); - assertEquals(rowNum, rowBlock.rows()); - } - - @Test - public void testSortBenchmark() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - List unSafeTuples = Lists.newArrayList(); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - reader.reset(); - while(reader.next(tuple)) { - unSafeTuples.add(tuple); - tuple = new ZeroCopyTuple(); - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4)); - BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec}); - - long sortStart = System.currentTimeMillis(); - Collections.sort(unSafeTuples, comparator); - long sortEnd = System.currentTimeMillis(); - LOG.info("sorting took " + (sortEnd - sortStart) + " msec"); - rowBlock.release(); - } - - @Test - public void testVTuplePutAndGetBenchmark() { - int rowNum = 1000; - - List rowBlock = Lists.newArrayList(); - long writeStart = System.currentTimeMillis(); - VTuple tuple; - for (int i = 0; i < rowNum; i++) { - tuple = new VTuple(schema.size()); - fillVTuple(i, tuple); - rowBlock.add(tuple); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); - - long readStart = System.currentTimeMillis(); - int j = 0; - for (VTuple t : rowBlock) { - validateTupleResult(j, t); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - int count = 0; - for (int l = 0; l < rowBlock.size(); l++) { - for(int m = 0; m < schema.size(); m++ ) { - if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) { - count ++; 
- } - } - } - // For preventing unnecessary code elimination optimization. - LOG.info("The number of INT4 values is " + count + "."); - } - - @Test - public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100); - - long writeStart = System.currentTimeMillis(); - VTuple tuple = new VTuple(schema.size()); - for (int i = 0; i < rowNum; i++) { - fillVTuple(i, tuple); - - RowStoreUtil.convert(tuple, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); - - validateResults(rowBlock); - rowBlock.release(); - } - - @Test - public void testSerDerOfRowBlock() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - - ByteBuffer bb = rowBlock.nioBuffer(); - OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); - validateResults(restoredRowBlock); - rowBlock.release(); - } - - @Test - public void testSerDerOfZeroCopyTuple() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - - ByteBuffer bb = rowBlock.nioBuffer(); - OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - ZeroCopyTuple copyTuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - ByteBuffer copy = tuple.nioBuffer(); - copyTuple.set(copy, SchemaUtil.toDataTypes(schema)); - - validateTupleResult(j, copyTuple); - - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - public static OffHeapRowBlock createRowBlock(int rowNum) { - long allocateStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); - long allocatedEnd = System.currentTimeMillis(); - LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (allocatedEnd - allocateStart) + " msec"); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRow(i, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); - - return rowBlock; - } - - public static OffHeapRowBlock createRowBlockWithNull(int rowNum) { - long allocateStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); - long allocatedEnd = System.currentTimeMillis(); - LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (allocatedEnd - allocateStart) + " msec"); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRowBlockWithNull(i, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); - - return rowBlock; - } - - public static void fillRow(int i, RowWriter builder) { - builder.startRow(); - builder.putBool(i % 1 == 0 ? 
true : false); // 0 - builder.putInt2((short) 1); // 1 - builder.putInt4(i); // 2 - builder.putInt8(i); // 3 - builder.putFloat4(i); // 4 - builder.putFloat8(i); // 5 - builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 - builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 - builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 - builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 - builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 - builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 - builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 - builder.endRow(); - } - - public static void fillRowBlockWithNull(int i, RowWriter writer) { - writer.startRow(); - - if (i == 0) { - writer.skipField(); - } else { - writer.putBool(i % 1 == 0 ? true : false); // 0 - } - if (i % 1 == 0) { - writer.skipField(); - } else { - writer.putInt2((short) 1); // 1 - } - - if (i % 2 == 0) { - writer.skipField(); - } else { - writer.putInt4(i); // 2 - } - - if (i % 3 == 0) { - writer.skipField(); - } else { - writer.putInt8(i); // 3 - } - - if (i % 4 == 0) { - writer.skipField(); - } else { - writer.putFloat4(i); // 4 - } - - if (i % 5 == 0) { - writer.skipField(); - } else { - writer.putFloat8(i); // 5 - } - - if (i % 6 == 0) { - writer.skipField(); - } else { - writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 - } - - if (i % 7 == 0) { - writer.skipField(); - } else { - writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 - } - - if (i % 8 == 0) { - writer.skipField(); - } else { - writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 - } - - if (i % 9 == 0) { - writer.skipField(); - } else { - writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 - } - - if (i % 10 == 0) { - writer.skipField(); - } else { - writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 - } - - if (i % 11 == 0) { - writer.skipField(); - } else { - writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 - } - - if (i % 12 == 0) { - writer.skipField(); - } else { - writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 - } - - writer.endRow(); - } - - public static void fillVTuple(int i, VTuple tuple) { - tuple.put(0, DatumFactory.createBool(i % 1 == 0)); - tuple.put(1, DatumFactory.createInt2((short) 1)); - tuple.put(2, DatumFactory.createInt4(i)); - tuple.put(3, DatumFactory.createInt8(i)); - tuple.put(4, DatumFactory.createFloat4(i)); - tuple.put(5, DatumFactory.createFloat8(i)); - tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes())); - tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 - tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 - tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 - tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 - tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 - tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; - } - - public static void validateResults(OffHeapRowBlock rowBlock) { - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - int j = 0; - OffHeapRowBlockReader reader = new 
OffHeapRowBlockReader(rowBlock); - reader.reset(); - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("Reading takes " + (readEnd - readStart) + " msec"); - } - - public static void validateTupleResult(int j, Tuple t) { - assertTrue((j % 1 == 0) == t.getBool(0)); - assertTrue(1 == t.getInt2(1)); - assertEquals(j, t.getInt4(2)); - assertEquals(j, t.getInt8(3)); - assertTrue(j == t.getFloat4(4)); - assertTrue(j == t.getFloat8(5)); - assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); - assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); - } - - public static void validateNullity(int j, Tuple tuple) { - if (j == 0) { - tuple.isNull(0); - } else { - assertTrue((j % 1 == 0) == tuple.getBool(0)); - } - - if (j % 1 == 0) { - tuple.isNull(1); - } else { - assertTrue(1 == tuple.getInt2(1)); - } - - if (j % 2 == 0) { - tuple.isNull(2); - } else { - assertEquals(j, tuple.getInt4(2)); - } - - if (j % 3 == 0) { - tuple.isNull(3); - } else { - assertEquals(j, tuple.getInt8(3)); - } - - if (j % 4 == 0) { - tuple.isNull(4); - } else { - assertTrue(j == tuple.getFloat4(4)); - } - - if (j % 5 == 0) { - tuple.isNull(5); - } else { - assertTrue(j == tuple.getFloat8(5)); - } - - if (j % 6 == 0) { - tuple.isNull(6); - } else { - assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); - } - - if (j % 7 == 0) { - tuple.isNull(7); - } else { - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); - } - - if (j % 8 == 0) { - tuple.isNull(8); - } else { - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); - } - - if (j % 9 == 0) { - tuple.isNull(9); - } else { - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); - } - - if (j % 10 == 0) { - tuple.isNull(10); - } else { - assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); - } - - if (j % 11 == 0) { - tuple.isNull(11); - } else { - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); - } - - if (j % 12 == 0) { - tuple.isNull(12); - } else { - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java deleted file mode 100644 index 1eb9c17..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.unit.StorageUnit; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestResizableSpec { - - @Test - public void testResizableLimit() { - ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f); - - long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); - - assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); - - assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB)); - - assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB)); - - assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1); - - assertFalse(limit.canIncrease(limit.limit())); - } - - @Test - public void testFixedLimit() { - FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f); - - assertEquals(limit.limit(), 100 * StorageUnit.MB); - - assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000)); - - assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB)); - - assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB)); - - assertFalse(limit.canIncrease(limit.limit())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml deleted file mode 100644 index 790d5a8..0000000 --- a/tajo-storage/src/test/resources/storage-default.xml +++ /dev/null @@ -1,154 +0,0 @@ - - - - - - - - fs.s3.impl - org.apache.tajo.storage.s3.SmallBlockS3FileSystem - - - - - tajo.storage.scanner-handler - textfile,csv,raw,rcfile,row,parquet,sequencefile,avro - - - - - tajo.storage.fragment.textfile.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.csv.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.raw.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.rcfile.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.row.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.parquet.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.sequencefile.class - org.apache.tajo.storage.fragment.FileFragment - - - tajo.storage.fragment.avro.class - org.apache.tajo.storage.fragment.FileFragment - - - - - tajo.storage.scanner-handler.textfile.class - org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner - - - - tajo.storage.scanner-handler.csv.class - org.apache.tajo.storage.CSVFile$CSVScanner - - - - 
tajo.storage.scanner-handler.raw.class - org.apache.tajo.storage.RawFile$RawFileScanner - - - - tajo.storage.scanner-handler.rcfile.class - org.apache.tajo.storage.rcfile.RCFile$RCFileScanner - - - - tajo.storage.scanner-handler.rowfile.class - org.apache.tajo.storage.RowFile$RowFileScanner - - - - tajo.storage.scanner-handler.parquet.class - org.apache.tajo.storage.parquet.ParquetScanner - - - - tajo.storage.scanner-handler.sequencefile.class - org.apache.tajo.storage.sequencefile.SequenceFileScanner - - - - tajo.storage.scanner-handler.avro.class - org.apache.tajo.storage.avro.AvroScanner - - - - - tajo.storage.appender-handler - textfile,csv,raw,rcfile,row,parquet,sequencefile,avro - - - - tajo.storage.appender-handler.textfile.class - org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender - - - - tajo.storage.appender-handler.csv.class - org.apache.tajo.storage.CSVFile$CSVAppender - - - - tajo.storage.appender-handler.raw.class - org.apache.tajo.storage.RawFile$RawFileAppender - - - - tajo.storage.appender-handler.rcfile.class - org.apache.tajo.storage.rcfile.RCFile$RCFileAppender - - - - tajo.storage.appender-handler.rowfile.class - org.apache.tajo.storage.RowFile$RowFileAppender - - - - tajo.storage.appender-handler.parquet.class - org.apache.tajo.storage.parquet.ParquetAppender - - - - tajo.storage.appender-handler.sequencefile.class - org.apache.tajo.storage.sequencefile.SequenceFileAppender - - - - tajo.storage.appender-handler.avro.class - org.apache.tajo.storage.avro.AvroAppender - - http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/resources/testVariousTypes.avsc ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc deleted file mode 100644 index 611b97f..0000000 --- a/tajo-storage/src/test/resources/testVariousTypes.avsc +++ /dev/null @@ -1,21 +0,0 @@ -{ - "type": "record", - "namespace": "org.apache.tajo", - "name": "testVariousTypes", - "fields": [ - { "name": "col1", "type": "boolean" }, - { "name": "col2", "type": "int" }, - { "name": "col3", "type": "string" }, - { "name": "col4", "type": "int" }, - { "name": "col5", "type": "int" }, - { "name": "col6", "type": "long" }, - { "name": "col7", "type": "float" }, - { "name": "col8", "type": "double" }, - { "name": "col9", "type": "string" }, - { "name": "col10", "type": "bytes" }, - { "name": "col11", "type": "bytes" }, - { "name": "col12", "type": "null" }, - { "name": "col13", "type": "bytes" } - ] -} - http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml new file mode 100644 index 0000000..c600b4b --- /dev/null +++ b/tajo-storage/tajo-storage-common/pom.xml @@ -0,0 +1,337 @@ + + + + + + tajo-project + org.apache.tajo + 0.9.1-SNAPSHOT + ../../tajo-project + + 4.0.0 + tajo-storage-common + jar + Tajo Storage Common + + UTF-8 + UTF-8 + + + + + repository.jboss.org + https://repository.jboss.org/nexus/content/repositories/releases/ + + + false + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + ${project.build.sourceEncoding} + + + + org.apache.rat + apache-rat-plugin + + + verify + + check + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + TRUE + + -Xms512m -Xmx1024m -XX:MaxPermSize=128m 
-Dfile.encoding=UTF-8 + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + test-jar + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + create-protobuf-generated-sources-directory + initialize + + + + + + + run + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2 + + + generate-sources + generate-sources + + protoc + + -Isrc/main/proto/ + --proto_path=../../tajo-common/src/main/proto + --proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto + --java_out=target/generated-sources/proto + src/main/proto/IndexProtos.proto + + + + exec + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.5 + + + add-source + generate-sources + + add-source + + + + target/generated-sources/proto + + + + + + + org.apache.maven.plugins + maven-pmd-plugin + 2.7.1 + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + + + + org.apache.tajo + tajo-common + + + org.apache.tajo + tajo-catalog-common + + + org.apache.tajo + tajo-plan + + + + org.apache.hadoop + hadoop-common + provided + + + zookeeper + org.apache.zookeeper + + + slf4j-api + org.slf4j + + + jersey-json + com.sun.jersey + + + + + org.apache.hadoop + hadoop-hdfs + provided + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.mortbay.jetty + jsp-2.1-jetty + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-grizzly2 + + + + + org.apache.hadoop + hadoop-minicluster + test + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.mortbay.jetty + jsp-2.1-jetty + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-grizzly2 + + + hadoop-yarn-server-tests + org.apache.hadoop + + + hadoop-mapreduce-client-jobclient + org.apache.hadoop + + + hadoop-mapreduce-client-app + org.apache.hadoop + + + hadoop-yarn-api + org.apache.hadoop + + + hadoop-mapreduce-client-hs + org.apache.hadoop + + + hadoop-mapreduce-client-core + org.apache.hadoop + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + provided + + + com.google.protobuf + protobuf-java + + + junit + junit + test + + + io.netty + netty-buffer + + + + + + docs + + false + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + module-javadocs + package + + jar + + + ${project.build.directory} + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.15 + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java new file mode 100644 index 0000000..c5e96ac --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.statistics.TableStats; + +import java.io.Closeable; +import java.io.IOException; + +public interface Appender extends Closeable { + + void init() throws IOException; + + void addTuple(Tuple t) throws IOException; + + void flush() throws IOException; + + long getEstimatedOutputSize() throws IOException; + + void close() throws IOException; + + void enableStats(); + + TableStats getStats(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java new file mode 100644 index 0000000..b829f60 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.datum.Datum; + +import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto; +import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; + +/** + * The Comparator class for Tuples + * + * @see Tuple + */ +public class BaseTupleComparator extends TupleComparator implements ProtoObject { + private final Schema schema; + private final SortSpec [] sortSpecs; + private final int[] sortKeyIds; + private final boolean[] asc; + @SuppressWarnings("unused") + private final boolean[] nullFirsts; + + private Datum left; + private Datum right; + private int compVal; + + /** + * @param schema The schema of input tuples + * @param sortKeys The description of sort keys + */ + public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) { + Preconditions.checkArgument(sortKeys.length > 0, + "At least one sort key must be specified."); + + this.schema = schema; + this.sortSpecs = sortKeys; + this.sortKeyIds = new int[sortKeys.length]; + this.asc = new boolean[sortKeys.length]; + this.nullFirsts = new boolean[sortKeys.length]; + for (int i = 0; i < sortKeys.length; i++) { + if (sortKeys[i].getSortKey().hasQualifier()) { + this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); + } else { + this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); + } + + this.asc[i] = sortKeys[i].isAscending(); + this.nullFirsts[i]= sortKeys[i].isNullFirst(); + } + } + + public BaseTupleComparator(TupleComparatorProto proto) { + this.schema = new Schema(proto.getSchema()); + + this.sortSpecs = new SortSpec[proto.getSortSpecsCount()]; + for (int i = 0; i < proto.getSortSpecsCount(); i++) { + sortSpecs[i] = new SortSpec(proto.getSortSpecs(i)); + } + + this.sortKeyIds = new int[proto.getCompSpecsCount()]; + this.asc = new boolean[proto.getCompSpecsCount()]; + this.nullFirsts = new boolean[proto.getCompSpecsCount()]; + + for (int i = 0; i < proto.getCompSpecsCount(); i++) { + TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i); + sortKeyIds[i] = sortSepcProto.getColumnId(); + asc[i] = sortSepcProto.getAscending(); + nullFirsts[i] = sortSepcProto.getNullFirst(); + } + } + + public Schema getSchema() { + return schema; + } + + public SortSpec [] getSortSpecs() { + return sortSpecs; + } + + public int [] getSortKeyIds() { + return sortKeyIds; + } + + @Override + public boolean isAscendingFirstKey() { + return this.asc[0]; + } + + @Override + public int compare(Tuple tuple1, Tuple tuple2) { + for (int i = 0; i < sortKeyIds.length; i++) { + left = tuple1.get(sortKeyIds[i]); + right = tuple2.get(sortKeyIds[i]); + + if (left.isNull() || right.isNull()) { + if (!left.equals(right)) { + if (left.isNull()) { + compVal = 1; + } else if (right.isNull()) { + compVal = -1; + } + if (nullFirsts[i]) { + if (compVal != 0) { + compVal *= -1; + } + } + } else { + compVal = 0; + } + } else { + if (asc[i]) { + compVal = left.compareTo(right); + } else { + compVal = right.compareTo(left); + } + } + + if (compVal < 0 || compVal > 0) { + return compVal; + } + } + return 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(sortKeyIds); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof BaseTupleComparator) { + 
BaseTupleComparator other = (BaseTupleComparator) obj; + if (sortKeyIds.length != other.sortKeyIds.length) { + return false; + } + + for (int i = 0; i < sortKeyIds.length; i++) { + if (sortKeyIds[i] != other.sortKeyIds[i] || + asc[i] != other.asc[i] || + nullFirsts[i] != other.nullFirsts[i]) { + return false; + } + } + + return true; + } else { + return false; + } + } + + @Override + public TupleComparatorProto getProto() { + TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder(); + builder.setSchema(schema.getProto()); + for (int i = 0; i < sortSpecs.length; i++) { + builder.addSortSpecs(sortSpecs[i].getProto()); + } + + TupleComparatorSpecProto.Builder sortSpecBuilder; + for (int i = 0; i < sortKeyIds.length; i++) { + sortSpecBuilder = TupleComparatorSpecProto.newBuilder(); + sortSpecBuilder.setColumnId(sortKeyIds[i]); + sortSpecBuilder.setAscending(asc[i]); + sortSpecBuilder.setNullFirst(nullFirsts[i]); + builder.addCompSpecs(sortSpecBuilder); + } + + return builder.build(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + String prefix = ""; + for (int i = 0; i < sortKeyIds.length; i++) { + sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i]) + .append(",Asc=").append(asc[i]) + .append(",NullFirst=").append(nullFirsts[i]); + prefix = " ,"; + } + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java new file mode 100644 index 0000000..00112e7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.base.Preconditions; +import com.google.protobuf.Message; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.*; +import org.apache.tajo.util.Bytes; + +import java.io.IOException; +import java.io.OutputStream; + +@Deprecated +public class BinarySerializerDeserializer implements SerializerDeserializer { + + static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)}; + + @Override + public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) + throws IOException { + byte[] bytes; + int length = 0; + if (datum == null || datum instanceof NullDatum) { + return 0; + } + + switch (col.getDataType().getType()) { + case BOOLEAN: + case BIT: + case CHAR: + bytes = datum.asByteArray(); + length = bytes.length; + out.write(bytes, 0, length); + break; + case INT2: + length = writeShort(out, datum.asInt2()); + break; + case INT4: + length = writeVLong(out, datum.asInt4()); + break; + case INT8: + length = writeVLong(out, datum.asInt8()); + break; + case FLOAT4: + length = writeFloat(out, datum.asFloat4()); + break; + case FLOAT8: + length = writeDouble(out, datum.asFloat8()); + break; + case TEXT: { + bytes = datum.asTextBytes(); + length = datum.size(); + if (length == 0) { + bytes = INVALID_UTF__SINGLE_BYTE; + length = INVALID_UTF__SINGLE_BYTE.length; + } + out.write(bytes, 0, bytes.length); + break; + } + case BLOB: + case INET4: + case INET6: + bytes = datum.asByteArray(); + length = bytes.length; + out.write(bytes, 0, length); + break; + case PROTOBUF: + ProtobufDatum protobufDatum = (ProtobufDatum) datum; + bytes = protobufDatum.asByteArray(); + length = bytes.length; + out.write(bytes, 0, length); + break; + case NULL_TYPE: + break; + default: + throw new IOException("Does not support type"); + } + return length; + } + + @Override + public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { + if (length == 0) return NullDatum.get(); + + Datum datum; + switch (col.getDataType().getType()) { + case BOOLEAN: + datum = DatumFactory.createBool(bytes[offset]); + break; + case BIT: + datum = DatumFactory.createBit(bytes[offset]); + break; + case CHAR: { + byte[] chars = new byte[length]; + System.arraycopy(bytes, offset, chars, 0, length); + datum = DatumFactory.createChar(chars); + break; + } + case INT2: + datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length)); + break; + case INT4: + datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset)); + break; + case INT8: + datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset)); + break; + case FLOAT4: + datum = DatumFactory.createFloat4(toFloat(bytes, offset, length)); + break; + case FLOAT8: + datum = DatumFactory.createFloat8(toDouble(bytes, offset, length)); + break; + case TEXT: { + byte[] chars = new byte[length]; + System.arraycopy(bytes, offset, chars, 0, length); + + if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) { + datum = DatumFactory.createText(new byte[0]); + } else { + datum = DatumFactory.createText(chars); + } + break; + } + case PROTOBUF: { + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode()); + Message.Builder builder = factory.newBuilder(); + builder.mergeFrom(bytes, offset, length); + datum = factory.createDatum(builder); + break; + } + case INET4: + datum = DatumFactory.createInet4(bytes, offset, length); + break; + case BLOB: + datum = 
DatumFactory.createBlob(bytes, offset, length); + break; + default: + datum = NullDatum.get(); + } + return datum; + } + + private byte[] shortBytes = new byte[2]; + + public int writeShort(OutputStream out, short val) throws IOException { + shortBytes[0] = (byte) (val >> 8); + shortBytes[1] = (byte) val; + out.write(shortBytes, 0, 2); + return 2; + } + + public float toFloat(byte[] bytes, int offset, int length) { + Preconditions.checkArgument(length == 4); + + int val = ((bytes[offset] & 0x000000FF) << 24) + + ((bytes[offset + 1] & 0x000000FF) << 16) + + ((bytes[offset + 2] & 0x000000FF) << 8) + + (bytes[offset + 3] & 0x000000FF); + return Float.intBitsToFloat(val); + } + + private byte[] floatBytes = new byte[4]; + + public int writeFloat(OutputStream out, float f) throws IOException { + int val = Float.floatToIntBits(f); + + floatBytes[0] = (byte) (val >> 24); + floatBytes[1] = (byte) (val >> 16); + floatBytes[2] = (byte) (val >> 8); + floatBytes[3] = (byte) val; + out.write(floatBytes, 0, 4); + return floatBytes.length; + } + + public double toDouble(byte[] bytes, int offset, int length) { + Preconditions.checkArgument(length == 8); + long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) + + ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) + + ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) + + ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) + + ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) + + ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) + + ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) + + (long) (bytes[offset + 7] & 0x00000000000000FF); + return Double.longBitsToDouble(val); + } + + private byte[] doubleBytes = new byte[8]; + + public int writeDouble(OutputStream out, double d) throws IOException { + long val = Double.doubleToLongBits(d); + + doubleBytes[0] = (byte) (val >> 56); + doubleBytes[1] = (byte) (val >> 48); + doubleBytes[2] = (byte) (val >> 40); + doubleBytes[3] = (byte) (val >> 32); + doubleBytes[4] = (byte) (val >> 24); + doubleBytes[5] = (byte) (val >> 16); + doubleBytes[6] = (byte) (val >> 8); + doubleBytes[7] = (byte) val; + out.write(doubleBytes, 0, 8); + return doubleBytes.length; + } + + private byte[] vLongBytes = new byte[9]; + + public static int writeVLongToByteArray(byte[] bytes, int offset, long l) { + if (l >= -112 && l <= 127) { + bytes[offset] = (byte) l; + return 1; + } + + int len = -112; + if (l < 0) { + l ^= -1L; // take one's complement' + len = -120; + } + + long tmp = l; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + bytes[offset++] = (byte) len; + len = (len < -120) ? 
-(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits); + } + return 1 + len; + } + + public int writeVLong(OutputStream out, long l) throws IOException { + int len = writeVLongToByteArray(vLongBytes, 0, l); + out.write(vLongBytes, 0, len); + return len; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java new file mode 100644 index 0000000..85c79fa --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; +import org.apache.hadoop.classification.InterfaceStability; + +/* this class is PooledBuffer holder */ +public class BufferPool { + + private static final PooledByteBufAllocator allocator; + + private BufferPool() { + } + + static { + //TODO we need determine the default params + allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + + /* if you are finding memory leak, please enable this line */ + //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } + + public static long maxDirectMemory() { + return PlatformDependent.maxDirectMemory(); + } + + + public synchronized static ByteBuf directBuffer(int size) { + return allocator.directBuffer(size); + } + + /** + * + * @param size the initial capacity + * @param max the max capacity + * @return allocated ByteBuf from pool + */ + public static ByteBuf directBuffer(int size, int max) { + return allocator.directBuffer(size, max); + } + + @InterfaceStability.Unstable + public static void forceRelease(ByteBuf buf) { + buf.release(buf.refCnt()); + } + + /** + * the ByteBuf will increase to writable size + * @param buf + * @param minWritableBytes required minimum writable size + */ + public static void ensureWritable(ByteBuf buf, int minWritableBytes) { + buf.ensureWritable(minWritableBytes); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java new file mode 100644 index 0000000..b1b6d65 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; + +public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel { + + ByteBufferReadable byteBufferReadable; + ReadableByteChannel channel; + InputStream inputStream; + + public ByteBufInputChannel(InputStream inputStream) { + if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) { + this.byteBufferReadable = (ByteBufferReadable) inputStream; + } else { + this.channel = Channels.newChannel(inputStream); + } + + this.inputStream = inputStream; + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public long read(ByteBuffer[] dsts) { + return read(dsts, 0, dsts.length); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (byteBufferReadable != null) { + return byteBufferReadable.read(dst); + } else { + return channel.read(dst); + } + } + + @Override + protected void implCloseChannel() throws IOException { + IOUtils.cleanup(null, channel, inputStream); + } + + public int available() throws IOException { + return inputStream.available(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java new file mode 100644 index 0000000..8841a31 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +public class DataLocation { + private String host; + private int volumeId; + + public DataLocation(String host, int volumeId) { + this.host = host; + this.volumeId = volumeId; + } + + public String getHost() { + return host; + } + + public int getVolumeId() { + return volumeId; + } + + @Override + public String toString() { + return "DataLocation{" + + "host=" + host + + ", volumeId=" + volumeId + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java new file mode 100644 index 0000000..2396349 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import java.util.ArrayList; +import java.util.List; + +public class DiskDeviceInfo { + private int id; + private String name; + + private List mountInfos = new ArrayList(); + + public DiskDeviceInfo(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return id + "," + name; + } + + public void addMountPath(DiskMountInfo diskMountInfo) { + mountInfos.add(diskMountInfo); + } + + public List getMountInfos() { + return mountInfos; + } + + public void setMountInfos(List mountInfos) { + this.mountInfos = mountInfos; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java new file mode 100644 index 0000000..22f18ba --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.storage;
+
+public class DiskInfo {
+  private int id;
+  private String partitionName;
+  private String mountPath;
+
+  private long capacity;
+  private long used;
+
+  public DiskInfo(int id, String partitionName) {
+    this.id = id;
+    this.partitionName = partitionName;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  public String getPartitionName() {
+    return partitionName;
+  }
+
+  public void setPartitionName(String partitionName) {
+    this.partitionName = partitionName;
+  }
+
+  public String getMountPath() {
+    return mountPath;
+  }
+
+  public void setMountPath(String mountPath) {
+    this.mountPath = mountPath;
+  }
+
+  public long getCapacity() {
+    return capacity;
+  }
+
+  public void setCapacity(long capacity) {
+    this.capacity = capacity;
+  }
+
+  public long getUsed() {
+    return used;
+  }
+
+  public void setUsed(long used) {
+    this.used = used;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
new file mode 100644
index 0000000..aadb0e7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskMountInfo.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+
+public class DiskMountInfo implements Comparable<DiskMountInfo> {
+  private String mountPath;
+
+  private long capacity;
+  private long used;
+
+  private int deviceId;
+
+  public DiskMountInfo(int deviceId, String mountPath) {
+    this.deviceId = deviceId;
+    this.mountPath = mountPath;
+  }
+
+  public String getMountPath() {
+    return mountPath;
+  }
+
+  public void setMountPath(String mountPath) {
+    this.mountPath = mountPath;
+  }
+
+  public long getCapacity() {
+    return capacity;
+  }
+
+  public void setCapacity(long capacity) {
+    this.capacity = capacity;
+  }
+
+  public long getUsed() {
+    return used;
+  }
+
+  public void setUsed(long used) {
+    this.used = used;
+  }
+
+  public int getDeviceId() {
+    return deviceId;
+  }
+
+  @Override
+  public boolean equals(Object obj){
+    if (!(obj instanceof DiskMountInfo)) return false;
+
+    return compareTo((DiskMountInfo) obj) == 0;
+  }
+
+  @Override
+  public int hashCode(){
+    return Objects.hashCode(mountPath);
+  }
+
+  @Override
+  public int compareTo(DiskMountInfo other) {
+    String path1 = mountPath;
+    String path2 = other.mountPath;
+
+    int path1Depth = "/".equals(path1) ?
0 : path1.split("/", -1).length - 1 ; + int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ; + + if(path1Depth > path2Depth) { + return -1; + } else if(path1Depth < path2Depth) { + return 1; + } else { + int path1Length = path1.length(); + int path2Length = path2.length(); + + if(path1Length < path2Length) { + return 1; + } else if(path1Length > path2Length) { + return -1; + } else { + return path1.compareTo(path2); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java new file mode 100644 index 0000000..2d68870 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.Util;
+
+import java.io.*;
+import java.net.URI;
+import java.util.*;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+public class DiskUtil {
+
+  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
+
+  public enum OSType {
+    OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
+  }
+
+  static private OSType getOSType() {
+    String osName = System.getProperty("os.name");
+    if (osName.contains("Windows")
+        && (osName.contains("XP") || osName.contains("2003")
+            || osName.contains("Vista")
+            || osName.contains("Windows_7")
+            || osName.contains("Windows 7") || osName
+            .contains("Windows7"))) {
+      return OSType.OS_TYPE_WINXP;
+    } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+      return OSType.OS_TYPE_SOLARIS;
+    } else if (osName.contains("Mac")) {
+      return OSType.OS_TYPE_MAC;
+    } else {
+      return OSType.OS_TYPE_UNIX;
+    }
+  }
+
+  public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
+    List<DiskDeviceInfo> deviceInfos;
+
+    if(getOSType() == OSType.OS_TYPE_UNIX) {
+      deviceInfos = getUnixDiskDeviceInfos();
+      setDeviceMountInfo(deviceInfos);
+    } else {
+      deviceInfos = getDefaultDiskDeviceInfos();
+    }
+
+    return deviceInfos;
+  }
+
+  private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
+    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+    File file = new File(UNIX_DISK_DEVICE_PATH);
+    if(!file.exists()) {
+      System.out.println("No partition file:" + file.getAbsolutePath());
+      return getDefaultDiskDeviceInfos();
+    }
+
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
+      String line = null;
+
+      int count = 0;
+      Set<String> deviceNames = new TreeSet<String>();
+      while((line = reader.readLine()) != null) {
+        if(count > 0 && !line.trim().isEmpty()) {
+          String[] tokens = line.trim().split(" +");
+          if(tokens.length == 4) {
+            String deviceName = getDiskDeviceName(tokens[3]);
+            deviceNames.add(deviceName);
+          }
+        }
+        count++;
+      }
+
+      int id = 0;
+      for(String eachDeviceName: deviceNames) {
+        DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
+        diskDeviceInfo.setName(eachDeviceName);
+
+        //TODO set additional info
+        // /sys/block/sda/queue
+        infos.add(diskDeviceInfo);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if(reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+        }
+      }
+    }
+
+    return infos;
+  }
+
+  private static String getDiskDeviceName(String partitionName) {
+    byte[] bytes = partitionName.getBytes();
+
+    byte[] result = new byte[bytes.length];
+    int length = 0;
+    for(int i = 0; i < bytes.length; i++, length++) {
+      if(bytes[i] >= '0' && bytes[i] <= '9') {
+        break;
+      } else {
+        result[i] = bytes[i];
+      }
+    }
+
+    return new String(result, 0, length);
+  }
+
+  public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
+    DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
+    diskDeviceInfo.setName("default");
+
+    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+    infos.add(diskDeviceInfo);
+
+    return infos;
+  }
+
+
+  private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
+    Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
+    for(DiskDeviceInfo eachDevice: deviceInfos) {
+      deviceMap.put(eachDevice.getName(), eachDevice);
+    }
+
+    BufferedReader mountOutput = null;
+    try {
+      Process mountProcess = Runtime.getRuntime().exec("mount");
+      mountOutput = new BufferedReader(new InputStreamReader(
+          mountProcess.getInputStream()));
+      while (true) {
+        String line = mountOutput.readLine();
+        if (line == null) {
+          break;
+        }
+
+        int indexStart = line.indexOf(" on /");
+        int indexEnd = line.indexOf(" ", indexStart + 4);
+
+        String deviceName = line.substring(0, indexStart).trim();
+        String[] deviceNameTokens = deviceName.split("/");
+        if(deviceNameTokens.length == 3) {
+          if("dev".equals(deviceNameTokens[1])) {
+            String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
+            String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
+
+            DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
+            if(diskDeviceInfo != null) {
+              diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw e;
+    } finally {
+      if (mountOutput != null) {
+        mountOutput.close();
+      }
+    }
+  }
+
+  public static int getDataNodeStorageSize(){
+    return getStorageDirs().size();
+  }
+
+  public static List<URI> getStorageDirs(){
+    Configuration conf = new HdfsConfiguration();
+    Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    return Util.stringCollectionAsURIs(dirNames);
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.out.println("/dev/sde1".split("/").length);
+    for(String eachToken: "/dev/sde1".split("/")) {
+      System.out.println(eachToken);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
new file mode 100644
index 0000000..7df4584
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+public interface FieldSerializerDeserializer {
+
+  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
+
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
new file mode 100644
index 0000000..8b7e2e0
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.exception.UnsupportedException;
+
+/**
+ * An instance of FrameTuple is an immutable tuple.
+ * It contains two tuples and pretends to be one instance of Tuple for
+ * join qual evaluations.
+ */ +public class FrameTuple implements Tuple, Cloneable { + private int size; + private int leftSize; + + private Tuple left; + private Tuple right; + + public FrameTuple() {} + + public FrameTuple(Tuple left, Tuple right) { + set(left, right); + } + + public void set(Tuple left, Tuple right) { + this.size = left.size() + right.size(); + this.left = left; + this.leftSize = left.size(); + this.right = right; + } + + @Override + public int size() { + return size; + } + + @Override + public boolean contains(int fieldId) { + Preconditions.checkArgument(fieldId < size, + "Out of field access: " + fieldId); + + if (fieldId < leftSize) { + return left.contains(fieldId); + } else { + return right.contains(fieldId - leftSize); + } + } + + @Override + public boolean isNull(int fieldid) { + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); + } + + @Override + public void clear() { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Datum value) { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException(); + } + + @Override + public void setOffset(long offset) { + throw new UnsupportedException(); + } + + @Override + public long getOffset() { + throw new UnsupportedException(); + } + + @Override + public void put(Datum [] values) { + throw new UnsupportedException(); + } + + @Override + public Datum get(int fieldId) { + Preconditions.checkArgument(fieldId < size, + "Out of field access: " + fieldId); + + if (fieldId < leftSize) { + return left.get(fieldId); + } else { + return right.get(fieldId - leftSize); + } + } + + @Override + public boolean getBool(int fieldId) { + return get(fieldId).asBool(); + } + + @Override + public byte getByte(int fieldId) { + return get(fieldId).asByte(); + } + + @Override + public char getChar(int fieldId) { + return get(fieldId).asChar(); + } + + @Override + public byte [] getBytes(int fieldId) { + return get(fieldId).asByteArray(); + } + + @Override + public short getInt2(int fieldId) { + return get(fieldId).asInt2(); + } + + @Override + public int getInt4(int fieldId) { + return get(fieldId).asInt4(); + } + + @Override + public long getInt8(int fieldId) { + return get(fieldId).asInt8(); + } + + @Override + public float getFloat4(int fieldId) { + return get(fieldId).asFloat4(); + } + + @Override + public double getFloat8(int fieldId) { + return get(fieldId).asFloat8(); + } + + @Override + public String getText(int fieldId) { + return get(fieldId).asChars(); + } + + @Override + public ProtobufDatum getProtobufDatum(int fieldId) { + return (ProtobufDatum) get(fieldId); + } + + @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override + public char [] getUnicodeChars(int fieldId) { + return get(fieldId).asUnicodeChars(); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + FrameTuple frameTuple = (FrameTuple) super.clone(); + frameTuple.set(this.left.clone(), this.right.clone()); + return frameTuple; + } + + @Override + public Datum[] getValues(){ + throw new UnsupportedException(); + } + + public String toString() { + boolean first = true; + StringBuilder str = new StringBuilder(); + str.append("("); + for(int i=0; i < size(); i++) { + if(contains(i)) { + if(first) { + first = false; + } else { + str.append(", 
"); + } + str.append(i) + .append("=>") + .append(get(i)); + } + } + str.append(")"); + return str.toString(); + } +}