tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [6/6] tajo git commit: TAJO-1131: Supports Inserting or Creating table into the HBase mapped table.
Date Sun, 16 Nov 2014 01:13:43 GMT
TAJO-1131: Supports Inserting or Creating table into the HBase mapped table.

Closes #232


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/69373878
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/69373878
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/69373878

Branch: refs/heads/hbase_storage
Commit: 69373878b5b722ab82add0f62303019022302a73
Parents: 03c3ea2
Author: HyoungJun Kim <babokim@babokim-MacBook-Pro.local>
Authored: Sun Nov 16 10:12:17 2014 +0900
Committer: HyoungJun Kim <babokim@babokim-MacBook-Pro.local>
Committed: Sun Nov 16 10:12:17 2014 +0900

----------------------------------------------------------------------
 BUILDING                                        |    2 +
 CHANGES                                         |    4 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |    2 +
 .../src/main/proto/CatalogProtos.proto          |    4 +-
 .../java/org/apache/tajo/cli/tsql/TajoCli.java  |    2 +-
 .../main/java/org/apache/tajo/QueryVars.java    |   58 +
 .../java/org/apache/tajo/TajoConstants.java     |    1 +
 .../java/org/apache/tajo/conf/TajoConf.java     |    5 +
 tajo-core/pom.xml                               |   27 +
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   10 +-
 .../tajo/engine/function/string/ToCharLong.java |   55 +
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   18 +-
 .../engine/planner/PhysicalPlannerImpl.java     |   13 +-
 .../engine/planner/global/ExecutionBlock.java   |    2 +
 .../planner/physical/ColPartitionStoreExec.java |    1 +
 .../planner/physical/ExternalSortExec.java      |    4 +-
 .../physical/RangeShuffleFileWriteExec.java     |    2 +
 .../engine/planner/physical/SeqScanExec.java    |    6 +-
 .../engine/planner/physical/StoreTableExec.java |   37 +-
 .../apache/tajo/engine/query/QueryContext.java  |   45 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java |    7 +-
 .../DefaultFragmentScheduleAlgorithm.java       |    2 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   33 +-
 .../master/GreedyFragmentScheduleAlgorithm.java |    9 +-
 .../apache/tajo/master/LazyTaskScheduler.java   |   18 +-
 .../master/NonForwardQueryResultScanner.java    |   21 +-
 .../apache/tajo/master/querymaster/Query.java   |  353 +----
 .../master/querymaster/QueryMasterTask.java     |   40 +-
 .../tajo/master/querymaster/QueryUnit.java      |   11 +-
 .../tajo/master/querymaster/Repartitioner.java  |   71 +-
 .../tajo/master/querymaster/SubQuery.java       |   16 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   44 +-
 .../main/resources/webapps/worker/queryunit.jsp |    5 +-
 .../org/apache/tajo/HBaseTestClusterUtil.java   |  182 +++
 .../java/org/apache/tajo/QueryTestCaseBase.java |    8 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   12 +-
 .../TestStringOperatorsAndFunctions.java        |    1 +
 .../tajo/engine/planner/TestPlannerUtil.java    |    2 +-
 .../planner/physical/TestBSTIndexExec.java      |    3 +-
 .../planner/physical/TestPhysicalPlanner.java   |    3 +-
 .../tajo/engine/query/TestHBaseTable.java       | 1474 ++++++++++++++++++
 .../tajo/worker/TestRangeRetrieverHandler.java  |    4 +-
 .../dataset/TestHBaseTable/splits.data          |    4 +
 .../TestHBaseTable/testBinaryMappedQuery.result |   81 +
 .../results/TestHBaseTable/testCATS.result      |  100 ++
 .../testColumnKeyValueSelectQuery.result        |   12 +
 .../TestHBaseTable/testIndexPredication.result  |   38 +
 .../TestHBaseTable/testInsertInto.result        |    3 +
 .../testInsertIntoBinaryMultiRegion.result      |  100 ++
 .../testInsertIntoColumnKeyValue.result         |   21 +
 .../testInsertIntoMultiRegion.result            |  100 ++
 .../testInsertIntoMultiRegion2.result           |  100 ++
 ...stInsertIntoMultiRegionMultiRowFields.result |  100 ++
 ...estInsertIntoMultiRegionWithSplitFile.result |  100 ++
 .../testInsertIntoRowField.result               |    4 +
 .../testInsertIntoUsingPut.result               |    3 +
 .../results/TestHBaseTable/testJoin.result      |    7 +
 .../TestHBaseTable/testNonForwardQuery.result   |  102 ++
 .../testRowFieldSelectQuery.result              |   88 ++
 .../TestHBaseTable/testSimpleSelectQuery.result |   88 ++
 tajo-dist/src/main/bin/tajo                     |   15 +
 tajo-dist/src/main/conf/tajo-env.sh             |    3 +
 .../org/apache/tajo/plan/LogicalOptimizer.java  |    7 +
 .../org/apache/tajo/plan/logical/SortNode.java  |   20 +-
 .../rewrite/rules/PartitionedTableRewriter.java |    2 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |  112 +-
 .../plan/verifier/PreLogicalPlanVerifier.java   |    3 +
 tajo-project/pom.xml                            |    1 +
 tajo-storage/pom.xml                            |   23 +
 .../java/org/apache/tajo/storage/CSVFile.java   |   10 +-
 .../org/apache/tajo/storage/FileAppender.java   |   28 +-
 .../apache/tajo/storage/FileStorageManager.java |  237 ++-
 .../storage/HashShuffleAppenderManager.java     |    1 -
 .../org/apache/tajo/storage/MergeScanner.java   |    8 +-
 .../java/org/apache/tajo/storage/RawFile.java   |    7 +-
 .../java/org/apache/tajo/storage/RowFile.java   |    6 +-
 .../org/apache/tajo/storage/StorageManager.java |  787 +++++++++-
 .../apache/tajo/storage/StorageProperty.java    |   40 +
 .../apache/tajo/storage/avro/AvroAppender.java  |    8 +-
 .../tajo/storage/fragment/FileFragment.java     |    6 +-
 .../storage/fragment/FragmentConvertor.java     |   25 +-
 .../storage/hbase/AbstractHBaseAppender.java    |  223 +++
 .../storage/hbase/AddSortForInsertRewriter.java |   87 ++
 .../tajo/storage/hbase/ColumnMapping.java       |  236 +++
 .../HBaseBinarySerializerDeserializer.java      |   97 ++
 .../tajo/storage/hbase/HBaseFragment.java       |  198 +++
 .../tajo/storage/hbase/HBasePutAppender.java    |  120 ++
 .../apache/tajo/storage/hbase/HBaseScanner.java |  445 ++++++
 .../storage/hbase/HBaseStorageConstants.java    |   33 +
 .../tajo/storage/hbase/HBaseStorageManager.java | 1126 +++++++++++++
 .../hbase/HBaseTextSerializerDeserializer.java  |   71 +
 .../tajo/storage/hbase/HFileAppender.java       |  167 ++
 .../tajo/storage/hbase/IndexPredication.java    |   61 +
 .../tajo/storage/hbase/RowKeyMapping.java       |   40 +
 .../tajo/storage/parquet/ParquetAppender.java   |    9 +-
 .../org/apache/tajo/storage/rcfile/RCFile.java  |    6 +-
 .../sequencefile/SequenceFileAppender.java      |    6 +-
 .../tajo/storage/text/DelimitedTextFile.java    |    7 +-
 .../tajo/storage/trevni/TrevniAppender.java     |    6 +-
 .../src/main/proto/StorageFragmentProtos.proto  |   35 +
 .../src/main/resources/storage-default.xml      |   25 +-
 .../tajo/storage/hbase/TestColumnMapping.java   |   95 ++
 .../storage/hbase/TestHBaseStorageManager.java  |  109 ++
 103 files changed, 7530 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/BUILDING
