tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-1043: Implement nextFetch(RowBlock) of CSVScanner. (hyunsik)
Date Wed, 01 Oct 2014 06:23:37 GMT
Repository: tajo
Updated Branches:
  refs/heads/block_iteration a71a98147 -> c128eca31


TAJO-1043: Implement nextFetch(RowBlock) of CSVScanner. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c128eca3
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c128eca3
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c128eca3

Branch: refs/heads/block_iteration
Commit: c128eca315fdb084815c4a9ac4fb9c00c87f5466
Parents: a71a981
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Sat Sep 27 13:48:26 2014 -0700
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Sat Sep 27 13:48:26 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/tajo/storage/CSVFile.java   |  51 ++
 .../org/apache/tajo/storage/FileScanner.java    |   2 +-
 .../java/org/apache/tajo/storage/Scanner.java   |   2 +-
 .../storage/TextSerializerDeserializer.java     |  94 +-
 .../apache/tajo/tuple/offheap/UnSafeTuple.java  |   4 +
 .../apache/tajo/storage/TestNextFetches.java    | 854 +++++++++++++++++++
 6 files changed, 1004 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c128eca3/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 2113794..06ff081 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -35,11 +35,15 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockWriter;
+import org.apache.tajo.tuple.offheap.RowWriter;
 import org.apache.tajo.util.BytesUtils;
 
 import java.io.*;
@@ -480,6 +484,53 @@ public class CSVFile {
       }
     }
 
