tajo-commits mailing list archives

From: hyun...@apache.org
Subject: git commit: TAJO-711: Add Avro storage support. (David Chen via hyunsik)
Date: Wed, 16 Apr 2014 08:09:40 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 014f9f3ef -> 8da52ede0


TAJO-711: Add Avro storage support. (David Chen via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8da52ede
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8da52ede
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8da52ede

Branch: refs/heads/master
Commit: 8da52ede0afecdc9c3302748d322ee4929986d5a
Parents: 014f9f3
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Apr 16 17:09:20 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Apr 16 17:09:20 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 pom.xml                                         |   3 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |   2 +
 .../src/main/proto/CatalogProtos.proto          | 127 ++++----
 tajo-storage/pom.xml                            |   6 +-
 .../apache/tajo/storage/StorageConstants.java   |   3 +
 .../apache/tajo/storage/avro/AvroAppender.java  | 219 ++++++++++++++
 .../apache/tajo/storage/avro/AvroScanner.java   | 286 +++++++++++++++++++
 .../org/apache/tajo/storage/avro/AvroUtil.java  |  47 +++
 .../apache/tajo/storage/avro/package-info.java  |  85 ++++++
 .../src/main/resources/storage-default.xml      |  23 +-
 .../apache/tajo/storage/TestMergeScanner.java   |  23 +-
 .../org/apache/tajo/storage/TestStorages.java   |  59 +++-
 .../apache/tajo/storage/v2/TestStorages.java    |  47 ++-
 .../src/test/resources/storage-default.xml      |  22 +-
 .../src/test/resources/testVariousTypes.avsc    |  21 ++
 16 files changed, 891 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a643f1a..16b7f29 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.8.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-711: Add Avro storage support. (David Chen via hyunsik)
+
     TAJO-746: Implements function COALESCE. (hyoungjunkim via hyunsik)
 
     TAJO-616: SequenceFile support. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d09559e..bd3358a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,8 +75,6 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <tajo.version>0.8.0-SNAPSHOT</tajo.version>
     <tajo.root>${basedir}</tajo.root>
-    <parquet.version>1.3.2</parquet.version>
-    <parquet.format.version>2.0.0</parquet.format.version>
   </properties>
 
   <modules>
@@ -349,6 +347,7 @@
             <exclude>**/*.tbl</exclude>
             <exclude>**/*.js</exclude>
             <exclude>**/*.result</exclude>
+            <exclude>**/*.avsc</exclude>
             <!-- generated content -->
             <exclude>**/target/**</exclude>
             <exclude>**/*.log</exclude>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 13eed38..1dd33b2 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -290,6 +290,8 @@ public class CatalogUtil {
       return StoreType.PARQUET;
     } else if (typeStr.equalsIgnoreCase(StoreType.SEQUENCEFILE.name())) {
       return StoreType.SEQUENCEFILE;
+    } else if (typeStr.equalsIgnoreCase(StoreType.AVRO.name())) {
+      return StoreType.AVRO;
     } else {
       return null;
     }

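A quick illustration of the change above: the new branch lets the string form of the store type resolve to the AVRO enum value. The sketch below assumes the enclosing method is CatalogUtil.getStoreType(String), which this hunk does not show, and uses the generated CatalogProtos.StoreType class from the existing code base.

import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;

public class StoreTypeLookupExample {
  public static void main(String[] args) {
    // "AVRO" is matched case-insensitively; unrecognized names still fall
    // through to the final else branch and yield null.
    StoreType type = CatalogUtil.getStoreType("avro");
    System.out.println(type);  // expected: AVRO
  }
}
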
http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 7ec68c5..7f41596 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -25,28 +25,29 @@ option java_generate_equals_and_hash = true;
 import "DataTypes.proto";
 
 enum StoreType {
-	MEM = 0;
-	CSV = 1;
-	RAW = 2;
+  MEM = 0;
+  CSV = 1;
+  RAW = 2;
   RCFILE = 3;
   ROWFILE = 4;
   HCFILE = 5;
   TREVNI = 6;
   PARQUET = 7;
   SEQUENCEFILE = 8;
+  AVRO = 9;
 }
 
 enum OrderType {
-    ORDER_NONE = 0;
-    ASC = 1;
-    DSC = 2;
+  ORDER_NONE = 0;
+  ASC = 1;
+  DSC = 2;
 }
 
 enum PartitionType {
-    RANGE = 0;
-    HASH = 1;
-    LIST = 2;
-    COLUMN = 3;
+  RANGE = 0;
+  HASH = 1;
+  LIST = 2;
+  COLUMN = 3;
 }
 
 enum AlterTableType {
@@ -56,21 +57,21 @@ enum AlterTableType {
 }
 
 message ColumnProto {
-	required string name = 1;
-	required DataType dataType = 3;
+  required string name = 1;
+  required DataType dataType = 3;
 }
 
 message SchemaProto {
-	repeated ColumnProto fields = 1;
+  repeated ColumnProto fields = 1;
 }
 
 message KeyValueProto {
-	required string key = 1;
-	required string value = 2;
+  required string key = 1;
+  required string value = 2;
 }
 
 message KeyValueSetProto {
-	repeated KeyValueProto keyval = 1;
+  repeated KeyValueProto keyval = 1;
 }
 
 message FragmentProto {
@@ -103,11 +104,11 @@ message CreateDatabaseRequest {
 }
 
 message TableDescProto {
-	required string table_name = 2;
-	optional string path = 3;
-	required TableProto meta = 4;
-	required SchemaProto schema = 5;
-	optional TableStatsProto stats = 6;
+  required string table_name = 2;
+  optional string path = 3;
+  required TableProto meta = 4;
+  required SchemaProto schema = 5;
+  optional TableStatsProto stats = 6;
   optional PartitionMethodProto partition = 7;
   optional bool isExternal = 8 [default = false];
 }
@@ -126,46 +127,46 @@ enum FunctionType {
   GENERAL = 0;
   AGGREGATION = 1;
   DISTINCT_AGGREGATION = 2;
-	UDF = 3;
-	UDA = 4;
-	DISTINCT_UDA = 5;
+  UDF = 3;
+  UDA = 4;
+  DISTINCT_UDA = 5;
 }
 
 message FunctionDescProto {
-	required string signature = 1;
-	required string className = 2;
-	required FunctionType type = 3;
-	repeated DataType parameterTypes = 4;
-	required DataType returnType = 5;
+  required string signature = 1;
+  required string className = 2;
+  required FunctionType type = 3;
+  repeated DataType parameterTypes = 4;
+  required DataType returnType = 5;
   optional string description = 6;
   optional string example = 7;
   optional string detail = 8;
 }
 
 message IndexDescProto {
-    required TableIdentifierProto tableIdentifier = 1;
-    required string indexName = 2;
-    required ColumnProto column = 3;
-    required IndexMethod indexMethod = 4;
-    optional bool isUnique = 5 [default = false];
-    optional bool isClustered = 6 [default = false];
-    optional bool isAscending = 7 [default = false];
+  required TableIdentifierProto tableIdentifier = 1;
+  required string indexName = 2;
+  required ColumnProto column = 3;
+  required IndexMethod indexMethod = 4;
+  optional bool isUnique = 5 [default = false];
+  optional bool isClustered = 6 [default = false];
+  optional bool isAscending = 7 [default = false];
 }
 
 enum IndexMethod {
-    TWO_LEVEL_BIN_TREE = 0;
-    BTREE_IDX = 1;
-    HASH_IDX = 2;
-    BITMAP_IDX = 3;
+  TWO_LEVEL_BIN_TREE = 0;
+  BTREE_IDX = 1;
+  HASH_IDX = 2;
+  BITMAP_IDX = 3;
 }
 
 message GetAllTableNamesResponse {
-    repeated string tableName = 1;
+  repeated string tableName = 1;
 }
 
 message GetIndexByColumnRequest {
-    required TableIdentifierProto tableIdentifier = 1;
-    required string columnName = 2;
+  required TableIdentifierProto tableIdentifier = 1;
+  required string columnName = 2;
 }
 
 message IndexNameProto {
@@ -175,41 +176,41 @@ message IndexNameProto {
 }
 
 message GetFunctionsResponse {
-	repeated FunctionDescProto functionDesc = 1;
+  repeated FunctionDescProto functionDesc = 1;
 }
 
 message UnregisterFunctionRequest {
-	required string signature = 1;
+  required string signature = 1;
 }
 
 message GetFunctionMetaRequest {
-	required string signature = 1;
-	optional FunctionType functionType = 2;
-	repeated DataType parameterTypes = 3;
+  required string signature = 1;
+  optional FunctionType functionType = 2;
+  repeated DataType parameterTypes = 3;
 }
 
 message ContainFunctionRequest {
-	required string signature = 1;
-	optional FunctionType functionType = 2;
-	repeated DataType parameterTypes = 3;
+  required string signature = 1;
+  optional FunctionType functionType = 2;
+  repeated DataType parameterTypes = 3;
 }
 
 message TableStatsProto {
-	required int64 numRows = 1;
-	required int64 numBytes = 2;
-	optional int32 numBlocks = 4;
-	optional int32 numShuffleOutputs = 5;
-	optional int64 avgRows = 6;
-	optional int64 readBytes = 7;
-	repeated ColumnStatsProto colStat = 8;
+  required int64 numRows = 1;
+  required int64 numBytes = 2;
+  optional int32 numBlocks = 4;
+  optional int32 numShuffleOutputs = 5;
+  optional int64 avgRows = 6;
+  optional int64 readBytes = 7;
+  repeated ColumnStatsProto colStat = 8;
 }
 
 message ColumnStatsProto {
-    required ColumnProto column = 1;
-    optional int64 numDistVal = 2;
-    optional int64 numNulls = 3;
-    optional bytes minValue = 4;
-    optional bytes maxValue = 5;
+  required ColumnProto column = 1;
+  optional int64 numDistVal = 2;
+  optional int64 numNulls = 3;
+  optional bytes minValue = 4;
+  optional bytes maxValue = 5;
 }
 
 enum StatType {

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index ad81f71..6f552f3 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -35,6 +35,8 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <parquet.version>1.3.2</parquet.version>
+    <parquet.format.version>2.0.0</parquet.format.version>
   </properties>
 
   <repositories>
@@ -176,10 +178,6 @@
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-core</artifactId>
         </exclusion>
-        <exclusion>
-          <artifactId>avro-mapred</artifactId>
-          <groupId>org.apache.avro</groupId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageConstants.java
index 414fa35..4b238f9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -48,6 +48,9 @@ public class StorageConstants {
   public static final String PARQUET_DEFAULT_IS_DICTIONARY_ENABLED;
   public static final String PARQUET_DEFAULT_IS_VALIDATION_ENABLED;
 
+  public static final String AVRO_SCHEMA_LITERAL = "avro.schema.literal";
+  public static final String AVRO_SCHEMA_URL = "avro.schema.url";
+
   static {
     PARQUET_DEFAULT_BLOCK_SIZE =
         Integer.toString(ParquetWriter.DEFAULT_BLOCK_SIZE);

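The two new keys let a table carry its Avro schema either inline (avro.schema.literal) or as a file location (avro.schema.url); the names match the properties used by Hive's Avro SerDe. A minimal sketch of attaching a schema to a table's metadata, modeled on the test changes later in this commit (CatalogUtil.newTableMeta and putOption follow that test code; the schema string and path are illustrative):

import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.storage.StorageConstants;

public class AvroTableMetaExample {
  public static void main(String[] args) {
    TableMeta meta = CatalogUtil.newTableMeta(StoreType.AVRO);

    // Either embed the schema directly ...
    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
        "{ \"type\": \"record\", \"name\": \"example\", \"namespace\": \"org.apache.tajo\","
        + " \"fields\": [ { \"name\": \"id\", \"type\": \"int\" } ] }");

    // ... or point at a schema file instead; only one of the two is required.
    // meta.putOption(StorageConstants.AVRO_SCHEMA_URL, "hdfs:///schemas/example.avsc");
  }
}
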
http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
new file mode 100644
index 0000000..6af8da0
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * FileAppender for writing to Avro files.
+ */
+public class AvroAppender extends FileAppender {
+  private TableStatistics stats;
+  private Schema avroSchema;
+  private List<Schema.Field> avroFields;
+  private DataFileWriter<GenericRecord> dataFileWriter;
+
+  /**
+   * Creates a new AvroAppender.
+   *
+   * @param conf Configuration properties.
+   * @param schema The table schema.
+   * @param meta The table metadata.
+   * @param path The path of the Avro file to write to.
+   */
+  public AvroAppender(Configuration conf,
+                      org.apache.tajo.catalog.Schema schema,
+                      TableMeta meta, Path path) throws IOException {
+    super(conf, schema, meta, path);
+  }
+
+  /**
+   * Initializes the Appender.
+   */
+  public void init() throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    if (!fs.exists(path.getParent())) {
+      throw new FileNotFoundException(path.toString());
+    }
+    FSDataOutputStream outputStream = fs.create(path);
+
+    avroSchema = AvroUtil.getAvroSchema(meta, conf);
+    avroFields = avroSchema.getFields();
+
+    DatumWriter<GenericRecord> datumWriter =
+        new GenericDatumWriter<GenericRecord>(avroSchema);
+    dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
+    dataFileWriter.create(avroSchema, outputStream);
+
+    if (enabledStats) {
+      this.stats = new TableStatistics(schema);
+    }
+    super.init();
+  }
+
+  /**
+   * Gets the current offset. Tracking offsets is currently not implemented, so
+   * this method always returns 0.
+   *
+   * @return 0
+   */
+  @Override
+  public long getOffset() throws IOException {
+    return 0;
+  }
+
+  private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
+    if (tuple.get(i) instanceof NullDatum) {
+      return null;
+    }
+    switch (avroType) {
+      case NULL:
+        return null;
+      case BOOLEAN:
+        return tuple.getBool(i);
+      case INT:
+        return tuple.getInt4(i);
+      case LONG:
+        return tuple.getInt8(i);
+      case FLOAT:
+        return tuple.getFloat4(i);
+      case DOUBLE:
+        return tuple.getFloat8(i);
+      case BYTES:
+      case FIXED:
+        return ByteBuffer.wrap(tuple.getBytes(i));
+      case STRING:
+        return tuple.getText(i);
+      default:
+        throw new RuntimeException("Unknown primitive type.");
+    }
+  }
+
+  /**
+   * Write a Tuple to the Avro file.
+   *
+   * @param tuple The Tuple to write.
+   */
+  @Override
+  public void addTuple(Tuple tuple) throws IOException {
+    GenericRecord record = new GenericData.Record(avroSchema);
+    for (int i = 0; i < schema.size(); ++i) {
+      Column column = schema.getColumn(i);
+      if (enabledStats) {
+        stats.analyzeField(i, tuple.get(i));
+      }
+      Object value;
+      Schema.Field avroField = avroFields.get(i);
+      Schema.Type avroType = avroField.schema().getType();
+      switch (avroType) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+        case FIXED:
+          value = getPrimitive(tuple, i, avroType);
+          break;
+        case RECORD:
+          throw new RuntimeException("Avro RECORD not supported.");
+        case ENUM:
+          throw new RuntimeException("Avro ENUM not supported.");
+        case MAP:
+          throw new RuntimeException("Avro MAP not supported.");
+        case UNION:
+          List<Schema> schemas = avroField.schema().getTypes();
+          if (schemas.size() != 2) {
+            throw new RuntimeException("Avro UNION not supported.");
+          }
+          if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+            value = getPrimitive(tuple, i, schemas.get(1).getType());
+          } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+            value = getPrimitive(tuple, i, schemas.get(0).getType());
+          } else {
+            throw new RuntimeException("Avro UNION not supported.");
+          }
+          break;
+        default:
+          throw new RuntimeException("Unknown type: " + avroType);
+      }
+      record.put(i, value);
+    }
+    dataFileWriter.append(record);
+
+    if (enabledStats) {
+      stats.incrementRow();
+    }
+  }
+
+  /**
+   * Flushes the current state of the file.
+   */
+  @Override
+  public void flush() throws IOException {
+    dataFileWriter.flush();
+  }
+
+  /**
+   * Closes the Appender.
+   */
+  @Override
+  public void close() throws IOException {
+    dataFileWriter.close();
+  }
+
+  /**
+   * If table statistics is enabled, retrieve the table statistics.
+   *
+   * @return Table statistics if enabled or null otherwise.
+   */
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}

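A rough write-path sketch for the new appender, following the pattern used by the TestStorages changes below. The Tajo schema, Avro schema literal, and output path are made up for illustration; the getAppender/addTuple calls mirror the test code.

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.*;

public class AvroWriteExample {
  public static void main(String[] args) throws Exception {
    TajoConf conf = new TajoConf();

    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    schema.addColumn("age", Type.INT8);

    TableMeta meta = CatalogUtil.newTableMeta(StoreType.AVRO);
    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
        "{ \"type\": \"record\", \"name\": \"example\", \"namespace\": \"org.apache.tajo\","
        + " \"fields\": [ { \"name\": \"id\", \"type\": \"int\" },"
        + " { \"name\": \"age\", \"type\": \"long\" } ] }");

    Path tablePath = new Path("target/example/avro_example.data");
    Appender appender =
        StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
    appender.init();

    // Tuple layout must match both the Tajo schema and the Avro field order.
    Tuple tuple = new VTuple(2);
    tuple.put(0, DatumFactory.createInt4(1));
    tuple.put(1, DatumFactory.createInt8(25L));
    appender.addTuple(tuple);

    appender.flush();
    appender.close();
  }
}
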
http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
new file mode 100644
index 0000000..816ae25
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * FileScanner for reading Avro files.
+ */
+public class AvroScanner extends FileScanner {
+  private Schema avroSchema;
+  private List<Schema.Field> avroFields;
+  private DataFileReader<GenericRecord> dataFileReader;
+  private int[] projectionMap;
+
+  /**
+   * Creates a new AvroScanner.
+   *
+   * @param conf Configuration properties.
+   * @param schema The table schema.
+   * @param meta The table metadata.
+   * @param fragment The file fragment to read.
+   */
+  public AvroScanner(Configuration conf,
+                     final org.apache.tajo.catalog.Schema schema,
+                     final TableMeta meta, final FileFragment fragment) {
+    super(conf, schema, meta, fragment);
+  }
+
+  /**
+   * Initializes the AvroScanner.
+   */
+  @Override
+  public void init() throws IOException {
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+    prepareProjection(targets);
+
+    avroSchema = AvroUtil.getAvroSchema(meta, conf);
+    avroFields = avroSchema.getFields();
+
+    DatumReader<GenericRecord> datumReader =
+        new GenericDatumReader<GenericRecord>(avroSchema);
+    SeekableInput input = new FsInput(fragment.getPath(), conf);
+    dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
+    super.init();
+  }
+
+  private void prepareProjection(Column[] targets) {
+    projectionMap = new int[targets.length];
+    for (int i = 0; i < targets.length; ++i) {
+      projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName());
+    }
+  }
+
+  private static String fromAvroString(Object value) {
+    if (value instanceof Utf8) {
+      Utf8 utf8 = (Utf8)value;
+      return utf8.toString();
+    }
+    return value.toString();
+  }
+
+  private static Schema getNonNull(Schema schema) {
+    if (!schema.getType().equals(Schema.Type.UNION)) {
+      return schema;
+    }
+    List<Schema> schemas = schema.getTypes();
+    if (schemas.size() != 2) {
+      return schema;
+    }
+    if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+      return schemas.get(1);
+    } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+      return schemas.get(0);
+    } else {
+      return schema;
+    }
+  }
+
+  private Datum convertInt(Object value, TajoDataTypes.Type tajoType) {
+    int intValue = (Integer)value;
+    switch (tajoType) {
+      case BIT:
+        return DatumFactory.createBit((byte)(intValue & 0xff));
+      case INT2:
+        return DatumFactory.createInt2((short)intValue);
+      default:
+        return DatumFactory.createInt4(intValue);
+    }
+  }
+
+  private Datum convertBytes(Object value, TajoDataTypes.Type tajoType,
+                             DataType dataType) {
+    ByteBuffer buffer = (ByteBuffer)value;
+    byte[] bytes = new byte[buffer.capacity()];
+    buffer.get(bytes, 0, bytes.length);
+    switch (tajoType) {
+      case INET4:
+        return DatumFactory.createInet4(bytes);
+      case PROTOBUF:
+        try {
+          ProtobufDatumFactory factory =
+              ProtobufDatumFactory.get(dataType.getCode());
+          Message.Builder builder = factory.newBuilder();
+          builder.mergeFrom(bytes);
+          return factory.createDatum(builder);
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException(e);
+        }
+      default:
+        return new BlobDatum(bytes);
+    }
+  }
+
+  private Datum convertString(Object value, TajoDataTypes.Type tajoType) {
+    switch (tajoType) {
+      case CHAR:
+        return DatumFactory.createChar(fromAvroString(value));
+      default:
+        return DatumFactory.createText(fromAvroString(value));
+    }
+  }
+
+  /**
+   * Reads the next Tuple from the Avro file.
+   *
+   * @return The next Tuple from the Avro file or null if end of file is
+   *         reached.
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (!dataFileReader.hasNext()) {
+      return null;
+    }
+
+    Tuple tuple = new VTuple(schema.size());
+    GenericRecord record = dataFileReader.next();
+    for (int i = 0; i < projectionMap.length; ++i) {
+      int columnIndex = projectionMap[i];
+      Object value = record.get(columnIndex);
+      if (value == null) {
+        tuple.put(columnIndex, NullDatum.get());
+        continue;
+      }
+
+      // Get Avro type.
+      Schema.Field avroField = avroFields.get(columnIndex);
+      Schema nonNullAvroSchema = getNonNull(avroField.schema());
+      Schema.Type avroType = nonNullAvroSchema.getType();
+
+      // Get Tajo type.
+      Column column = schema.getColumn(columnIndex);
+      DataType dataType = column.getDataType();
+      TajoDataTypes.Type tajoType = dataType.getType();
+      switch (avroType) {
+        case NULL:
+          tuple.put(columnIndex, NullDatum.get());
+          break;
+        case BOOLEAN:
+          tuple.put(columnIndex, DatumFactory.createBool((Boolean)value));
+          break;
+        case INT:
+          tuple.put(columnIndex, convertInt(value, tajoType));
+          break;
+        case LONG:
+          tuple.put(columnIndex, DatumFactory.createInt8((Long)value));
+          break;
+        case FLOAT:
+          tuple.put(columnIndex, DatumFactory.createFloat4((Float)value));
+          break;
+        case DOUBLE:
+          tuple.put(columnIndex, DatumFactory.createFloat8((Double)value));
+          break;
+        case BYTES:
+          tuple.put(columnIndex, convertBytes(value, tajoType, dataType));
+          break;
+        case STRING:
+          tuple.put(columnIndex, convertString(value, tajoType));
+          break;
+        case RECORD:
+          throw new RuntimeException("Avro RECORD not supported.");
+        case ENUM:
+          throw new RuntimeException("Avro ENUM not supported.");
+        case MAP:
+          throw new RuntimeException("Avro MAP not supported.");
+        case UNION:
+          throw new RuntimeException("Avro UNION not supported.");
+        case FIXED:
+          tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes()));
+          break;
+        default:
+          throw new RuntimeException("Unknown type.");
+      }
+    }
+    return tuple;
+  }
+
+  /**
+   * Resets the scanner.
+   */
+  @Override
+  public void reset() throws IOException {
+  }
+
+  /**
+   * Closes the scanner.
+   */
+  @Override
+  public void close() throws IOException {
+    if (dataFileReader != null) {
+      dataFileReader.close();
+    }
+  }
+
+  /**
+   * Returns whether this scanner is projectable.
+   *
+   * @return true
+   */
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  /**
+   * Returns whether this scanner is selectable.
+   *
+   * @return false
+   */
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  /**
+   * Returns whether this scanner is splittable.
+   *
+   * @return false
+   */
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+}

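The matching read-path sketch. The AvroScanner constructor, init/next/close calls, and the option keys come from the code above; the FileFragment constructor arguments (fragment name, path, start offset, length) are an assumption based on the rest of the Tajo storage code rather than on this diff.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.avro.AvroScanner;
import org.apache.tajo.storage.fragment.FileFragment;

public class AvroReadExample {
  public static void main(String[] args) throws Exception {
    TajoConf conf = new TajoConf();

    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    schema.addColumn("age", Type.INT8);

    TableMeta meta = CatalogUtil.newTableMeta(StoreType.AVRO);
    meta.putOption(StorageConstants.AVRO_SCHEMA_URL, "target/example/example.avsc");

    Path tablePath = new Path("target/example/avro_example.data");
    FileSystem fs = tablePath.getFileSystem(conf);
    FileStatus status = fs.getFileStatus(tablePath);

    // Assumed FileFragment signature: (fragment name, path, start offset, length).
    FileFragment fragment = new FileFragment("avro_example", tablePath, 0, status.getLen());

    AvroScanner scanner = new AvroScanner(conf, schema, meta, fragment);
    scanner.init();

    Tuple tuple;
    while ((tuple = scanner.next()) != null) {
      System.out.println(tuple);
    }
    scanner.close();
  }
}
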
http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
new file mode 100644
index 0000000..962c63d
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.catalog.TableMeta;
+
+public class AvroUtil {
+  public static Schema getAvroSchema(TableMeta meta, Configuration conf)
+      throws IOException {
+    String schemaLiteral = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL);
+    String schemaUrl = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
+    if (schemaLiteral == null && schemaUrl == null) {
+      throw new RuntimeException("No Avro schema for table.");
+    }
+    if (schemaLiteral != null) {
+      return new Schema.Parser().parse(schemaLiteral);
+    }
+    Path schemaPath = new Path(schemaUrl);
+    FileSystem fs = schemaPath.getFileSystem(conf);
+    FSDataInputStream inputStream = fs.open(schemaPath);
+    return new Schema.Parser().parse(inputStream);
+  }
+}

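AvroUtil prefers an inline schema over a URL when both options are present and fails fast when neither is set. A minimal direct call, reusing the option keys added above (the schema string is illustrative):

import org.apache.avro.Schema;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.avro.AvroUtil;

public class AvroUtilExample {
  public static void main(String[] args) throws Exception {
    TableMeta meta = CatalogUtil.newTableMeta(StoreType.AVRO);
    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
        "{ \"type\": \"record\", \"name\": \"example\", \"namespace\": \"org.apache.tajo\","
        + " \"fields\": [ { \"name\": \"id\", \"type\": \"int\" } ] }");

    // avro.schema.literal takes precedence over avro.schema.url;
    // a RuntimeException is thrown if neither option is set.
    Schema avroSchema = AvroUtil.getAvroSchema(meta, new TajoConf());
    System.out.println(avroSchema.toString(true));
  }
}
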
http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
new file mode 100644
index 0000000..40d1545
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * <p>
+ * Provides read and write support for Avro files. Avro schemas are
+ * converted to Tajo schemas according to the following mapping of Avro
+ * and Tajo types:
+ * </p>
+ *
+ * <table>
+ *   <tr>
+ *     <th>Avro type</th>
+ *     <th>Tajo type</th>
+ *   </tr>
+ *   <tr>
+ *     <td>NULL</td>
+ *     <td>NULL_TYPE</td>
+ *   </tr>
+ *   <tr>
+ *     <td>BOOLEAN</td>
+ *     <td>BOOLEAN</td>
+ *   </tr>
+ *   <tr>
+ *     <td>INT</td>
+ *     <td>INT4</td>
+ *   </tr>
+ *   <tr>
+ *     <td>LONG</td>
+ *     <td>INT8</td>
+ *   </tr>
+ *   <tr>
+ *     <td>FLOAT</td>
+ *     <td>FLOAT4</td>
+ *   </tr>
+ *   <tr>
+ *     <td>DOUBLE</td>
+ *     <td>FLOAT8</td>
+ *   </tr>
+ *   <tr>
+ *     <td>BYTES</td>
+ *     <td>BLOB</td>
+ *   </tr>
+ *   <tr>
+ *     <td>STRING</td>
+ *     <td>TEXT</td>
+ *   </tr>
+ *   <tr>
+ *     <td>FIXED</td>
+ *     <td>BLOB</td>
+ *   </tr>
+ *   <tr>
+ *     <td>RECORD</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>ENUM</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>MAP</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>UNION</td>
+ *     <td>Not currently supported.</td>
+ *   </tr>
+ * </table>
+ */
+
+package org.apache.tajo.storage.avro;

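As a concrete reading of the mapping table above, the record below (written as a Java string literal in the style of this commit's tests) uses only the supported primitive Avro types; the comments show the Tajo type each field corresponds to. The table's Tajo column types are still declared separately, so this is only an illustration of the mapping.

public class AvroToTajoMappingExample {
  static final String EXAMPLE_AVRO_SCHEMA =
      "{\n" +
      "  \"type\": \"record\",\n" +
      "  \"namespace\": \"org.apache.tajo\",\n" +
      "  \"name\": \"mappingExample\",\n" +
      "  \"fields\": [\n" +
      "    { \"name\": \"active\",  \"type\": \"boolean\" },\n" +  // -> BOOLEAN
      "    { \"name\": \"id\",      \"type\": \"int\" },\n" +      // -> INT4
      "    { \"name\": \"count\",   \"type\": \"long\" },\n" +     // -> INT8
      "    { \"name\": \"score\",   \"type\": \"double\" },\n" +   // -> FLOAT8
      "    { \"name\": \"name\",    \"type\": \"string\" },\n" +   // -> TEXT
      "    { \"name\": \"payload\", \"type\": \"bytes\" }\n" +     // -> BLOB
      "  ]\n" +
      "}\n";
}
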
http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
index ae1380c..4669477 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -40,7 +40,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -72,6 +72,10 @@
     <name>tajo.storage.fragment.sequencefile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>
@@ -144,10 +148,20 @@
     <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
   </property>
 
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.v2.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
   </property>
 
   <property>
@@ -184,4 +198,9 @@
     <name>tajo.storage.appender-handler.sequencefile.class</name>
     <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
   </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index d50c356..5a4b092 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -51,6 +51,20 @@ public class TestMergeScanner {
   private TajoConf conf;
   AbstractStorageManager sm;
   private static String TEST_PATH = "target/test-data/TestMergeScanner";
+
+  private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testMultipleFiles\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"file\", \"type\": \"string\" },\n" +
+      "    { \"name\": \"name\", \"type\": \"string\" },\n" +
+      "    { \"name\": \"age\", \"type\": \"long\" }\n" +
+      "  ]\n" +
+      "}\n";
+
   private Path testDir;
   private StoreType storeType;
   private FileSystem fs;
@@ -68,9 +82,9 @@ public class TestMergeScanner {
         {StoreType.TREVNI},
         {StoreType.PARQUET},
         {StoreType.SEQUENCEFILE},
+        {StoreType.AVRO},
         // RowFile requires Byte-buffer read support, so we omitted RowFile.
         //{StoreType.ROWFILE},
-
     });
   }
 
@@ -78,7 +92,7 @@ public class TestMergeScanner {
   public void setup() throws Exception {
     conf = new TajoConf();
     conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
-    conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "trevni", "parquet");
+    conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "trevni", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
     sm = StorageManagerFactory.getStorageManager(conf, testDir);
@@ -95,6 +109,10 @@ public class TestMergeScanner {
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_MULTIPLE_FILES_AVRO_SCHEMA);
+    }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
     Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table1Path);
@@ -176,6 +194,7 @@ public class TestMergeScanner {
       case PARQUET:
       case SEQUENCEFILE:
       case CSV:
+      case AVRO:
         return true;
       default:
         return false;

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 8a700ac..80d0a3b 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -51,6 +51,40 @@ public class TestStorages {
 	private TajoConf conf;
 	private static String TEST_PATH = "target/test-data/TestStorages";
 
+  private static String TEST_PROJECTION_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testProjection\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"age\", \"type\": \"long\" },\n" +
+      "    { \"name\": \"score\", \"type\": \"float\" }\n" +
+      "  ]\n" +
+      "}\n";
+
+  private static String TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testNullHandlingTypes\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" +
+      "    { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" +
+      "    { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" +
+      "    { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" +
+      "    { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" +
+      "    { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" +
+      "    { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" +
+      "    { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" +
+      "    { \"name\": \"col12\", \"type\": \"null\" },\n" +
+      "    { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" +
+      "  ]\n" +
+      "}\n";
+
   private StoreType storeType;
   private boolean splitable;
   private boolean statsable;
@@ -68,7 +102,6 @@ public class TestStorages {
       conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
     }
 
-
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
   }
@@ -81,6 +114,7 @@ public class TestStorages {
         {StoreType.RCFILE, true, true},
         {StoreType.PARQUET, false, false},
         {StoreType.SEQUENCEFILE, true, true},
+        {StoreType.AVRO, false, false},
     });
   }
 
@@ -99,7 +133,7 @@ public class TestStorages {
       int tupleNum = 10000;
       VTuple vTuple;
 
-      for(int i = 0; i < tupleNum; i++) {
+      for (int i = 0; i < tupleNum; i++) {
         vTuple = new VTuple(2);
         vTuple.put(0, DatumFactory.createInt4(i + 1));
         vTuple.put(1, DatumFactory.createInt8(25l));
@@ -147,6 +181,10 @@ public class TestStorages {
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_PROJECTION_AVRO_SCHEMA);
+    }
 
     Path tablePath = new Path(testDir, "testProjection.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -154,7 +192,7 @@ public class TestStorages {
     int tupleNum = 10000;
     VTuple vTuple;
 
-    for(int i = 0; i < tupleNum; i++) {
+    for (int i = 0; i < tupleNum; i++) {
       vTuple = new VTuple(3);
       vTuple.put(0, DatumFactory.createInt4(i + 1));
       vTuple.put(1, DatumFactory.createInt8(i + 2));
@@ -178,7 +216,8 @@ public class TestStorages {
           || storeType == StoreType.TREVNI
           || storeType == StoreType.CSV
           || storeType == StoreType.PARQUET
-          || storeType == StoreType.SEQUENCEFILE) {
+          || storeType == StoreType.SEQUENCEFILE
+          || storeType == StoreType.AVRO) {
         assertTrue(tuple.get(0) == null);
       }
       assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
@@ -210,6 +249,10 @@ public class TestStorages {
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_URL,
+                     "src/test/resources/testVariousTypes.avsc");
+    }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -244,7 +287,7 @@ public class TestStorages {
     scanner.init();
 
     Tuple retrieved;
-    while ((retrieved=scanner.next()) != null) {
+    while ((retrieved = scanner.next()) != null) {
       for (int i = 0; i < tuple.size(); i++) {
         assertEquals(tuple.get(i), retrieved.get(i));
       }
@@ -276,6 +319,10 @@ public class TestStorages {
     meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N");
     meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
     meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\");
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+    }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -655,6 +702,4 @@ public class TestStorages {
     }
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
index be5b096..d6cfee3 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
@@ -53,6 +53,39 @@ public class TestStorages {
 	private TajoConf conf;
 	private static String TEST_PATH = "target/test-data/v2/TestStorages";
 
+  private static String TEST_PROJECTION_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testProjection\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"age\", \"type\": \"long\" },\n" +
+      "    { \"name\": \"score\", \"type\": \"float\" }\n" +
+      "  ]\n" +
+      "}\n";
+
+  private static String TEST_VARIOUS_TYPES_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testVariousTypes\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"col1\", \"type\": \"boolean\" },\n" +
+      "    { \"name\": \"col2\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"col3\", \"type\": \"string\" },\n" +
+      "    { \"name\": \"col4\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"col5\", \"type\": \"int\" },\n" +
+      "    { \"name\": \"col6\", \"type\": \"long\" },\n" +
+      "    { \"name\": \"col7\", \"type\": \"float\" },\n" +
+      "    { \"name\": \"col8\", \"type\": \"double\" },\n" +
+      "    { \"name\": \"col9\", \"type\": \"string\" },\n" +
+      "    { \"name\": \"col10\", \"type\": \"bytes\" },\n" +
+      "    { \"name\": \"col11\", \"type\": \"bytes\" },\n" +
+      "    { \"name\": \"col12\", \"type\": \"null\" }\n" +
+      "  ]\n" +
+      "}\n";
+
   private StoreType storeType;
   private boolean splitable;
   private boolean statsable;
@@ -83,10 +116,11 @@ public class TestStorages {
         {StoreType.RCFILE, true, true},
         {StoreType.TREVNI, false, true},
         {StoreType.PARQUET, false, false},
+        {StoreType.AVRO, false, false},
         {StoreType.RAW, false, false},
     });
   }
-		
+
 	@Test
   public void testSplitable() throws IOException {
     if (splitable) {
@@ -149,6 +183,10 @@ public class TestStorages {
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_PROJECTION_AVRO_SCHEMA);
+    }
 
     Path tablePath = new Path(testDir, "testProjection.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -179,7 +217,8 @@ public class TestStorages {
       if (storeType == StoreType.RCFILE
           || storeType == StoreType.TREVNI
           || storeType == StoreType.CSV
-          || storeType == StoreType.PARQUET) {
+          || storeType == StoreType.PARQUET
+          || storeType == StoreType.AVRO) {
         assertTrue(tuple.get(0) == null || tuple.get(0) instanceof NullDatum);
       }
       assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
@@ -210,6 +249,10 @@ public class TestStorages {
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
+    if (storeType == StoreType.AVRO) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+                     TEST_VARIOUS_TYPES_AVRO_SCHEMA);
+    }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
index cbbdafa..a1cc2ff 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -45,7 +45,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -77,6 +77,10 @@
     <name>tajo.storage.fragment.sequencefile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>
@@ -149,10 +153,20 @@
     <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
   </property>
 
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.v2.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
-    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile</value>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
   </property>
 
   <property>
@@ -190,4 +204,8 @@
     <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
   </property>
 
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/8da52ede/tajo-storage/src/test/resources/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc
new file mode 100644
index 0000000..611b97f
--- /dev/null
+++ b/tajo-storage/src/test/resources/testVariousTypes.avsc
@@ -0,0 +1,21 @@
+{
+  "type": "record",
+  "namespace": "org.apache.tajo",
+  "name": "testVariousTypes",
+  "fields": [
+    { "name": "col1", "type": "boolean" },
+    { "name": "col2", "type": "int" },
+    { "name": "col3", "type": "string" },
+    { "name": "col4", "type": "int" },
+    { "name": "col5", "type": "int" },
+    { "name": "col6", "type": "long" },
+    { "name": "col7", "type": "float" },
+    { "name": "col8", "type": "double" },
+    { "name": "col9", "type": "string" },
+    { "name": "col10", "type": "bytes" },
+    { "name": "col11", "type": "bytes" },
+    { "name": "col12", "type": "null" },
+    { "name": "col13", "type": "bytes" }
+  ]
+}
+