----------------------------------------------------------------------
diff --git a/BUILDING b/BUILDING
index 974fac8..f318f88 100644
--- a/BUILDING
+++ b/BUILDING
@@ -44,6 +44,8 @@ Maven build goals:
   * Use -Dtar to create a TAR with the distribution (using -Pdist)
   * Use -Dhadoop.version to build with the specific hadoop version (-Dhadoop.version=2.5.1)
     * Currently, 2.3.0 or higher are supported.
+  * Use -Dhbase.version to build with the specific hbase version (-Dhbase.version=0.98.7-hadoop2)
+    * Currently, 0.98.x-hadoop2 or higher are tested.
 
  Tests options:
   * Use -DskipTests to skip tests when running the following Maven goals:

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e37d44b..2b723d4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,6 +5,10 @@ Release 0.9.1 - unreleased
 
   NEW FEATURES
 
+    TAJO-1131: Supports Inserting or Creating table into 
+    the HBase mapped table.(Hyoungjun Kim)
+
+
     TAJO-1026: Implement Query history persistency manager.(Hyoungjun Kim)
 
     TAJO-233: Support PostgreSQL CatalogStore. (Jihun Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 6e66d2a..66417df 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -280,6 +280,8 @@ public class CatalogUtil {
       return StoreType.AVRO;
     } else if (typeStr.equalsIgnoreCase(StoreType.TEXTFILE.name())) {
       return StoreType.TEXTFILE;
+    } else if (typeStr.equalsIgnoreCase(StoreType.HBASE.name())) {
+      return StoreType.HBASE;
     } else {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 31a7446..9c475aa 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -37,6 +37,7 @@ enum StoreType {
   SEQUENCEFILE = 8;
   AVRO = 9;
   TEXTFILE = 10;
+  HBASE = 11;
 }
 
 enum OrderType {
@@ -69,7 +70,8 @@ message SchemaProto {
 
 message FragmentProto {
   required string id = 1;
-  required bytes contents = 2;
+  required string storeType = 2;
+  required bytes contents = 3;
 }
 
 message FileFragmentProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index 34e2170..63ff873 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -25,13 +25,13 @@ import jline.UnsupportedTerminal;
 import jline.console.ConsoleReader;
 import org.apache.commons.cli.*;
 import org.apache.tajo.*;
+import org.apache.tajo.ipc.*;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.cli.tsql.commands.*;
 import org.apache.tajo.client.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.util.FileUtil;
 
 import java.io.*;

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
new file mode 100644
index 0000000..ba76d63
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.tajo.validation.Validator;
+
+/**
+ * Configuration keys of type {@link ConfigType#QUERY}.
+ *
+ * <p>The key name of each constant is its enum name in lower case
+ * (e.g. {@code OUTPUT_TABLE_NAME} -> {@code output_table_name}). None of these keys declares a
+ * value class or a validator.
+ */
+public enum QueryVars implements ConfigKey {
+  COMMAND_TYPE,
+  STAGING_DIR,
+  OUTPUT_TABLE_NAME,
+  OUTPUT_TABLE_PATH,
+  OUTPUT_PARTITIONS,
+  OUTPUT_OVERWRITE,
+  OUTPUT_AS_DIRECTORY,
+  OUTPUT_PER_FILE_SIZE,
+  ;
+
+  /**
+   * Returns the lower-cased constant name used as the configuration key.
+   *
+   * <p>{@link java.util.Locale#ROOT} is used so the key does not depend on the JVM default
+   * locale; under a Turkish locale, for example, {@code "I".toLowerCase()} yields a dotless
+   * {@code \u0131}, which would turn {@code STAGING_DIR} into a different key.
+   */
+  @Override
+  public String keyname() {
+    return name().toLowerCase(java.util.Locale.ROOT);
+  }
+
+  /** All keys in this enum are query-scoped. */
+  @Override
+  public ConfigType type() {
+    return ConfigType.QUERY;
+  }
+
+  /** No value class is declared for these keys; always returns {@code null}. */
+  @Override
+  public Class<?> valueClass() {
+    return null;
+  }
+
+  /** No validator is attached to these keys; always returns {@code null}. */
+  @Override
+  public Validator validator() {
+    return null;
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
index 08909b4..de09f09 100644
--- a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java
@@ -39,6 +39,7 @@ public class TajoConstants {
   public static final String SYSTEM_HA_BACKUP_DIR_NAME = "backup";
 
   public static final int UNKNOWN_ROW_NUMBER = -1;
+  public static final int UNKNOWN_LENGTH = -1;
 
   private TajoConstants() {}
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 3966410..d96bcdd 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -302,6 +302,11 @@ public class TajoConf extends Configuration {
     HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7),
 
     // Misc -------------------------------------------------------------------
+    // Fragment
+    // When making physical plan, the length of fragment is used to determine the physical operation.
+    // Some storage does not know the size of the fragment.
+    // In this case PhysicalPlanner uses this value to determine.
+    FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH("tajo.fragment.alternative.unknown.length", (long)(512 * 1024 * 1024)),
 
     // Geo IP
     GEOIP_DATA("tajo.function.geoip-database-location", ""),

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index fce96e4..b58ae89 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -396,6 +396,33 @@
       <artifactId>gmetric4j</artifactId>
       <version>1.0.3</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop2-compat</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index beba248..9b63a24 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -89,11 +89,11 @@ if_exists
   ;
 
 create_table_statement
-  : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING file_type=identifier
-    (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal)
-  | CREATE TABLE (if_not_exists)? table_name table_elements (USING file_type=identifier)?
+  : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING storage_type=identifier
+    (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal)?
+  | CREATE TABLE (if_not_exists)? table_name table_elements (USING storage_type=identifier)?
     (param_clause)? (table_partitioning_clauses)? (AS query_expression)?
-  | CREATE TABLE (if_not_exists)? table_name (USING file_type=identifier)?
+  | CREATE TABLE (if_not_exists)? table_name (USING storage_type=identifier)?
     (param_clause)? (table_partitioning_clauses)? AS query_expression
   | CREATE TABLE (if_not_exists)? table_name LIKE like_table_name=table_name
   ;
@@ -1559,7 +1559,7 @@ null_ordering
 
 insert_statement
   : INSERT (OVERWRITE)? INTO table_name (LEFT_PAREN column_name_list RIGHT_PAREN)? query_expression
-  | INSERT (OVERWRITE)? INTO LOCATION path=Character_String_Literal (USING file_type=identifier (param_clause)?)? query_expression
+  | INSERT (OVERWRITE)? INTO LOCATION path=Character_String_Literal (USING storage_type=identifier (param_clause)?)? query_expression
   ;
 
 /*

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java
new file mode 100644
index 0000000..5fed940
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.string;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.plan.function.GeneralFunction;
+import org.apache.tajo.storage.Tuple;
+
+import java.text.DecimalFormat;
+
+@Description(
+    functionName = "to_char",
+    description = "convert integer to string.",
+    example = "> SELECT to_char(125, '00999');\n"
+        + "00125",
+    returnType = TajoDataTypes.Type.TEXT,
+    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8, TajoDataTypes.Type.TEXT})}
+)
+
+public class ToCharLong extends GeneralFunction {
+  DecimalFormat df = null;
+
+  public ToCharLong() {
+    super(new Column[]{new Column("val", TajoDataTypes.Type.INT8), new Column("format", TajoDataTypes.Type.TEXT)});
+  }
+
+  @Override
+  public Datum eval(Tuple params) {
+    if (df == null) {
+      df = new DecimalFormat(params.get(1).asChars());
+    }
+    return new TextDatum(df.format(params.get(0).asInt8()));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 03b10c9..40e5f8a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -29,6 +29,7 @@ import org.apache.tajo.algebra.Aggregation.GroupType;
 import org.apache.tajo.algebra.LiteralValue.LiteralType;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.engine.parser.SQLParser.*;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.StringUtils;
 
@@ -62,6 +63,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     try {
       context = parser.sql();
     } catch (SQLParseError e) {
+      e.printStackTrace();
       throw new SQLSyntaxError(e);
     }
     return visitSql(context);
@@ -1162,12 +1164,14 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       createTable.setExternal();
 
       ColumnDefinition[] elements = getDefinitions(ctx.table_elements());
-      String fileType = ctx.file_type.getText();
-      String path = stripQuote(ctx.path.getText());
-
+      String storageType = ctx.storage_type.getText();
       createTable.setTableElements(elements);
-      createTable.setStorageType(fileType);
-      createTable.setLocation(path);
+      createTable.setStorageType(storageType);
+
+      if (PlannerUtil.isFileStorageType(storageType)) {
+        String path = stripQuote(ctx.path.getText());
+        createTable.setLocation(path);
+      }
     } else {
       if (checkIfExist(ctx.table_elements())) {
         ColumnDefinition[] elements = getDefinitions(ctx.table_elements());
@@ -1175,7 +1179,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       }
 
       if (checkIfExist(ctx.USING())) {
-        String fileType = ctx.file_type.getText();
+        String fileType = ctx.storage_type.getText();
         createTable.setStorageType(fileType);
       }
 
@@ -1449,7 +1453,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       insertExpr.setLocation(stripQuote(ctx.path.getText()));
 
       if (ctx.USING() != null) {
-        insertExpr.setStorageType(ctx.file_type.getText());
+        insertExpr.setStorageType(ctx.storage_type.getText());
 
         if (ctx.param_clause() != null) {
           insertExpr.setParams(escapeTableMeta(getParams(ctx.param_clause())));

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 5f47db7..98a621e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -29,11 +29,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.physical.*;
@@ -250,11 +252,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
     long size = 0;
     for (String tableId : tableIds) {
-      // TODO - CSV is a hack.
-      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV,
-          ctx.getTables(tableId));
-      for (FileFragment frag : fragments) {
-        size += frag.getLength();
+      FragmentProto[] fragmentProtos = ctx.getTables(tableId);
+      List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
+      for (Fragment frag : fragments) {
+        size += StorageManager.getFragmentLength(ctx.getConf(), frag);
       }
     }
     return size;
@@ -1182,7 +1183,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
     List<FileFragment> fragments =
-        FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos);
+        FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
 
     String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
     Path indexPath = new Path(

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 77eb32d..aecb364 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -82,6 +82,8 @@ public class ExecutionBlock {
       } else if (node instanceof TableSubQueryNode) {
         TableSubQueryNode subQuery = (TableSubQueryNode) node;
         s.add(s.size(), subQuery.getSubQuery());
+      } else if (node instanceof StoreTableNode) {
+        store = (StoreTableNode)node;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 7818fd7..c5df5f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -120,6 +120,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
     super.init();
 
     storeTablePath = context.getOutputPath();
+
     FileSystem fs = storeTablePath.getFileSystem(context.getConf());
     if (!fs.exists(storeTablePath.getParent())) {
       fs.mkdirs(storeTablePath.getParent());

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 6a69763..4e19114 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -173,7 +173,7 @@ public class ExternalSortExec extends SortExec {
 
     long chunkWriteStart = System.currentTimeMillis();
     Path outputPath = getChunkPathForWrite(0, chunkId);
-    final RawFileAppender appender = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+    final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath);
     appender.init();
     for (Tuple t : tupleBlock) {
       appender.addTuple(t);
@@ -471,7 +471,7 @@ public class ExternalSortExec extends SortExec {
       final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
       info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
       long mergeStartTime = System.currentTimeMillis();
-      final RawFileAppender output = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+      final RawFileAppender output = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath);
       output.init();
       final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout);
       merger.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index a9ca836..568c6ec 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index b863e4f..b2ef278 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -80,8 +80,7 @@ public class SeqScanExec extends PhysicalExec {
       String pathNameKey = "";
       if (fragments != null) {
         for (FragmentProto f : fragments) {
-          Fragment fragement = FragmentConvertor.convert(
-              context.getConf(), plan.getTableDesc().getMeta().getStoreType(), f);
+          Fragment fragement = FragmentConvertor.convert(context.getConf(), f);
           pathNameKey += fragement.getKey();
         }
       }
@@ -216,8 +215,7 @@ public class SeqScanExec extends PhysicalExec {
     if (fragments != null) {
       if (fragments.length > 1) {
         this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),
-            FragmentConvertor.convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(),
-                fragments), projected
+            FragmentConvertor.convert(context.getConf(), fragments), projected
         );
       } else {
         StorageManager storageManager = StorageManager.getStorageManager(

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 725478f..a5e0b5d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -23,11 +23,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.plan.logical.InsertNode;
 import org.apache.tajo.plan.logical.PersistentStoreNode;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;
@@ -78,31 +80,32 @@ public class StoreTableExec extends UnaryPhysicalExec {
   }
 
   public void openNewFile(int suffixId) throws IOException {
-    String prevFile = null;
+    Schema appenderSchema = (plan instanceof InsertNode) ? ((InsertNode) plan).getTableSchema() : outSchema; // INSERT writes with the target table schema; other plans use outSchema
 
-    lastFileName = context.getOutputPath();
-    if (suffixId > 0) {
-      prevFile = lastFileName.toString();
+    if (PlannerUtil.isFileStorageType(meta.getStoreType())) { // file-based storage: write under the task output path
+      String prevFile = null; // set only when rolling over (suffixId > 0); used by the log message below
 
-      lastFileName = new Path(lastFileName + "_" + suffixId);
-    }
+      lastFileName = context.getOutputPath();
+      // suffixId > 0: the previous file exceeded the max output size, so continue in "<output path>_<suffixId>".
+      if (suffixId > 0) {
+        prevFile = lastFileName.toString();
+        lastFileName = new Path(lastFileName + "_" + suffixId);
+      }
+      // Both INSERT and other plans now write to lastFileName, so the roll-over suffix is always honored.
+      appender = StorageManager.getFileStorageManager(context.getConf()).getAppender(meta, appenderSchema, lastFileName);
 
-    if (plan instanceof InsertNode) {
-      InsertNode createTableNode = (InsertNode) plan;
-      appender = StorageManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender(meta,
-          createTableNode.getTableSchema(), context.getOutputPath());
+      if (suffixId > 0) {
+        LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " +
+            "The remain output will be written into " + lastFileName.toString());
+      }
     } else {
-      appender = StorageManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender(meta,
-          outSchema, lastFileName);
+      appender = StorageManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender( // non-file storage: appender built from query context, task id and staging dir
+          context.getQueryContext(),
+          context.getTaskId(), meta, appenderSchema, context.getQueryContext().getStagingDir());
     }
 
     appender.enableStats();
     appender.init();
-
-    if (suffixId > 0) {
-      LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " +
-          "The remain output will be written into " + lastFileName.toString());
-    }
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index d8f7f08..488cae5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -21,11 +21,11 @@ package org.apache.tajo.engine.query;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ConfigKey;
 import org.apache.tajo.OverridableConf;
+import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.session.Session;
-import org.apache.tajo.validation.Validator;
 import org.apache.tajo.plan.logical.NodeType;
 
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
@@ -34,41 +34,6 @@ import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetPro
  * QueryContent is a overridable config, and it provides a set of various configs for a query instance.
  */
 public class QueryContext extends OverridableConf {
-  public static enum QueryVars implements ConfigKey {
-    COMMAND_TYPE,
-    STAGING_DIR,
-    OUTPUT_TABLE_NAME,
-    OUTPUT_TABLE_PATH,
-    OUTPUT_PARTITIONS,
-    OUTPUT_OVERWRITE,
-    OUTPUT_AS_DIRECTORY,
-    OUTPUT_PER_FILE_SIZE,
-    ;
-
-    QueryVars() {
-    }
-
-    @Override
-    public String keyname() {
-      return name().toLowerCase();
-    }
-
-    @Override
-    public ConfigType type() {
-      return ConfigType.QUERY;
-    }
-
-    @Override
-    public Class<?> valueClass() {
-      return null;
-    }
-
-    @Override
-    public Validator validator() {
-      return null;
-    }
-  }
-
   public QueryContext(TajoConf conf) {
     super(conf, ConfigKey.ConfigType.QUERY);
   }
@@ -103,8 +68,8 @@ public class QueryContext extends OverridableConf {
   }
 
   public Path getStagingDir() {
-    String strVal = get(QueryVars.STAGING_DIR);
-    return strVal != null ? new Path(strVal) : null;
+    String strVal = get(QueryVars.STAGING_DIR, ""); // second arg is presumably the value returned when unset
+    return strVal != null && !strVal.isEmpty() ? new Path(strVal) : null; // null or empty value -> no staging dir (null)
   }
 
   /**
@@ -127,7 +92,9 @@ public class QueryContext extends OverridableConf {
   }
 
   public void setOutputPath(Path path) {
-    put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString());
+    if (path != null) { // a null path leaves OUTPUT_TABLE_PATH untouched instead of throwing a NullPointerException
+      put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString());
+    }
   }
 
   public Path getOutputPath() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index aeb4e05..3bb1b5b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -125,12 +125,13 @@ public class TupleUtil {
     Tuple startTuple = new VTuple(target.size());
     Tuple endTuple = new VTuple(target.size());
     int i = 0;
+    int sortSpecIndex = 0;
 
     // In outer join, empty table could be searched.
     // As a result, min value and max value would be null.
     // So, we should put NullDatum for this case.
     for (Column col : target.getColumns()) {
-      if (sortSpecs[i].isAscending()) {
+      if (sortSpecs[sortSpecIndex].isAscending()) {
         if (statSet.get(col).getMinValue() != null)
           startTuple.put(i, statSet.get(col).getMinValue());
         else
@@ -164,6 +165,10 @@ public class TupleUtil {
         else
           endTuple.put(i, DatumFactory.createNullDatum());
       }
+      if (target.getColumns().size() == sortSpecs.length) {
+        // Not composite column sort
+        sortSpecIndex++;
+      }
       i++;
     }
     return new TupleRange(sortSpecs, startTuple, endTuple);

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
index 6a2a705..406550d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
@@ -87,7 +87,7 @@ public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorit
       diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
     }
     for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds[i], fragmentPair);
+      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
     }
     fragmentNum++;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 90d8dc7..d87ca30 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -303,6 +303,15 @@ public class GlobalEngine extends AbstractService {
         responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
       }
     } else { // it requires distributed execution. So, the query is forwarded to a query master.
+      StoreType storeType = PlannerUtil.getStoreType(plan);
+      if (storeType != null) {
+        StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType);
+        StorageProperty storageProperty = sm.getStorageProperty();
+        if (!storageProperty.isSupportsInsertInto()) {
+          throw new VerifyException("Inserting into non-file storage is not supported.");
+        }
+        sm.beforeInsertOrCATS(rootNode.getChild());
+      }
       context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
       hookManager.doHooks(queryContext, plan);
 
@@ -519,6 +528,7 @@ public class GlobalEngine extends AbstractService {
     LOG.info("=============================================");
 
     annotatedPlanVerifier.verify(queryContext, state, plan);
+    verifyInsertTableSchema(queryContext, state, plan);
 
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
@@ -531,6 +541,25 @@ public class GlobalEngine extends AbstractService {
     return plan;
   }
 
+  private void verifyInsertTableSchema(QueryContext queryContext, VerificationState state, LogicalPlan plan) {
+    StoreType storeType = PlannerUtil.getStoreType(plan); // null: no storage target, nothing to verify
+    if (storeType != null) {
+      LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+      if (rootNode.getChild().getType() == NodeType.INSERT) { // only INSERT plans are checked here
+        try {
+          TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+          InsertNode iNode = rootNode.getChild();
+          Schema outSchema = iNode.getChild().getOutSchema(); // schema produced by the INSERT's input plan
+          // Storage-specific check; presumably throws when outSchema does not fit tableDesc.
+          StorageManager.getStorageManager(queryContext.getConf(), storeType)
+              .verifyInsertTableSchema(tableDesc, outSchema);
+        } catch (Throwable t) { // any failure is recorded as a verification error rather than propagated
+          state.addVerification(t.getMessage()); // NOTE(review): getMessage() may be null; consider t.toString() as fallback
+        }
+      }
+    }
+  }
+
   /**
    * Alter a given table
    */
@@ -693,7 +722,7 @@ public class GlobalEngine extends AbstractService {
       meta = CatalogUtil.newTableMeta(createTable.getStorageType());
     }
 
-    if(StorageUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
+    if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
       Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
     }
 
@@ -735,7 +764,7 @@ public class GlobalEngine extends AbstractService {
       desc.setPartitionMethod(partitionDesc);
     }
 
-    StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc);
+    StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
 
     if (catalog.createTable(desc)) {
       LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
index 3798399..56cf8e5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
@@ -107,7 +107,7 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith
       diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
     }
     for (int i = 0; i < hosts.length; i++) {
-      addFragment(hosts[i], diskIds[i], fragmentPair);
+      addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
     }
     totalFragmentNum++;
   }
@@ -285,21 +285,22 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith
       diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
     }
     for (int i = 0; i < hosts.length; i++) {
+      int diskId = diskIds == null ? -1 : diskIds[i];
       String normalizedHost = NetUtils.normalizeHost(hosts[i]);
       Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
 
       if (diskFragmentMap != null) {
-        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]);
+        FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId);
         if (fragmentsPerDisk != null) {
           boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
           if (isRemoved) {
             if (fragmentsPerDisk.size() == 0) {
-              diskFragmentMap.remove(diskIds[i]);
+              diskFragmentMap.remove(diskId);
               if (diskFragmentMap.size() == 0) {
                 fragmentHostMapping.remove(normalizedHost);
               }
             }
-            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]);
+            HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId);
             if (totalHostPriority.containsKey(hostAndDisk)) {
               PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
               updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 50a118e..aff4b7d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -26,7 +26,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryUnitRequest;
@@ -38,6 +40,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.QueryUnitAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
@@ -368,6 +371,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       long taskSize = adjustTaskSize();
       LOG.info("Adjusted task size: " + taskSize);
 
+      TajoConf conf = subQuery.getContext().getConf();
       // host local, disk local
       String normalized = NetUtils.normalizeHost(host);
       Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
@@ -378,13 +382,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
             break;
           }
 
-          if (assignedFragmentSize + fragmentPair.getLeftFragment().getLength() > taskSize) {
+          if (assignedFragmentSize +
+              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
             break;
           } else {
             fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += fragmentPair.getLeftFragment().getLength();
+            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
             if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += fragmentPair.getRightFragment().getLength();
+              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
             }
           }
           scheduledFragments.removeFragment(fragmentPair);
@@ -400,13 +405,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
             break;
           }
 
-          if (assignedFragmentSize + fragmentPair.getLeftFragment().getLength() > taskSize) {
+          if (assignedFragmentSize +
+              StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
             break;
           } else {
             fragmentPairs.add(fragmentPair);
-            assignedFragmentSize += fragmentPair.getLeftFragment().getLength();
+            assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
             if (fragmentPair.getRightFragment() != null) {
-              assignedFragmentSize += fragmentPair.getRightFragment().getLength();
+              assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
             }
           }
           scheduledFragments.removeFragment(fragmentPair);

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index 26cfb2e..37fa4fe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -35,6 +35,9 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -42,7 +45,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class NonForwardQueryResultScanner {
-  private static final int MAX_FILE_NUM_PER_SCAN = 100;
+  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
 
   private QueryId queryId;
   private String sessionId;
@@ -55,7 +58,7 @@ public class NonForwardQueryResultScanner {
   private TajoConf tajoConf;
   private ScanNode scanNode;
 
-  private int currentFileIndex = 0;
+  private int currentFragmentIndex = 0;
 
   public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId,
                                       QueryId queryId,
@@ -77,22 +80,24 @@ public class NonForwardQueryResultScanner {
   }
 
   private void initSeqScanExec() throws IOException {
-    FragmentProto[] fragments = PhysicalPlanUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
-        currentFileIndex, MAX_FILE_NUM_PER_SCAN);
-    if (fragments != null && fragments.length > 0) {
+    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
+        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN); // presumably up to MAX_FRAGMENT_NUM_PER_SCAN fragments from currentFragmentIndex
+    // No fragments left: no new SeqScanExec is created.
+    if (fragments != null && !fragments.isEmpty()) {
+      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{}));
       this.taskContext = new TaskAttemptContext(
           new QueryContext(tajoConf), null,
           new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0),
-          fragments, null);
+          fragmentProtos, null);
 
       try {
         // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table.
-        scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragments);
+        scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos);
       } catch (CloneNotSupportedException e) {
         throw new IOException(e.getMessage(), e);
       }
       scanExec.init();