+    TextSerializerDeserializer deserializer = new TextSerializerDeserializer();
+
+    boolean hasNext() throws IOException {
+      if (currentIdx == validIdx) {
+        if (eof) {
+          return false;
+        } else {
+          page();
+
+          if(currentIdx == validIdx){
+            return false;
+          }
+        }
+      }
+
+      return true;
+    }
+
+    @Override
+    public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+      rowBlock.clear();
+      OffHeapRowBlockWriter writer = (OffHeapRowBlockWriter) rowBlock.getWriter();
+
+      while(hasNext() && rowBlock.rows() < rowBlock.maxRowNum()) {
+        byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
+            rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
+        currentIdx++;
+
+        int fieldIdx = 0;
+        writer.startRow();
+        for (; fieldIdx < cells.length && fieldIdx < schema.size(); fieldIdx++) {
+          if (cells[fieldIdx] == null) {
+            writer.skipField();
+          } else {
+            deserializer.write(writer, schema.getColumn(fieldIdx), cells[fieldIdx], 0, cells[fieldIdx].length, nullChars);
+
+          }
+        }
+        for (; fieldIdx < schema.size(); fieldIdx++) {
+          writer.skipField();
+        }
+        writer.endRow();
+      }
+
+      return rowBlock.rows() > 0;
+    }
+
     private boolean isCompress() {
       return codec != null;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c128eca3/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index 6aa59e6..d4357e3 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -83,7 +83,7 @@ public abstract class FileScanner implements Scanner {
   }
 
   @Override
-  public boolean nextFetch(OffHeapRowBlock rowBlock) {
+  public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
     throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c128eca3/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
index f532e0e..3478e23 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -49,7 +49,7 @@ public interface Scanner extends SchemaObject, Closeable {
    * @param rowBlock
    * @return
    */
-  boolean nextFetch(OffHeapRowBlock rowBlock);
+  boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException;
   
   /**
    * Reset the cursor. After executed, the scanner 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c128eca3/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
index b42c1b5..6dfe6c1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -25,8 +25,11 @@ import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.tuple.offheap.RowWriter;
 import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.NumberUtil;
+import org.apache.tajo.util.datetime.DateTimeUtil;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -35,7 +38,7 @@ import java.io.OutputStream;
 public class TextSerializerDeserializer implements SerializerDeserializer {
   public static final byte[] trueBytes = "true".getBytes();
   public static final byte[] falseBytes = "false".getBytes();
-  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
 
 
   @Override
@@ -213,6 +216,95 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
     return datum;
   }
 
+  public static void write(RowWriter writer, Column col, byte [] bytes, int offset, int length, byte [] nullChar) throws IOException {
+    TajoDataTypes.Type type = col.getDataType().getType();
+    boolean nullField;
+    if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
+      nullField = isNullText(bytes, offset, length, nullChar);
+    } else {
+      nullField = isNull(bytes, offset, length, nullChar);
+    }
+
+    if (nullField) {
+      writer.skipField();
+      return;
+    } else {
+      switch (col.getDataType().getType()) {
+      case BOOLEAN:
+        writer.putBool(bytes[offset] == 't' || bytes[offset] == 'T');
+        break;
+
+      case CHAR:
+      case TEXT:
+        writer.putText(bytes);
+        break;
+
+      case INT1:
+      case INT2:
+        writer.putInt2((short) NumberUtil.parseInt(bytes, offset, length));
+        break;
+
+      case INT4:
+        writer.putInt4(NumberUtil.parseInt(bytes, offset, length));
+        break;
+
+      case INT8:
+        writer.putInt8(Long.parseLong(new String(bytes, offset, length)));
+        break;
+
+      case FLOAT4:
+        writer.putFloat4(Float.parseFloat(new String(bytes, offset, length)));
+        break;
+
+      case FLOAT8:
+        writer.putFloat8(Double.parseDouble(new String(bytes, offset, length)));
+        break;
+
+      case DATE:
+        writer.putDate(DateTimeUtil.toJulianDate(new String(bytes, offset, length)));
+        break;
+
+      case TIME:
+        writer.putInt8(DateTimeUtil.toJulianTime(new String(bytes, offset, length)));
+        break;
+
+      case TIMESTAMP:
+        writer.putInt8(DateTimeUtil.toJulianTimestamp(new String(bytes, offset, length)));
+        break;
+
+      case INTERVAL:
+        writer.putInterval(DatumFactory.createInterval(new String(bytes, offset, length)));
+        break;
+
+      case PROTOBUF:
+        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+        Message.Builder builder = factory.newBuilder();
+        try {
+          byte[] protoBytes = new byte[length];
+          System.arraycopy(bytes, offset, protoBytes, 0, length);
+          protobufJsonFormat.merge(protoBytes, builder);
+          writer.putProtoDatum(factory.createDatum(builder.build()));
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+
+        break;
+
+      case INET4:
+        writer.putInet4(NetUtils.convertIPStringToInt(new String(bytes, offset, length)));
+        break;
+
+      case BLOB:
+        writer.putBlob(Base64.decodeBase64(bytes));
+        break;
+
+      default:
+        writer.skipField();
+      }
+    }
+  }
+
   private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
     return length == 0 || ((length == nullBytes.length)
         && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));

http://git-wip-us.apache.org/repos/asf/tajo/blob/c128eca3/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
index d8bafea..6f4d385 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -145,6 +145,8 @@ public abstract class UnSafeTuple implements Tuple {
     switch (types[fieldId].getType()) {
     case BOOLEAN:
       return DatumFactory.createBool(getBool(fieldId));
+    case CHAR:
+      return DatumFactory.createChar(getBytes(fieldId));
     case INT1:
     case INT2:
       return DatumFactory.createInt2(getInt2(fieldId));
@@ -158,6 +160,8 @@ public abstract class UnSafeTuple implements Tuple {
       return DatumFactory.createFloat8(getFloat8(fieldId));
     case TEXT:
       return DatumFactory.createText(getText(fieldId));
+    case BLOB:
+      return DatumFactory.createBlob(getBytes(fieldId));
     case TIMESTAMP:
       return DatumFactory.createTimestamp(getInt8(fieldId));
     case DATE:

http://git-wip-us.apache.org/repos/asf/tajo/blob/c128eca3/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
new file mode 100644
index 0000000..d1b3afd
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
@@ -0,0 +1,854 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.UnSafeTuple;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.UnsafeUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import sun.misc.Unsafe;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestNextFetches {
+	private TajoConf conf;
+	private static String TEST_PATH = "target/test-data/TestStorages";
+
+  private static String TEST_PROJECTION_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testProjection\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"age\", \"type\": \"long\" },\n" +
+      "    { \"name\": \"score\", \"type\": \"float\" }\n" +
+      "  ]\n" +
+      "}\n";
+
+  private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testNullHandlingTypes\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
+      "    { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" +
+      "    { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" +
+      "    { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" +
+      "    { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
+      "    { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" +
+      "    { \"name\": \"col12\", \"type\": \"null\" },\n" +
+      "    { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" +
+      "  ]\n" +
+      "}\n";
+
+  private StoreType storeType;
+  private boolean splitable;
+  private boolean statsable;
+  private Path testDir;
+  private FileSystem fs;
+
+  public TestNextFetches(StoreType type, boolean splitable, boolean statsable) throws IOException {
+    this.storeType = type;
+    this.splitable = splitable;
+    this.statsable = statsable;
+
+    conf = new TajoConf();
+
+    if (storeType == StoreType.RCFILE) {
+      conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
+    }
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        {StoreType.CSV, true, true},
+        // TODO - to be implemented
+//        {StoreType.RAW, false, false},
+//        {StoreType.RCFILE, true, true},
+//        {StoreType.PARQUET, false, false},
+//        {StoreType.SEQUENCEFILE, true, true},
+//        {StoreType.AVRO, false, false},
+    });
+  }
+
+	@Test
+  public void testSplitable() throws IOException {
+    if (splitable) {
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT8);
+
+      TableMeta meta = CatalogUtil.newTableMeta(storeType);
+      Path tablePath = new Path(testDir, "Splitable.data");
+      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+      appender.enableStats();
+      appender.init();
+      int tupleNum = 10000;
+      VTuple vTuple;
+
+      for (int i = 0; i < tupleNum; i++) {
+        vTuple = new VTuple(2);
+        vTuple.put(0, DatumFactory.createInt4(i + 1));
+        vTuple.put(1, DatumFactory.createInt8(25l));
+        appender.addTuple(vTuple);
+      }
+      appender.close();
+      TableStats stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      long fileLen = status.getLen();
+      long randomNum = (long) (Math.random() * fileLen) + 1;
+
+      FileFragment[] tablets = new FileFragment[2];
+      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
+      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
+
+      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      int tupleCnt = 0;
+
+      OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+      rowBlock.setRows(1024);
+
+      while (scanner.nextFetch(rowBlock)) {
+        tupleCnt += rowBlock.rows();
+      }
+      scanner.close();
+
+      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      while (scanner.nextFetch(rowBlock)) {
+        tupleCnt += rowBlock.rows();
+      }
+      scanner.close();
+
+      assertEquals(tupleNum, tupleCnt);
+
+      rowBlock.release();
+    }
+	}
+
+  @Test
+  public void testSplitableForRCFileBug() throws IOException {
+    if (storeType == StoreType.RCFILE) {
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT8);
+
+      TableMeta meta = CatalogUtil.newTableMeta(storeType);
+      Path tablePath = new Path(testDir, "Splitable.data");
+      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+      appender.enableStats();
+      appender.init();
+      int tupleNum = 10000;
+      VTuple vTuple;
+
+      for (int i = 0; i < tupleNum; i++) {
+        vTuple = new VTuple(2);
+        vTuple.put(0, DatumFactory.createInt4(i + 1));
+        vTuple.put(1, DatumFactory.createInt8(25l));
+        appender.addTuple(vTuple);
+      }
+      appender.close();
+      TableStats stat = appender.getStats();
+      assertEquals(tupleNum, stat.getNumRows().longValue());
+
+      FileStatus status = fs.getFileStatus(tablePath);
+      long fileLen = status.getLen();
+      long randomNum = 122; // header size
+
+      FileFragment[] tablets = new FileFragment[2];
+      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
+      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
+
+      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      int tupleCnt = 0;
+
+      OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+      rowBlock.setRows(1024);
+
+      while (scanner.nextFetch(rowBlock)) {
+        tupleCnt += rowBlock.rows();
+      }
+      scanner.close();
+
+      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+      assertTrue(scanner.isSplittable());
+      scanner.init();
+      while (scanner.nextFetch(rowBlock)) {
+        tupleCnt += rowBlock.rows();
+      }
+      scanner.close();
+
+      assertEquals(tupleNum, tupleCnt);
+
+      rowBlock.release();
+    }
+  }
+
+  @Test
+  public void testProjection() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType);
+    meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_PROJECTION_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testProjection.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(3);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(i + 2));
+      vTuple.put(2, DatumFactory.createFloat4(i + 3));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
+
+    Schema target = new Schema();
+    target.addColumn("age", Type.INT8);
+    target.addColumn("score", Type.FLOAT4);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, target);
+    scanner.init();
+    int tupleCnt = 0;
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+      while (reader.next(tuple)) {
+        if (storeType == StoreType.RCFILE
+            || storeType == StoreType.TREVNI
+            || storeType == StoreType.CSV
+            || storeType == StoreType.PARQUET
+            || storeType == StoreType.SEQUENCEFILE
+            || storeType == StoreType.AVRO) {
+          assertTrue(tuple.isNull(0));
+        }
+        assertTrue(tupleCnt + 2 == tuple.getInt8(1));
+        assertTrue(tupleCnt + 3 == tuple.getFloat4(2));
+        tupleCnt++;
+      }
+    }
+    scanner.close();
+
+    assertEquals(tupleNum, tupleCnt);
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testVariousTypes() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString();
+      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
+    }
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple tuple = new VTuple(12);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createChar("hyunsik"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+      while (reader.next(zcTuple)) {
+        for (int i = 0; i < tuple.size(); i++) {
+          assertEquals(tuple.get(i), zcTuple.get(i));
+        }
+      }
+    }
+    scanner.close();
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testNullHandlingTypes() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    meta.putOption(StorageConstants.CSVFILE_NULL, "\\\\N");
+    meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
+    meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+    meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    Tuple seedTuple = new VTuple(12);
+    seedTuple.put(new Datum[]{
+        DatumFactory.createBool(true),                // 0
+        DatumFactory.createChar("hyunsik"),           // 1
+        DatumFactory.createInt2((short) 17),          // 2
+        DatumFactory.createInt4(59),                  // 3
+        DatumFactory.createInt8(23l),                 // 4
+        DatumFactory.createFloat4(77.9f),             // 5
+        DatumFactory.createFloat8(271.9f),            // 6
+        DatumFactory.createText("hyunsik"),           // 7
+        DatumFactory.createBlob("hyunsik".getBytes()),// 8
+        DatumFactory.createInet4("192.168.0.1"),      // 9
+        NullDatum.get(),                              // 10
+        factory.createDatum(queryid.getProto())       // 11
+    });
+
+    // Making tuples with different null column positions
+    Tuple tuple;
+    for (int i = 0; i < 12; i++) {
+      tuple = new VTuple(12);
+      for (int j = 0; j < 12; j++) {
+        if (i == j) { // i'th column will have NULL value
+          tuple.put(j, NullDatum.get());
+        } else {
+          tuple.put(j, seedTuple.get(j));
+        }
+      }
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+
+    ZeroCopyTuple retrieved = new ZeroCopyTuple();
+
+    int i = 0;
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+
+      while(reader.next(retrieved)) {
+        assertEquals(12, retrieved.size());
+        for (int j = 0; j < 12; j++) {
+          if (i == j) {
+            assertEquals(NullDatum.get(), retrieved.get(j));
+          } else {
+            assertEquals(seedTuple.get(j), retrieved.get(j));
+          }
+        }
+
+        i++;
+      }
+    }
+    scanner.close();
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testRCFileTextSerializeDeserialize() throws IOException {
+    // Round-trips one row of every supported column type through RCFile with the
+    // text serde, then reads it back via nextFetch(RowBlock) and compares datums.
+    if(storeType != StoreType.RCFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    // Use the RCFile serde option key so the text serde is actually applied to this
+    // RCFILE table (the sibling binary test uses RCFILE_SERDE as well; the CSV key
+    // would not be consulted by the RCFile appender/scanner).
+    meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    // One datum per schema column, in schema order (12 columns).
+    Tuple tuple = new VTuple(12);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    // Appender stats must agree with the bytes actually written to HDFS.
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+
+    ZeroCopyTuple retrieved = new ZeroCopyTuple();
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+      while (reader.next(retrieved)) {
+        for (int i = 0; i < tuple.size(); i++) {
+          assertEquals(tuple.get(i), retrieved.get(i));
+        }
+      }
+    }
+    scanner.close();
+    // Scanner-side stats must match what the appender recorded.
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testRCFileBinarySerializeDeserialize() throws IOException {
+    // Round-trips one row of every supported column type through RCFile with the
+    // binary serde, then reads it back via nextFetch(RowBlock) and compares datums.
+    if(storeType != StoreType.RCFILE) return;
+
+    // 13 columns, including BIT, mirroring testSequenceFileBinarySerializeDeserialize.
+    // (The original schema had 12 columns while 13 datums were written below.)
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    // One datum per schema column, in schema order (13 columns).
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    // Appender stats must agree with the bytes actually written to HDFS.
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+
+    ZeroCopyTuple retrieved = new ZeroCopyTuple();
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+      while (reader.next(retrieved)) {
+        for (int i = 0; i < tuple.size(); i++) {
+          assertEquals(tuple.get(i), retrieved.get(i));
+        }
+      }
+    }
+    scanner.close();
+    // Scanner-side stats must match what the appender recorded.
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testSequenceFileTextSerializeDeserialize() throws IOException {
+    // Writes one row of every supported column type into a SequenceFile using the
+    // text serde, then scans it back with nextFetch(RowBlock) and verifies datums
+    // and the appender/scanner statistics agree.
+    if (storeType != StoreType.SEQUENCEFILE) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+    schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
+    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryId = new QueryId("12345", 5);
+    ProtobufDatumFactory protoFactory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    // The single row written and expected back, one datum per column.
+    Tuple expected = new VTuple(12);
+    expected.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        protoFactory.createDatum(queryId.getProto())
+    });
+    appender.addTuple(expected);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema,
+        new FileFragment("table", tablePath, 0, status.getLen()));
+    scanner.init();
+
+    // The text serde must produce a SequenceFile scanner keyed by LongWritable.
+    assertTrue(scanner instanceof SequenceFileScanner);
+    Writable key = ((SequenceFileScanner) scanner).getKey();
+    assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+    ZeroCopyTuple actual = new ZeroCopyTuple();
+
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+      while (reader.next(actual)) {
+        for (int col = 0; col < expected.size(); col++) {
+          assertEquals(expected.get(col), actual.get(col));
+        }
+      }
+    }
+    scanner.close();
+
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testSequenceFileBinarySerializeDeserialize() throws IOException {
+    // Round-trips one row of every supported column type through a SequenceFile
+    // with the binary serde, then reads it back via nextFetch(RowBlock).
+    if(storeType != StoreType.SEQUENCEFILE) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.BIT);
+    schema.addColumn("col3", Type.CHAR, 7);
+    schema.addColumn("col4", Type.INT2);
+    schema.addColumn("col5", Type.INT4);
+    schema.addColumn("col6", Type.INT8);
+    schema.addColumn("col7", Type.FLOAT4);
+    schema.addColumn("col8", Type.FLOAT8);
+    schema.addColumn("col9", Type.TEXT);
+    schema.addColumn("col10", Type.BLOB);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12", Type.NULL_TYPE);
+    schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
+
+    Path tablePath = new Path(testDir, "testVariousTypes.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.enableStats();
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+
+    // One datum per schema column, in schema order (13 columns).
+    Tuple tuple = new VTuple(13);
+    tuple.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar("jinho"),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("jinho"),
+        DatumFactory.createBlob("hyunsik babo".getBytes()),
+        DatumFactory.createInet4("192.168.0.1"),
+        NullDatum.get(),
+        factory.createDatum(queryid.getProto())
+    });
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    // Appender stats must agree with the bytes actually written to HDFS.
+    assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    scanner.init();
+
+    // The binary serde must produce a SequenceFile scanner keyed by BytesWritable.
+    assertTrue(scanner instanceof SequenceFileScanner);
+    Writable key = ((SequenceFileScanner) scanner).getKey();
+    assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+
+    ZeroCopyTuple retrieved = new ZeroCopyTuple();
+
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+      while (reader.next(retrieved)) {
+        for (int i = 0; i < tuple.size(); i++) {
+          assertEquals(tuple.get(i), retrieved.get(i));
+        }
+      }
+    }
+    scanner.close();
+    assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue());
+    assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue());
+
+    // Release the off-heap buffer; every sibling test does this, and omitting it
+    // leaks direct memory for the remainder of the test run.
+    rowBlock.release();
+  }
+
+  @Test
+  public void testTime() throws IOException {
+    // Round-trips DATE/TIME/TIMESTAMP datums through nextFetch(RowBlock);
+    // only exercised for the CSV and RAW store types.
+    if (storeType != StoreType.CSV && storeType != StoreType.RAW) {
+      return;
+    }
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.DATE);
+    schema.addColumn("col2", Type.TIME);
+    schema.addColumn("col3", Type.TIMESTAMP);
+
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
+
+    Path tablePath = new Path(testDir, "testTime.data");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    appender.init();
+
+    // The single row written and expected back.
+    Tuple expected = new VTuple(3);
+    expected.put(new Datum[]{
+        DatumFactory.createDate("1980-04-01"),
+        DatumFactory.createTime("12:34:56"),
+        DatumFactory.createTimestmpDatumWithUnixTime((int) (System.currentTimeMillis() / 1000))
+    });
+    appender.addTuple(expected);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema,
+        new FileFragment("table", tablePath, 0, status.getLen()));
+    scanner.init();
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    rowBlock.setRows(1024);
+    ZeroCopyTuple actual = new ZeroCopyTuple();
+
+    while (scanner.nextFetch(rowBlock)) {
+      RowBlockReader reader = rowBlock.getReader();
+      while (reader.next(actual)) {
+        for (int col = 0; col < expected.size(); col++) {
+          assertEquals(expected.get(col), actual.get(col));
+        }
+      }
+    }
+    scanner.close();
+
+    rowBlock.release();
+  }
+
+}


Mime
View raw message