tajo-commits mailing list archives

From hyun...@apache.org
Subject [1/3] TAJO-223: Maximize disk read bandwidth utilization of StorageManagerV2 by moving Tuple creation role to next() (Keuntae Park via hyunsik)
Date Fri, 04 Oct 2013 09:53:06 GMT
Updated Branches:
  refs/heads/master 3f61b9733 -> 4449d9c48
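
For context on the patch these tests exercise: in StorageManagerV2 a per-disk scheduler thread reads file data sequentially, and before this change that same thread also materialized Tuple objects from the raw bytes, stalling disk reads while parsing. Moving tuple creation into the scanner's next() call lets parsing run on the query execution thread while the disk thread keeps issuing sequential reads. Below is a minimal, self-contained sketch of that producer/consumer pattern; the class and method names (LazyTupleScanner, startReader) are illustrative only, not the actual Tajo internals.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch only: a "disk thread" hands raw byte chunks to the
// scanner, and tuple materialization happens lazily in next() on the
// caller's thread, keeping the disk thread free to read sequentially.
public class LazyTupleScanner {
  private static final byte[] EOF = new byte[0];               // poison pill
  private final BlockingQueue<byte[]> chunks = new ArrayBlockingQueue<byte[]>(8);
  private final Queue<String[]> parsed = new ArrayDeque<String[]>();

  // Simulated disk scheduler thread: pure sequential I/O, no parsing.
  public void startReader(final InputStream in) {
    new Thread(() -> {
      try {
        byte[] buf = new byte[64 * 1024];
        int n;
        while ((n = in.read(buf)) != -1) {
          chunks.put(Arrays.copyOf(buf, n));                   // blocks when full
        }
        chunks.put(EOF);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }).start();
  }

  // next() converts bytes to "tuples" only when the execution engine asks.
  public String[] next() throws InterruptedException {
    while (parsed.isEmpty()) {
      byte[] chunk = chunks.take();
      if (chunk == EOF) {
        return null;                                           // end of data
      }
      // Naive CSV split; real code must handle rows spanning chunk borders.
      for (String line : new String(chunk).split("\n")) {
        if (!line.isEmpty()) {
          parsed.add(line.split(","));
        }
      }
    }
    return parsed.poll();
  }

  public static void main(String[] args) throws Exception {
    LazyTupleScanner scanner = new LazyTupleScanner();
    scanner.startReader(new ByteArrayInputStream("1,25\n2,25\n3,25\n".getBytes()));
    String[] tuple;
    while ((tuple = scanner.next()) != null) {
      System.out.println(Arrays.toString(tuple));
    }
  }
}

The point the tests below verify is that a scanner built this way must behave identically to the eager version: fragment splits, compression codecs, projections, and all datum types have to round-trip through next() unchanged.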


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
new file mode 100644
index 0000000..e2259a1
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestCSVCompression {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/v2/TestCSVCompression";
+
+  private CatalogProtos.StoreType storeType;
+  private Path testDir;
+  private FileSystem fs;
+
+  public TestCSVCompression(CatalogProtos.StoreType type) throws IOException {
+    this.storeType = type;
+    conf = new TajoConf();
+    conf.set("tajo.storage.manager.v2", "true");
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {CatalogProtos.StoreType.CSV}
+    });
+  }
+
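+  // Each test below round-trips rows through one Hadoop CompressionCodec.
+  // Codecs that require native libraries (Snappy, LZ4) are skipped when the
+  // native code is not loaded.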
+  @Test
+  public void testDeflateCodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, DeflateCodec.class);
+  }
+
+  @Test
+  public void testGzipCodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, GzipCodec.class);
+  }
+
+  @Test
+  public void testSnappyCodecCompressionData() throws IOException {
+    if (SnappyCodec.isNativeCodeLoaded()) {
+      storageCompressionTest(storeType, SnappyCodec.class);
+    }
+  }
+
+  @Test
+  public void testBzip2CodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, BZip2Codec.class);
+  }
+
+  @Test
+  public void testLz4CodecCompressionData() throws IOException {
+    if (NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) {
+      storageCompressionTest(storeType, Lz4Codec.class);
+    }
+  }
+
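+  // Writes 100,000 rows as BZip2-compressed CSV, then reads the file back as
+  // two fragments split at a random byte offset and checks that no row is
+  // lost or duplicated across the split boundary.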
+  @Test
+  public void testSplitCompressionData() throws IOException {
+
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("age", TajoDataTypes.Type.INT8);
+
+    TableMeta meta = CatalogUtil.newTableMeta(schema, CatalogProtos.StoreType.CSV);
+    meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
+
+    Path tablePath = new Path(testDir, "SplitCompression");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
+    appender.enableStats();
+
+    appender.init();
+
+    String extension = "";
+    if (appender instanceof CSVFile.CSVAppender) {
+      extension = ((CSVFile.CSVAppender) appender).getExtension();
+    }
+
+    int tupleNum = 100000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(2);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25L));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    TableStat stat = appender.getStats();
+    assertEquals(tupleNum, stat.getNumRows().longValue());
+    tablePath = tablePath.suffix(extension);
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    long randomNum = (long) (Math.random() * fileLen) + 1;
+
+    Fragment[] tablets = new Fragment[2];
+    tablets[0] = new Fragment("SplitCompression", tablePath, meta, 0, randomNum);
+    tablets[1] = new Fragment("SplitCompression", tablePath, meta, randomNum, (fileLen - randomNum));
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+    scanner.close();
+
+    scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema);
+    scanner.init();
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+
+    scanner.close();
+    assertEquals(tupleNum, tupleCnt);
+  }
+
+  private void storageCompressionTest(CatalogProtos.StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("age", TajoDataTypes.Type.INT8);
+
+    TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
+    meta.putOption("compression.codec", codec.getCanonicalName());
+
+    String fileName = "Compression_" + codec.getSimpleName();
+    Path tablePath = new Path(testDir, fileName);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
+    appender.enableStats();
+
+    appender.init();
+
+    String extension = "";
+    if (appender instanceof CSVFile.CSVAppender) {
+      extension = ((CSVFile.CSVAppender) appender).getExtension();
+    }
+
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(2);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25L));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    TableStat stat = appender.getStats();
+    assertEquals(tupleNum, stat.getNumRows().longValue());
+    tablePath = tablePath.suffix(extension);
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    Fragment[] tablets = new Fragment[1];
+    tablets[0] = new Fragment(fileName, tablePath, meta, 0, fileLen);
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
+    scanner.init();
+    int tupleCnt = 0;
+    while (scanner.next() != null) {
+      tupleCnt++;
+    }
+    scanner.close();
+    assertEquals(tupleNum, tupleCnt);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
new file mode 100644
index 0000000..bf56943
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCSVScanner {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/v2/TestCSVScanner";
+  AbstractStorageManager sm = null;
+  private Path testDir;
+  private FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new TajoConf();
+    conf.set("tajo.storage.manager.v2", "true");
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
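+  // Basic write/read round trip: appends four tuples via the v2 storage
+  // manager and verifies the scanner returns exactly four.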
+  @Test
+  public final void testGetScannerAndAppender() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", TajoDataTypes.Type.INT4);
+    schema.addColumn("age", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(schema, CatalogProtos.StoreType.CSV);
+
+    Tuple[] tuples = new Tuple[4];
+    for(int i=0; i < tuples.length; i++) {
+      tuples[i] = new VTuple(3);
+      tuples[i].put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i + 32),
+          DatumFactory.createText("name" + i)});
+    }
+
+    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+    fs.mkdirs(path.getParent());
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path);
+    appender.init();
+    for(Tuple t : tuples) {
+      appender.addTuple(t);
+    }
+    appender.close();
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, path);
+    scanner.init();
+    int i = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      i++;
+    }
+    assertEquals(4, i);
+  }
+
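+  // Writes 100,000 wide rows (each roughly 2 KB) and scans the file in fixed
+  // 64 MB fragments, verifying the total row count plus the first and last keys.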
+  @Test
+  public final void testPartitionFile() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("key", TajoDataTypes.Type.TEXT);
+    schema.addColumn("age", TajoDataTypes.Type.INT4);
+    schema.addColumn("name", TajoDataTypes.Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(schema, CatalogProtos.StoreType.CSV);
+
+    Path path = StorageUtil.concatPath(testDir, "testPartitionFile", "table.csv");
+    fs.mkdirs(path.getParent());
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path);
+    appender.init();
+
+    String keyValue = "";
+    for(int i = 0; i < 100; i++) {
+      keyValue += "0123456789";
+    }
+    keyValue = "key_" + keyValue + "_";
+
+    String nameValue = "";
+    for(int i = 0; i < 100; i++) {
+      nameValue += "0123456789";
+    }
+    nameValue = "name_" + nameValue + "_";
+
+    int numTuples = 100000;
+    for(int i = 0; i < numTuples; i++) {
+      Tuple tuple = new VTuple(3);
+      tuple.put(new Datum[] {
+          DatumFactory.createText(keyValue + i),
+          DatumFactory.createInt4(i + 32),
+          DatumFactory.createText(nameValue + i)});
+      appender.addTuple(tuple);
+    }
+    appender.close();
+
+    long fileLength = fs.getLength(path);
+    long totalTupleCount = 0;
+
+    int scanCount = 0;
+    Tuple startTuple = null;
+    Tuple lastTuple = null;
+    while(true) {
+      long startOffset = 64L * 1024 * 1024 * scanCount;
+      long length = Math.min(64L * 1024 * 1024, fileLength - startOffset);
+
+      Fragment fragment = new Fragment("Test", path, meta, startOffset, length, null, null);
+      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment, schema);
+      scanner.init();
+      Tuple tuple = null;
+      while( (tuple = scanner.next()) != null) {
+        if(startTuple == null) {
+          startTuple = tuple;
+        }
+        lastTuple = tuple;
+        totalTupleCount++;
+      }
+      scanCount++;
+      if(length < 64 * 1024 * 1024) {
+        break;
+      }
+    }
+    assertEquals(numTuples, totalTupleCount);
+    assertEquals(keyValue + 0, startTuple.get(0).toString());
+    assertEquals(keyValue + (numTuples - 1), lastTuple.get(0).toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/4449d9c4/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
new file mode 100644
index 0000000..3d1b6f7
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestStorages {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/v2/TestStorages";
+
+  private StoreType storeType;
+  private boolean splitable;
+  private boolean statsable;
+  private Path testDir;
+  private FileSystem fs;
+
+  public TestStorages(StoreType type, boolean splitable, boolean statsable) throws IOException {
+    this.storeType = type;
+    this.splitable = splitable;
+    this.statsable = statsable;
+
+    conf = new TajoConf();
+    conf.set("tajo.storage.manager.v2", "true");
+
+    if (storeType == StoreType.RCFILE) {
+      conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+    }
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {StoreType.CSV, true, true},
+        {StoreType.RCFILE, true, true},
+        {StoreType.TREVNI, false, true},
+        {StoreType.RAW, false, false},
+    });
+  }
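+
+  // Splits the written file at a random offset into two fragments and checks
+  // that scanning both yields every row exactly once; runs only for formats
+  // declared splitable in the parameters above.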
+  @Test
+  public void testSplitable() throws IOException {
+    if (splitable) {
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT8);
+
+      TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
+      Path tablePath = new Path(testDir, "Splitable.data");
+      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
+      appender.enableStats();
+      appender.init();
+      int tupleNum = 10000;
+      VTuple vTuple;
+
+      for(int i = 0; i < tupleNum; i++) {
+        vTuple = new VTuple(2);
+        vTuple.put(0, DatumFactory.createInt4(i + 1));
+        vTuple.put(1, DatumFactory.createInt8(25L));
+        appender.addTuple(vTuple);
+      }
+      appender.close();
+      TableStat stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      long fileLen = status.getLen();
+      long randomNum = (long) (Math.random() * fileLen) + 1;
+
+      Fragment[] tablets = new Fragment[2];
+      tablets[0] = new Fragment("Splitable", tablePath, meta, 0, randomNum);
+      tablets[1] = new Fragment("Splitable", tablePath, meta, randomNum, (fileLen - randomNum));
+
+      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
+      scanner.init();
+      int tupleCnt = 0;
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema);
+      scanner.init();
+      while (scanner.next() != null) {
+        tupleCnt++;
+      }
+      scanner.close();
+
+      assertEquals(tupleNum, tupleCnt);
+    }
+  }
+
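+  // Projection test: scans with a target schema of (age, score) only and
+  // expects the unprojected first column to come back null.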
+  @Test
+  public void testProjection() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
+
+    Path tablePath = new Path(testDir, "testProjection.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for(int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(3);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(i + 2));
+      vTuple.put(2, DatumFactory.createFloat4(i + 3));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    Fragment fragment = new Fragment("testReadAndWrite", tablePath, meta, 0, status.getLen());
+
+    Schema target = new Schema();
+    target.addColumn("age", Type.INT8);
+    target.addColumn("score", Type.FLOAT4);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment, target);
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI || storeType == StoreType.CSV) {
+        assertTrue(tuple.get(0) == null || tuple.get(0) instanceof NullDatum);
+      }
+      assertEquals(DatumFactory.createInt8(tupleCnt + 2), tuple.getLong(1));
+      assertEquals(DatumFactory.createFloat4(tupleCnt + 3), tuple.getFloat(2));
+      tupleCnt++;
+    }
+    scanner.close();
+
+    assertEquals(tupleNum, tupleCnt);
+  }
+
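+  // Round-trips one tuple containing every supported datum type and checks
+  // each field for equality after scanning.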
+  @Test
+  public void testVariousTypes() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL);
+
+    Options options = new Options();
+    TableMeta meta = CatalogUtil.newTableMeta(schema, storeType, options);
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
+    appender.init();
+
+    Tuple tuple = new VTuple(12);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("hyunsik"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23L),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get()
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment);
+    scanner.init();
+
+    Tuple retrieved;
+    while ((retrieved=scanner.next()) != null) {
+      for (int i = 0; i < tuple.size(); i++) {
+        assertEquals(tuple.get(i), retrieved.get(i));
+      }
+    }
+  }
+}