-      currentFileIndex += fragments.length;
+      currentFragmentIndex += fragments.size(); // advance the cursor past the fragments handed to this scan
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index caeadea..218798b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -32,31 +31,27 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.plan.logical.CreateTableNode;
-import org.apache.tajo.plan.logical.InsertNode;
-import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.util.history.SubQueryHistory;
 
 import java.io.IOException;
-import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -410,8 +405,12 @@ public class Query implements EventHandler<QueryEvent> {
       QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
       QueryState finalState;
       if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
-        finalizeQuery(query, subQueryEvent);
-        finalState = QueryState.QUERY_SUCCEEDED;
+        boolean success = finalizeQuery(query, subQueryEvent);
+        if (success) {
+          finalState = QueryState.QUERY_SUCCEEDED;
+        } else {
+          finalState = QueryState.QUERY_ERROR;
+        }
       } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
         finalState = QueryState.QUERY_FAILED;
       } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
@@ -419,324 +418,49 @@ public class Query implements EventHandler<QueryEvent> {
       } else {
         finalState = QueryState.QUERY_ERROR;
       }
+      // Best-effort rollback of the target storage output; a cleanup failure must not block query completion below.
+      if (finalState != QueryState.QUERY_SUCCEEDED) {
+        SubQuery lastStage = query.getSubQuery(subQueryEvent.getExecutionBlockId());
+        if (lastStage != null && lastStage.getTableMeta() != null) {
+          StoreType storeType = lastStage.getTableMeta().getStoreType();
+          if (storeType != null) {
+            try {
+              LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+              StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
+            } catch (Exception e) {
+              LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+            }
+          }
+        }
+      }
       query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
       query.setFinishTime();
 
       return finalState;
     }
 
-    private void finalizeQuery(Query query, QueryCompletedEvent event) {
-      MasterPlan masterPlan = query.getPlan();
+    private boolean finalizeQuery(Query query, QueryCompletedEvent event) {
+      try { // commit to the target storage, then run query hooks; any failure is reported and returns false
+        SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId());
+        StoreType storeType = lastStage.getTableMeta().getStoreType();
+        LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+        CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+        TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
 
-      ExecutionBlock terminal = query.getPlan().getTerminalBlock();
-      DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
-      Path finalOutputDir = commitOutputData(query);
+        Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
+            .commitOutputData(query.context.getQueryContext(),
+                lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
 
-      QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-      try {
-        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
-            finalOutputDir);
+        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
+        return true;
       } catch (Exception e) {
+        LOG.error(query.getId() + ", failed to finalize query: " + e.getMessage(), e);
         query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+        return false;
       }
     }
 
-    /**
-     * It moves a result data stored in a staging output dir into a final output dir.
-     */
-    public Path commitOutputData(Query query) {
-      QueryContext queryContext = query.context.getQueryContext();
-      Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
-      Path finalOutputDir;
-      if (queryContext.hasOutputPath()) {
-        finalOutputDir = queryContext.getOutputPath();
-        try {
-          FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
-
-          if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
-
-            // It moves the original table into the temporary location.
-            // Then it moves the new result table into the original table location.
-            // Upon failed, it recovers the original table if possible.
-            boolean movedToOldTable = false;
-            boolean committed = false;
-            Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-
-            if (queryContext.hasPartition()) {
-              // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
-              // renaming directory.
-              Map<Path, Path> renameDirs = TUtil.newHashMap();
-              // This is a map for recovering existing partition directory. A key is current directory and a value is
-              // temporary directory to back up.
-              Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
-              try {
-                if (!fs.exists(finalOutputDir)) {
-                  fs.mkdirs(finalOutputDir);
-                }
-
-                visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
-                    renameDirs, oldTableDir);
-
-                // Rename target partition directories
-                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                  // Backup existing data files for recovering
-                  if (fs.exists(entry.getValue())) {
-                    String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
-                        oldTableDir.toString());
-                    Path recoveryPath = new Path(recoveryPathString);
-                    fs.rename(entry.getValue(), recoveryPath);
-                    fs.exists(recoveryPath);
-                    recoveryDirs.put(entry.getValue(), recoveryPath);
-                  }
-                  // Delete existing directory
-                  fs.delete(entry.getValue(), true);
-                  // Rename staging directory to final output directory
-                  fs.rename(entry.getKey(), entry.getValue());
-                }
-
-              } catch (IOException ioe) {
-                // Remove created dirs
-                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                  fs.delete(entry.getValue(), true);
-                }
-
-                // Recovery renamed dirs
-                for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
-                  fs.delete(entry.getValue(), true);
-                  fs.rename(entry.getValue(), entry.getKey());
-                }
-                throw new IOException(ioe.getMessage());
-              }
-            } else {
-              try {
-                if (fs.exists(finalOutputDir)) {
-                  fs.rename(finalOutputDir, oldTableDir);
-                  movedToOldTable = fs.exists(oldTableDir);
-                } else { // if the parent does not exist, make its parent directory.
-                  fs.mkdirs(finalOutputDir.getParent());
-                }
-
-                fs.rename(stagingResultDir, finalOutputDir);
-                committed = fs.exists(finalOutputDir);
-              } catch (IOException ioe) {
-                // recover the old table
-                if (movedToOldTable && !committed) {
-                  fs.rename(oldTableDir, finalOutputDir);
-                }
-              }
-            }
-          } else {
-            NodeType queryType = queryContext.getCommandType();
-
-            if (queryType == NodeType.INSERT) { // INSERT INTO an existing table
-
-              NumberFormat fmt = NumberFormat.getInstance();
-              fmt.setGroupingUsed(false);
-              fmt.setMinimumIntegerDigits(3);
-
-              if (queryContext.hasPartition()) {
-                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                  if (eachFile.isFile()) {
-                    LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
-                    continue;
-                  }
-                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1);
-                }
-              } else {
-                int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
-                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++);
-                }
-              }
-              // checking all file moved and remove empty dir
-              verifyAllFileMoved(fs, stagingResultDir);
-              FileStatus[] files = fs.listStatus(stagingResultDir);
-              if (files != null && files.length != 0) {
-                for (FileStatus eachFile: files) {
-                  LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-                }
-              }
-            } else { // CREATE TABLE AS SELECT (CTAS)
-              fs.rename(stagingResultDir, finalOutputDir);
-              LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
-            }
-          }
-        } catch (IOException e) {
-          // TODO report to client
-          e.printStackTrace();
-        }
-      } else {
-        finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
-      }
-
-      return finalOutputDir;
-    }
-
-    /**
-     * This method sets a rename map which includes renamed staging directory to final output directory recursively.
-     * If there exists some data files, this delete it for duplicate data.
-     *
-     *
-     * @param fs
-     * @param stagingPath
-     * @param outputPath
-     * @param stagingParentPathString
-     * @throws IOException
-     */
-    private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
-                                        String stagingParentPathString,
-                                        Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
-      FileStatus[] files = fs.listStatus(stagingPath);
-
-      for(FileStatus eachFile : files) {
-        if (eachFile.isDirectory()) {
-          Path oldPath = eachFile.getPath();
-
-          // Make recover directory.
-          String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
-          oldTableDir.toString());
-          Path recoveryPath = new Path(recoverPathString);
-          if (!fs.exists(recoveryPath)) {
-            fs.mkdirs(recoveryPath);
-          }
-
-          visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
-          renameDirs, oldTableDir);
-          // Find last order partition for renaming
-          String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
-          outputPath.toString());
-          Path newPath = new Path(newPathString);
-          if (!isLeafDirectory(fs, eachFile.getPath())) {
-           renameDirs.put(eachFile.getPath(), newPath);
-          } else {
-            if (!fs.exists(newPath)) {
-             fs.mkdirs(newPath);
-            }
-          }
-        }
-      }
-    }
-
-    private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
-      boolean retValue = false;
-
-      FileStatus[] files = fs.listStatus(path);
-      for (FileStatus file : files) {
-        if (fs.isDirectory(file.getPath())) {
-          retValue = true;
-          break;
-        }
-      }
-
-      return retValue;
-    }
-
-    private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
-      FileStatus[] files = fs.listStatus(stagingPath);
-      if (files != null && files.length != 0) {
-        for (FileStatus eachFile: files) {
-          if (eachFile.isFile()) {
-            LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-            return false;
-          } else {
-            if (verifyAllFileMoved(fs, eachFile.getPath())) {
-              fs.delete(eachFile.getPath(), false);
-            } else {
-              return false;
-            }
-          }
-        }
-      }
-
-      return true;
-    }
-
-    /**
-     * Attach the sequence number to a path.
-     *
-     * @param path Path
-     * @param seq sequence number
-     * @param nf Number format
-     * @return New path attached with sequence number
-     * @throws IOException
-     */
-    private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
-      String[] tokens = path.getName().split("-");
-      if (tokens.length != 4) {
-        throw new IOException("Wrong result file name:" + path);
-      }
-      return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
-    }
-
-    /**
-     * Attach the sequence number to the output file name and than move the file into the final result path.
-     *
-     * @param fs FileSystem
-     * @param stagingResultDir The staging result dir
-     * @param fileStatus The file status
-     * @param finalOutputPath Final output path
-     * @param nf Number format
-     * @param fileSeq The sequence number
-     * @throws IOException
-     */
-    private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
-                                            FileStatus fileStatus, Path finalOutputPath,
-                                            NumberFormat nf,
-                                            int fileSeq) throws IOException {
-      if (fileStatus.isDirectory()) {
-        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-        if (subPath != null) {
-          Path finalSubPath = new Path(finalOutputPath, subPath);
-          if (!fs.exists(finalSubPath)) {
-            fs.mkdirs(finalSubPath);
-          }
-          int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
-          for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
-            moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq);
-          }
-        } else {
-          throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
-        }
-      } else {
-        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-        if (subPath != null) {
-          Path finalSubPath = new Path(finalOutputPath, subPath);
-          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
-          if (!fs.exists(finalSubPath.getParent())) {
-            fs.mkdirs(finalSubPath.getParent());
-          }
-          if (fs.exists(finalSubPath)) {
-            throw new IOException("Already exists data file:" + finalSubPath);
-          }
-          boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
-          if (success) {
-            LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
-                "to final output[" + finalSubPath + "]");
-          } else {
-            LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
-                "to final output[" + finalSubPath + "]");
-          }
-        }
-      }
-    }
-
-    private String extractSubPath(Path parentPath, Path childPath) {
-      String parentPathStr = parentPath.toUri().getPath();
-      String childPathStr = childPath.toUri().getPath();
-
-      if (parentPathStr.length() > childPathStr.length()) {
-        return null;
-      }
-
-      int index = childPathStr.indexOf(parentPathStr);
-      if (index != 0) {
-        return null;
-      }
-
-      return childPathStr.substring(parentPathStr.length() + 1);
-    }
-
     private static interface QueryHook {
       boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
       void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index ef30135..6a55de6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -35,10 +35,13 @@ import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.JsonHelper;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
@@ -54,10 +57,12 @@ import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageProperty;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
@@ -345,6 +350,8 @@ public class QueryMasterTask extends CompositeService {
   }
 
   public synchronized void startQuery() {
+    StorageManager sm = null;
+    LogicalPlan plan = null;
     try {
       if (query != null) {
         LOG.warn("Query already started");
@@ -354,7 +361,30 @@ public class QueryMasterTask extends CompositeService {
       LogicalPlanner planner = new LogicalPlanner(catalog);
       LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
-      LogicalPlan plan = planner.createPlan(queryContext, expr);
+      plan = planner.createPlan(queryContext, expr);
+
+      // If the target storage requires sorted inserts, register its storage-specific rewrite rules before optimizing.
+      StoreType storeType = PlannerUtil.getStoreType(plan);
+      if (storeType != null) {
+        sm = StorageManager.getStorageManager(systemConf, storeType);
+        StorageProperty storageProperty = sm.getStorageProperty();
+        if (storageProperty.isSortedInsert()) {
+          String tableName = PlannerUtil.getStoreTableName(plan);
+          LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+          TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+          if (tableDesc == null) {
+            throw new VerifyException("Can't get table meta data from catalog: " + tableName);
+          }
+          List<RewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+              getQueryTaskContext().getQueryContext(), tableDesc);
+          if (storageSpecifiedRewriteRules != null) {
+            for (RewriteRule eachRule: storageSpecifiedRewriteRules) {
+              optimizer.addRuleAfterToJoinOpt(eachRule);
+            }
+          }
+        }
+      }
+
       optimizer.optimize(queryContext, plan);
 
       GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
@@ -389,6 +419,16 @@ public class QueryMasterTask extends CompositeService {
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
       initError = t;
+
+      if (plan != null && sm != null) {
+        // Best-effort cleanup; 'query' may still be null here if planning failed, so log with queryId instead.
+        try {
+          LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+          sm.rollbackOutputCommit(rootNode.getChild());
+        } catch (Exception e) {
+          LOG.warn(queryId + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 6bc185e..75402c2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -266,8 +266,13 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
     List<String> fragmentList = new ArrayList<String>();
     for (FragmentProto eachFragment : getAllFragments()) {
-      FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
-      fragmentList.add(fileFragment.toString());
+      try {
+        Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
+        fragmentList.add(fragment.toString());
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
+      }
     }
     queryUnitHistory.setFragments(fragmentList.toArray(new String[]{}));
 
@@ -321,7 +326,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       diskIds = ((FileFragment)fragment).getDiskIds();
     }
     for (int i = 0; i < hosts.length; i++) {
-      dataLocations.add(new DataLocation(hosts[i], diskIds[i]));
+      dataLocations.add(new DataLocation(hosts[i], (diskIds == null || i >= diskIds.length) ? -1 : diskIds[i]));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 046b310..c5a41a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -45,6 +45,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.*;
@@ -419,7 +420,8 @@ public class Repartitioner {
         } else {
           StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(),
               tableDesc.getMeta().getStoreType());
-          Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(), tableDesc);
+          Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(),
+              tableDesc, eachScan);
           if (scanFragments != null) {
             rightFragments.addAll(scanFragments);
           }
@@ -539,7 +541,7 @@ public class Repartitioner {
         StorageManager storageManager =
             StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType());
 
-        scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc);
+        scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan);
       }
 
       if (scanFragments != null) {
@@ -649,39 +651,63 @@ public class Repartitioner {
     SortSpec [] sortSpecs = sortNode.getSortKeys();
     Schema sortSchema = new Schema(channel.getShuffleKeys());
 
+    TupleRange[] ranges;
+    int determinedTaskNum;
+
     // calculate the number of maximum query ranges
     TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
 
     // If there is an empty table in inner join, it should return zero rows.
-    if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) {
+    if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) {
       return;
     }
     TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false);
-    RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
-    BigInteger card = partitioner.getTotalCardinality();
 
-    // if the number of the range cardinality is less than the desired number of tasks,
-    // we set the the number of tasks to the number of range cardinality.
-    int determinedTaskNum;
-    if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
-      LOG.info(subQuery.getId() + ", The range cardinality (" + card
-          + ") is less then the desired number of tasks (" + maxNum + ")");
-      determinedTaskNum = card.intValue();
+    if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) {
+      // The target storage supplies the insert sort ranges itself; the task count follows the number of ranges.
+      StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+      CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+      LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot();
+      TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+      if (tableDesc == null) {
+        throw new IOException("Can't get table meta data from catalog: " +
+            PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
+      }
+      ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType)
+          .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc,
+              sortNode.getInSchema(), sortSpecs,
+              mergedRange);
+      if (ranges == null) {
+        throw new IOException(subQuery.getId() + ", no insert sort ranges returned for table: " +
+            PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
+      }
+      determinedTaskNum = ranges.length;
     } else {
-      determinedTaskNum = maxNum;
-    }
+      RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
+      BigInteger card = partitioner.getTotalCardinality();
+
+      // if the number of the range cardinality is less than the desired number of tasks,
+      // we set the number of tasks to the number of range cardinality.
+      if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
+        LOG.info(subQuery.getId() + ", The range cardinality (" + card
+            + ") is less then the desired number of tasks (" + maxNum + ")");
+        determinedTaskNum = card.intValue();
+      } else {
+        determinedTaskNum = maxNum;
+      }
 
-    LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
-        " sub ranges (total units: " + determinedTaskNum + ")");
-    TupleRange [] ranges = partitioner.partition(determinedTaskNum);
-    if (ranges == null || ranges.length == 0) {
-      LOG.warn(subQuery.getId() + " no range infos.");
-    }
-    TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
-    if (LOG.isDebugEnabled()) {
-      if (ranges != null) {
-        for (TupleRange eachRange : ranges) {
-          LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+      LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
+          " sub ranges (total units: " + determinedTaskNum + ")");
+      ranges = partitioner.partition(determinedTaskNum);
+      if (ranges == null || ranges.length == 0) {
+        LOG.warn(subQuery.getId() + " no range infos.");
+      }
+      TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
+      if (LOG.isDebugEnabled()) {
+        if (ranges != null) {
+          for (TupleRange eachRange : ranges) {
+            LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 18d4c28..6676072 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -37,7 +36,7 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -62,7 +61,6 @@ import org.apache.tajo.storage.FileStorageManager;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
@@ -677,14 +675,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
 
     DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
-    // get default or store type
-    CatalogProtos.StoreType storeType = CatalogProtos.StoreType.CSV; // default setting
 
     // if store plan (i.e., CREATE or INSERT OVERWRITE)
-    StoreTableNode storeTableNode = PlannerUtil.findTopNode(getBlock().getPlan(), NodeType.STORE);
-    if (storeTableNode != null) {
-      storeType = storeTableNode.getStorageType();
+    StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+    if (storeType == null) {
+      // No target store type in the query's logical plan (presumably not CREATE/INSERT): default to CSV.
+      storeType = StoreType.CSV;
     }
+
     schema = channel.getSchema();
     meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
     inputStatistics = statsArray[0];
@@ -1058,7 +1056,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       } else {
         StorageManager storageManager =
             StorageManager.getStorageManager(subQuery.getContext().getConf(), meta.getStoreType());
-        fragments = storageManager.getSplits(scan.getCanonicalName(), table);
+        fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan);
       }
 
       SubQuery.scheduleFragments(subQuery, fragments);


Mime
View raw message