tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject [2/2] tajo git commit: TAJO-1346: Create dynamic partitions to CatalogStore by running insert query or CTAS query. (jaehwa)
Date Fri, 31 Jul 2015 03:07:00 GMT
TAJO-1346: Create dynamic partitions to CatalogStore by running insert query or CTAS query. (jaehwa)

Closes #630


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d80c32b2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d80c32b2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d80c32b2

Branch: refs/heads/master
Commit: d80c32b28738a69c3a512108e65ed5c7a3e3adc6
Parents: 00ccb8b
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Fri Jul 31 12:04:02 2015 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Fri Jul 31 12:04:02 2015 +0900

----------------------------------------------------------------------
 .../org/apache/tajo/algebra/AlterTable.java     |  29 +-
 .../tajo/catalog/AbstractCatalogClient.java     |  25 ++
 .../src/main/proto/CatalogProtocol.proto        |   1 +
 .../org/apache/tajo/catalog/CatalogService.java |   3 +
 ...xistsAssumedPartitionDirectoryException.java |  28 --
 ...biguousPartitionDirectoryExistException.java |  30 ++
 .../exception/NoPartitionedTableException.java  |  26 --
 .../exception/NoSuchPartitionException.java     |  37 ---
 .../exception/NoSuchPartitionKeyException.java  |  29 --
 .../UndefinedPartitionKeyException.java         |  30 ++
 .../UndefinedPartitionMethodException.java      |  30 ++
 .../src/main/proto/CatalogProtos.proto          |  12 +-
 .../tajo/catalog/store/HiveCatalogStore.java    | 125 +++++++-
 .../org/apache/tajo/catalog/CatalogServer.java  |  46 ++-
 .../tajo/catalog/store/AbstractDBStore.java     | 284 ++++++++++++-------
 .../apache/tajo/catalog/store/CatalogStore.java |   3 +
 .../org/apache/tajo/catalog/store/MemStore.java |  76 +++--
 .../src/main/resources/schemas/derby/derby.xml  |   9 +-
 .../main/resources/schemas/mariadb/mariadb.xml  |  10 +-
 .../src/main/resources/schemas/mysql/mysql.xml  |  10 +-
 .../main/resources/schemas/oracle/oracle.xml    |  11 +-
 .../resources/schemas/postgresql/postgresql.xml |  16 +-
 .../org/apache/tajo/catalog/TestCatalog.java    |   5 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +
 .../apache/tajo/exception/ErrorMessages.java    |   5 +
 tajo-common/src/main/proto/errors.proto         |   2 +
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   4 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |   2 +
 .../planner/physical/ColPartitionStoreExec.java |  44 ++-
 .../apache/tajo/master/exec/DDLExecutor.java    |  81 +++---
 .../java/org/apache/tajo/querymaster/Query.java |  42 ++-
 .../java/org/apache/tajo/querymaster/Stage.java |  15 +
 .../apache/tajo/querymaster/TaskAttempt.java    |  18 ++
 .../apache/tajo/worker/TaskAttemptContext.java  |  19 ++
 .../java/org/apache/tajo/worker/TaskImpl.java   |   4 +
 tajo-core/src/main/proto/ResourceProtos.proto   |   1 +
 .../tajo/engine/parser/TestSQLAnalyzer.java     |  38 +++
 .../tajo/engine/query/TestAlterTable.java       |   3 +-
 .../tajo/engine/query/TestTablePartitions.java  | 130 ++++++++-
 .../alter_table_add_partition2.sql              |   1 +
 .../alter_table_drop_partition2.sql             |   1 +
 .../default/alter_table_add_partition_5.sql     |   1 +
 .../default/alter_table_drop_partition_4.sql    |   1 +
 .../testAlterTableAddDropPartition.result       |   2 +-
 .../src/main/conf/catalog-site.xml.template     |   4 +-
 .../main/sphinx/sql_language/alter_table.rst    |   7 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |   2 +
 .../tajo/plan/logical/AlterTableNode.java       |  22 +-
 .../plan/serder/LogicalNodeDeserializer.java    |   3 +
 tajo-plan/src/main/proto/Plan.proto             |   2 +
 50 files changed, 1002 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java
index 260025f..f0bd62a 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTable.java
@@ -54,6 +54,11 @@ public class AlterTable extends Expr {
   @Expose @SerializedName("IsPurge")
   private boolean purge;
 
+  @Expose @SerializedName("IfNotExists")
+  private boolean ifNotExists;
+  @Expose @SerializedName("IfExists")
+  private boolean ifExists;
+
   public AlterTable(final String tableName) {
     super(OpType.AlterTable);
     this.tableName = tableName;
@@ -140,10 +145,26 @@ public class AlterTable extends Expr {
     this.purge = purge;
   }
 
+  public boolean isIfNotExists() {
+    return ifNotExists;
+  }
+
+  public void setIfNotExists(boolean ifNotExists) {
+    this.ifNotExists = ifNotExists;
+  }
+
+  public boolean isIfExists() {
+    return ifExists;
+  }
+
+  public void setIfExists(boolean ifExists) {
+    this.ifExists = ifExists;
+  }
+
   @Override
   public int hashCode() {
     return Objects.hashCode(tableName, newTableName, columnName, newColumnName, addNewColumn, alterTableOpType,
-      columns, values, location, params, purge
+      columns, values, location, params, purge, ifNotExists, ifExists
     );
 
   }
@@ -161,7 +182,9 @@ public class AlterTable extends Expr {
         TUtil.checkEquals(values, another.values) &&
         TUtil.checkEquals(location, another.location) &&
         TUtil.checkEquals(params, another.params) &&
-        TUtil.checkEquals(purge, another.purge)
+        TUtil.checkEquals(purge, another.purge) &&
+        TUtil.checkEquals(ifNotExists, another.ifNotExists) &&
+        TUtil.checkEquals(ifExists, another.ifExists)
     ;
   }
 
@@ -183,6 +206,8 @@ public class AlterTable extends Expr {
       alter.params = new HashMap<String, String>(params);
     }
     alter.purge = purge;
+    alter.ifNotExists = ifNotExists;
+    alter.ifExists = ifExists;
     return alter;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 402df0f..52f4b8e 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -416,6 +416,31 @@ public abstract class AbstractCatalogClient implements CatalogService, Closeable
   }
 
   @Override
+  public boolean addPartitions(String databaseName, String tableName, List<PartitionDescProto> partitions
+    , boolean ifNotExists) {
+    try {
+      final BlockingInterface stub = getStub();
+      final AddPartitionsProto.Builder builder = AddPartitionsProto.newBuilder();
+
+      TableIdentifierProto.Builder identifier = TableIdentifierProto.newBuilder();
+      identifier.setDatabaseName(databaseName);
+      identifier.setTableName(tableName);
+      builder.setTableIdentifier(identifier.build());
+
+      for (PartitionDescProto partition: partitions) {
+        builder.addPartitionDesc(partition);
+      }
+
+      builder.setIfNotExists(ifNotExists);
+
+      return isSuccess(stub.addPartitions(null, builder.build()));
+    } catch (ServiceException e) {
+      LOG.error(e.getMessage(), e);
+      return false;
+    }
+  }
+
+  @Override
   public final Collection<String> getAllTableNames(final String databaseName) {
     try {
       final BlockingInterface stub = getStub();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
index ee74aa0..39201e6 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
+++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
@@ -124,6 +124,7 @@ service CatalogProtocolService {
   rpc getPartitionByPartitionName(PartitionIdentifierProto) returns (GetPartitionDescResponse);
   rpc getPartitionsByTableName(PartitionIdentifierProto) returns (GetPartitionsResponse);
   rpc getAllPartitions(NullProto) returns (GetTablePartitionsResponse);
+  rpc addPartitions(AddPartitionsProto) returns (ReturnState);
 
   rpc createIndex(IndexDescProto) returns (ReturnState);
   rpc dropIndex(IndexNameProto) returns (ReturnState);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
index 7704191..26fc564 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -183,6 +183,9 @@ public interface CatalogService {
 
   List<TablePartitionProto> getAllPartitions();
 
+  boolean addPartitions(String databaseName, String tableName, List<PartitionDescProto> partitions
+    , boolean ifNotExists);
+
   boolean createIndex(IndexDesc index);
 
   boolean existIndexByName(String databaseName, String indexName);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java
deleted file mode 100644
index df13f82..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsAssumedPartitionDirectoryException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.catalog.exception;
-
-public class AlreadyExistsAssumedPartitionDirectoryException extends RuntimeException {
-
-  private static final long serialVersionUID = 277182608283894931L;
-
-  public AlreadyExistsAssumedPartitionDirectoryException(String message) {
-    super(String.format("ERROR: There is a directory which is assumed to be a partitioned directory : %s", message));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java
new file mode 100644
index 0000000..0c99a4f
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AmbiguousPartitionDirectoryExistException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.exception;
+
+
+import org.apache.tajo.error.Errors.ResultCode;
+
+public class AmbiguousPartitionDirectoryExistException extends CatalogException {
+	private static final long serialVersionUID = 277182608283894931L;
+
+	public AmbiguousPartitionDirectoryExistException(String columnName) {
+		super(ResultCode.AMBIGUOUS_PARTITION_DIRECTORY, columnName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java
deleted file mode 100644
index e81f526..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoPartitionedTableException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.catalog.exception;
-
-public class NoPartitionedTableException extends Exception {
-
-  public NoPartitionedTableException(String databaseName, String relName) {
-    super(String.format("ERROR: table \"%s.%s\" is not a partitioned table", databaseName, relName));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java
deleted file mode 100644
index 70e0d26..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.catalog.exception;
-
-public class NoSuchPartitionException extends RuntimeException {
-
-  private static final long serialVersionUID = 277182608283894938L;
-
-  public NoSuchPartitionException(String message) {
-    super(message);
-  }
-
-  public NoSuchPartitionException(String tableName, String partitionName) {
-    super(String.format("ERROR: \"%s\" is not the partition of \"%s\".", partitionName, tableName));
-  }
-
-  public NoSuchPartitionException(String databaseName, String tableName, String partitionName) {
-    super(String.format("ERROR: \"%s\" is not the partition of \"%s.%s\".", partitionName, databaseName, tableName));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java
deleted file mode 100644
index 94574dc..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionKeyException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.catalog.exception;
-
-public class NoSuchPartitionKeyException extends RuntimeException {
-
-  private static final long serialVersionUID = 277182608283894939L;
-
-  public NoSuchPartitionKeyException(String tableName, String partitionKey) {
-    super(String.format("ERROR: \"%s\" column is not the partition key of \"%s\".",
-      partitionKey, tableName));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java
new file mode 100644
index 0000000..5e6d20f
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionKeyException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.exception;
+
+import org.apache.tajo.error.Errors.ResultCode;
+
+public class UndefinedPartitionKeyException extends CatalogException {
+
+  private static final long serialVersionUID = 277182608283894939L;
+
+  public UndefinedPartitionKeyException(String partitionKey) {
+    super(ResultCode.UNDEFINED_PARTITION_KEY, partitionKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java
new file mode 100644
index 0000000..5b6eb04
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/UndefinedPartitionMethodException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.exception;
+
+import org.apache.tajo.error.Errors.ResultCode;
+
+public class UndefinedPartitionMethodException extends CatalogException {
+
+  private static final long serialVersionUID = 277182608283894949L;
+
+  public UndefinedPartitionMethodException(String partitionName) {
+    super(ResultCode.UNDEFINED_PARTITION_METHOD, partitionName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 86fee86..a67be97 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -236,15 +236,23 @@ message PartitionMethodProto {
   required SchemaProto expressionSchema = 4;
 }
 
+message AddPartitionsProto {
+  required TableIdentifierProto tableIdentifier = 1;
+  repeated PartitionDescProto partitionDesc = 2;
+  required bool ifNotExists = 3;
+}
+
 message PartitionDescProto {
   required string partitionName = 1;
   repeated PartitionKeyProto partitionKeys = 2;
   optional string path = 3;
+  optional int32 id = 4;
 }
 
 message PartitionKeyProto {
   required string columnName = 1;
-  required string partitionValue = 2;
+  optional string parentColumnName = 2;
+  required string partitionValue = 3;
 }
 
 message PartitionIdentifierProto {
@@ -396,4 +404,4 @@ message IndexListResponse {
 message IndexResponse {
   required ReturnState state = 1;
   optional IndexDescProto indexDesc = 2;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index b49499f..a0ff5c8 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -55,6 +55,7 @@ import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
@@ -768,14 +769,82 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
   @Override
   public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName)
       throws CatalogException {
-    // TODO - not implemented yet
-    throw new UnsupportedOperationException();
+    org.apache.hadoop.hive.ql.metadata.Table table;
+    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+    PartitionMethodDesc partitionMethodDesc = null;
+    try {
+      try {
+        client = clientPool.getClient();
+        table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+      } catch (NoSuchObjectException nsoe) {
+        throw new UndefinedTableException(tableName);
+      } catch (Exception e) {
+        throw new TajoInternalError(e);
+      }
+
+      // set partition keys
+      List<FieldSchema> partitionKeys = table.getPartitionKeys();
+
+      if (partitionKeys != null && partitionKeys.size() > 0) {
+        org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
+        StringBuilder sb = new StringBuilder();
+        if (partitionKeys.size() > 0) {
+          for (int i = 0; i < partitionKeys.size(); i++) {
+            FieldSchema fieldSchema = partitionKeys.get(i);
+            TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(fieldSchema.getType().toString());
+            String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName +
+              CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName();
+            expressionSchema.addColumn(new Column(fieldName, dataType));
+            if (i > 0) {
+              sb.append(",");
+            }
+            sb.append(fieldSchema.getName());
+          }
+          partitionMethodDesc = new PartitionMethodDesc(
+            databaseName,
+            tableName,
+            PartitionType.COLUMN,
+            sb.toString(),
+            expressionSchema);
+        }
+      } else {
+        throw new UndefinedPartitionMethodException(tableName);
+      }
+    } finally {
+      if(client != null) client.release();
+    }
+
+    return partitionMethodDesc.getProto();
   }
 
   @Override
   public boolean existPartitionMethod(String databaseName, String tableName) throws CatalogException {
-    // TODO - not implemented yet
-    throw new UnsupportedOperationException();
+    boolean exist = false;
+    org.apache.hadoop.hive.ql.metadata.Table table;
+    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+
+    try {
+      try {
+        client = clientPool.getClient();
+        table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+      } catch (NoSuchObjectException nsoe) {
+        throw new UndefinedTableException(tableName);
+      } catch (Exception e) {
+        throw new TajoInternalError(e);
+      }
+
+      // check whether the table defines any partition keys
+      List<FieldSchema> partitionKeys = table.getPartitionKeys();
+
+      if (partitionKeys != null && partitionKeys.size() > 0) {
+        exist = true;
+      }
+    } finally {
+      if(client != null) client.release();
+    }
+
+    return exist;
   }
 
   @Override
@@ -957,6 +1026,54 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
+  public void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions
+    , boolean ifNotExists) throws CatalogException {
+    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+    List<Partition> addPartitions = TUtil.newList();
+    CatalogProtos.PartitionDescProto existingPartition = null;
+
+    try {
+      client = clientPool.getClient();
+      for (CatalogProtos.PartitionDescProto partitionDescProto : partitions) {
+        existingPartition = getPartition(databaseName, tableName, partitionDescProto.getPartitionName());
+
+        // Unfortunately, the Hive client's add_partitions doesn't work as expected: it never reads the ifNotExists
+        // parameter. So, if Tajo adds an existing partition to Hive, it will throw AlreadyExistsException. To avoid
+        // this error, we need to filter out existing partitions before calling add_partitions.
+        if (existingPartition != null) {
+          Partition partition = new Partition();
+          partition.setDbName(databaseName);
+          partition.setTableName(tableName);
+
+          List<String> values = Lists.newArrayList();
+          for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) {
+            values.add(keyProto.getPartitionValue());
+          }
+          partition.setValues(values);
+
+          Table table = client.getHiveClient().getTable(databaseName, tableName);
+          StorageDescriptor sd = table.getSd();
+          sd.setLocation(partitionDescProto.getPath());
+          partition.setSd(sd);
+
+          addPartitions.add(partition);
+        }
+      }
+
+      if (addPartitions.size() > 0) {
+        client.getHiveClient().add_partitions(addPartitions, true, true);
+      }
+    } catch (Exception e) {
+      throw new TajoInternalError(e);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+
+  }
+
+  @Override
   public List<TableOptionProto> getAllTableProperties() throws CatalogException {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 0327367..4ed1ae8 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -950,13 +950,16 @@ public class CatalogServer extends AbstractService {
 
             if (store.existPartitionMethod(dbName, tbName)) {
               PartitionDescProto partitionDesc = store.getPartition(dbName, tbName, partitionName);
-
-
-              return GetPartitionDescResponse.newBuilder()
+              if (partitionDesc != null) {
+                return GetPartitionDescResponse.newBuilder()
                   .setState(OK)
                   .setPartition(partitionDesc)
                   .build();
-
+              } else {
+                return GetPartitionDescResponse.newBuilder()
+                  .setState(errUndefinedPartition(partitionName))
+                  .build();
+              }
             } else {
               return GetPartitionDescResponse.newBuilder()
                   .setState(errUndefinedPartitionMethod(tbName))
@@ -1065,6 +1068,41 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
+    public ReturnState addPartitions(RpcController controller, AddPartitionsProto request) {
+
+      TableIdentifierProto identifier = request.getTableIdentifier();
+      String databaseName = identifier.getDatabaseName();
+      String tableName = identifier.getTableName();
+
+      rlock.lock();
+      try {
+        boolean contain;
+
+        contain = store.existDatabase(databaseName);
+        if (contain) {
+          contain = store.existTable(databaseName, tableName);
+          if (contain) {
+            if (store.existPartitionMethod(databaseName, tableName)) {
+              store.addPartitions(databaseName, tableName, request.getPartitionDescList(), request.getIfNotExists());
+              return OK;
+            } else {
+              return errUndefinedPartitionMethod(tableName);
+            }
+          } else {
+            return errUndefinedTable(tableName);
+          }
+        } else {
+          return errUndefinedDatabase(databaseName);
+        }
+      } catch (Throwable t) {
+        printStackTraceIfError(LOG, t);
+        return returnError(t);
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
     public ReturnState createIndex(RpcController controller, IndexDescProto indexDesc) {
       String dbName = indexDesc.getTableIdentifier().getDatabaseName();
       

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 2e9c340..b62624a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -33,6 +33,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.util.FileUtil;
@@ -40,11 +41,7 @@ import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.*;
 
 import static org.apache.tajo.catalog.exception.CatalogExceptionUtil.makeCatalogUpgrade;
@@ -59,6 +56,23 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
   protected final String connectionPassword;
   protected final String catalogUri;
 
+  protected final String insertPartitionSql = "INSERT INTO " + TB_PARTTIONS
+    + "(" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?, ? , ?)";
+
+  protected final String insertPartitionKeysSql = "INSERT INTO " + TB_PARTTION_KEYS  + "("
+    + COL_PARTITIONS_PK + ", " + COL_TABLES_PK + ", "
+    + COL_COLUMN_NAME + ", " + COL_PARTITION_VALUE + ")"
+    + " VALUES ( ("
+    + " SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS
+    + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? ) "
+    + " , ?, ?, ?)";
+
+  protected final String deletePartitionSql = "DELETE FROM " + TB_PARTTIONS
+    + " WHERE " + COL_PARTITIONS_PK + " = ? ";
+
+  protected final String deletePartitionKeysSql = "DELETE FROM " + TB_PARTTION_KEYS
+    + " WHERE " + COL_PARTITIONS_PK + " = ? "; // removes the key rows of one partition, by partition id
+
   private Connection conn;
   
   protected XMLCatalogSchemaManager catalogSchemaManager;
@@ -935,9 +949,9 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
         }
 
         pstmt = conn.prepareStatement(statSql);
-        pstmt.setInt(1, tableId);
-        pstmt.setLong(2, statsProto.getStats().getNumRows());
-        pstmt.setLong(3, statsProto.getStats().getNumBytes());
+        pstmt.setLong(1, statsProto.getStats().getNumRows());
+        pstmt.setLong(2, statsProto.getStats().getNumBytes());
+        pstmt.setInt(3, tableId);
         pstmt.executeUpdate();
       }
 
@@ -1007,7 +1021,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
           if(partitionDesc == null) {
             throw new UndefinedPartitionException(partitionName);
           }
-          dropPartition(tableId, alterTableDescProto.getPartitionDesc().getPartitionName());
+          dropPartition(partitionDesc.getId());
           break;
         case SET_PROPERTY:
           setProperties(tableId, alterTableDescProto.getParams());
@@ -1243,116 +1257,81 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
   public void addPartition(int tableId, CatalogProtos.PartitionDescProto partition) throws CatalogException {
     Connection conn = null;
-    PreparedStatement pstmt = null;
-    final String ADD_PARTITION_SQL =
-      "INSERT INTO " + TB_PARTTIONS
-        + " (" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?,?,?)";
-
-    final String ADD_PARTITION_KEYS_SQL =
-      "INSERT INTO " + TB_PARTTION_KEYS + " (" + COL_PARTITIONS_PK + ", " + COL_COLUMN_NAME + ", "
-      + COL_PARTITION_VALUE + ") VALUES (?,?,?)";
+    PreparedStatement pstmt1 = null, pstmt2 = null;
 
     try {
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(ADD_PARTITION_SQL);
-      }
-
       conn = getConnection();
-      pstmt = conn.prepareStatement(ADD_PARTITION_SQL);
-      pstmt.setInt(1, tableId);
-      pstmt.setString(2, partition.getPartitionName());
-      pstmt.setString(3, partition.getPath());
-      pstmt.executeUpdate();
-      pstmt.close();
+      conn.setAutoCommit(false);
 
-      if (partition.getPartitionKeysCount() > 0) {
-        pstmt = conn.prepareStatement(ADD_PARTITION_KEYS_SQL);
-        int partitionId = getPartitionId(tableId, partition.getPartitionName());
-        addPartitionKeys(pstmt, partitionId, partition);
-        pstmt.executeBatch();
-      }
-    } catch (SQLException se) {
-      throw new TajoInternalError(se);
-    } finally {
-      CatalogUtil.closeQuietly(pstmt);
-    }
-  }
+      pstmt1 = conn.prepareStatement(insertPartitionSql);
+      pstmt1.setInt(1, tableId);
+      pstmt1.setString(2, partition.getPartitionName());
+      pstmt1.setString(3, partition.getPath());
+      pstmt1.executeUpdate();
 
-  public int getPartitionId(int tableId, String partitionName) throws CatalogException {
-    Connection conn = null;
-    ResultSet res = null;
-    PreparedStatement pstmt = null;
-    int retValue = -1;
+      pstmt2 = conn.prepareStatement(insertPartitionKeysSql);
 
-    try {
-      String sql = "SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS +
-        " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? ";
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
+      for (int i = 0; i < partition.getPartitionKeysCount(); i++) {
+        PartitionKeyProto partitionKey = partition.getPartitionKeys(i);
+        pstmt2.setInt(1, tableId);
+        pstmt2.setString(2, partition.getPartitionName());
+        pstmt2.setInt(3, tableId);
+        pstmt2.setString(4, partitionKey.getColumnName());
+        pstmt2.setString(5, partitionKey.getPartitionValue());
+        pstmt2.addBatch();
+        pstmt2.clearParameters();
       }
+      pstmt2.executeBatch();
 
-      conn = getConnection();
-      pstmt = conn.prepareStatement(sql);
-      pstmt.setInt(1, tableId);
-      pstmt.setString(2, partitionName);
-      res = pstmt.executeQuery();
-
-      if (res.next()) {
-        retValue = res.getInt(1);
+      if (conn != null) {
+        conn.commit();
       }
     } catch (SQLException se) {
+      if (conn != null) {
+        try {
+          conn.rollback();
+        } catch (SQLException e) {
+          LOG.error(e, e);
+        }
+      }
       throw new TajoInternalError(se);
     } finally {
-      CatalogUtil.closeQuietly(pstmt, res);
+      CatalogUtil.closeQuietly(pstmt1);
+      CatalogUtil.closeQuietly(pstmt2);
     }
-    return retValue;
   }
 
-  private void addPartitionKeys(PreparedStatement pstmt, int partitionId, PartitionDescProto partition) throws
-    SQLException {
-    for (int i = 0; i < partition.getPartitionKeysCount(); i++) {
-      PartitionKeyProto partitionKey = partition.getPartitionKeys(i);
-
-      pstmt.setInt(1, partitionId);
-      pstmt.setString(2, partitionKey.getColumnName());
-      pstmt.setString(3, partitionKey.getPartitionValue());
-
-      pstmt.addBatch();
-      pstmt.clearParameters();
-    }
-  }
-
-
-  private void dropPartition(int tableId, String partitionName) throws CatalogException {
+  private void dropPartition(int partitionId) throws CatalogException {
     Connection conn = null;
-    PreparedStatement pstmt = null;
+    PreparedStatement pstmt1 = null, pstmt2 = null;
 
     try {
-      int partitionId = getPartitionId(tableId, partitionName);
-
-      String sqlDeletePartitionKeys = "DELETE FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? ";
-      String sqlDeletePartition = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_PARTITIONS_PK + " = ? ";
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sqlDeletePartitionKeys);
-      }
-
       conn = getConnection();
-      pstmt = conn.prepareStatement(sqlDeletePartitionKeys);
-      pstmt.setInt(1, partitionId);
-      pstmt.executeUpdate();
-      pstmt.close();
+      conn.setAutoCommit(false);
 
-      pstmt = conn.prepareStatement(sqlDeletePartition);
-      pstmt.setInt(1, partitionId);
-      pstmt.executeUpdate();
+      pstmt1 = conn.prepareStatement(deletePartitionKeysSql);
+      pstmt1.setInt(1, partitionId);
+      pstmt1.executeUpdate();
+
+      pstmt2 = conn.prepareStatement(deletePartitionSql);
+      pstmt2.setInt(1, partitionId);
+      pstmt2.executeUpdate();
 
+      if (conn != null) {
+        conn.commit();
+      }
     } catch (SQLException se) {
+      if (conn != null) {
+        try {
+          conn.rollback();
+        } catch (SQLException e) {
+          LOG.error(e, e);
+        }
+      }
       throw new TajoInternalError(se);
     } finally {
-      CatalogUtil.closeQuietly(pstmt);
+      CatalogUtil.closeQuietly(pstmt1);
+      CatalogUtil.closeQuietly(pstmt2);
     }
   }
 
@@ -2054,6 +2033,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
       if (res.next()) {
         builder = PartitionDescProto.newBuilder();
+        builder.setId(res.getInt(COL_PARTITIONS_PK));
         builder.setPath(res.getString("PATH"));
         builder.setPartitionName(partitionName);
         setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder);
@@ -2075,7 +2055,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     PreparedStatement pstmt = null;
 
     try {
-      String sql = "SELECT "+ COL_COLUMN_NAME + " , "+ COL_PARTITION_VALUE
+      String sql = "SELECT "+ COL_COLUMN_NAME  + " , "+ COL_PARTITION_VALUE
         + " FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? ";
 
       conn = getConnection();
@@ -2169,6 +2149,118 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     return partitions;
   }
 
+  @Override
+  public void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions
+    , boolean ifNotExists) throws CatalogException {
+    Connection conn = null;
+
+    // To delete existing partition keys
+    PreparedStatement pstmt1 = null;
+    // To delete an existing partition
+    PreparedStatement pstmt2 = null;
+    // To insert a partition
+    PreparedStatement pstmt3 = null;
+    // To insert partition keys
+    PreparedStatement pstmt4 = null;
+
+    PartitionDescProto partitionDesc = null;
+
+    try {
+      int databaseId = getDatabaseId(databaseName);
+      int tableId = getTableId(databaseId, databaseName, tableName);
+
+      conn = getConnection();
+      conn.setAutoCommit(false);
+
+      int currentIndex = 0, lastIndex = 0;
+
+      pstmt1 = conn.prepareStatement(deletePartitionKeysSql);
+      pstmt2 = conn.prepareStatement(deletePartitionSql);
+      pstmt3 = conn.prepareStatement(insertPartitionSql);
+      pstmt4 = conn.prepareStatement(insertPartitionKeysSql);
+
+      // Values are bound via prepared statements (no SQL injection); flushing every batchSize rows (default 1000) bounds memory.
+      int batchSize = conf.getInt(TajoConf.ConfVars.PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE.varname, 1000);
+      for(currentIndex = 0; currentIndex < partitions.size(); currentIndex++) {
+        PartitionDescProto partition = partitions.get(currentIndex);
+        partitionDesc = getPartition(databaseName, tableName, partition.getPartitionName());
+
+        // Delete existing partition and partition keys
+        if (partitionDesc != null) {
+          if(ifNotExists) {
+            pstmt1.setInt(1, partitionDesc.getId());
+            pstmt1.addBatch();
+            pstmt1.clearParameters();
+
+            pstmt2.setInt(1, partitionDesc.getId());
+            pstmt2.addBatch();
+            pstmt2.clearParameters();
+          } else {
+            throw new DuplicatePartitionException(partition.getPartitionName());
+          }
+        }
+
+        // Insert partition
+        pstmt3.setInt(1, tableId);
+        pstmt3.setString(2, partition.getPartitionName());
+        pstmt3.setString(3, partition.getPath());
+        pstmt3.addBatch();
+        pstmt3.clearParameters();
+
+        // Insert partition keys
+        for (int i = 0; i < partition.getPartitionKeysCount(); i++) {
+          PartitionKeyProto partitionKey = partition.getPartitionKeys(i);
+          pstmt4.setInt(1, tableId);
+          pstmt4.setString(2, partition.getPartitionName());
+          pstmt4.setInt(3, tableId);
+          pstmt4.setString(4, partitionKey.getColumnName());
+          pstmt4.setString(5, partitionKey.getPartitionValue());
+
+          pstmt4.addBatch();
+          pstmt4.clearParameters();
+        }
+
+        // Execute batch
+        if (currentIndex >= lastIndex + batchSize && lastIndex != currentIndex) {
+          pstmt1.executeBatch();
+          pstmt1.clearBatch();
+          pstmt2.executeBatch();
+          pstmt2.clearBatch();
+          pstmt3.executeBatch();
+          pstmt3.clearBatch();
+          pstmt4.executeBatch();
+          pstmt4.clearBatch();
+          lastIndex = currentIndex;
+        }
+      }
+
+      // Execute existing batch queries
+      if (lastIndex != currentIndex) {
+        pstmt1.executeBatch();
+        pstmt2.executeBatch();
+        pstmt3.executeBatch();
+        pstmt4.executeBatch();
+      }
+
+      if (conn != null) {
+        conn.commit();
+      }
+    } catch (SQLException se) {
+      if (conn != null) {
+        try {
+          conn.rollback();
+        } catch (SQLException e) {
+          LOG.error(e, e);
+        }
+      }
+      throw new TajoInternalError(se);
+    } finally {
+      CatalogUtil.closeQuietly(pstmt1);
+      CatalogUtil.closeQuietly(pstmt2);
+      CatalogUtil.closeQuietly(pstmt3);
+      CatalogUtil.closeQuietly(pstmt4);
+    }
+  }
 
   @Override
   public void createIndex(final IndexDescProto proto) throws CatalogException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
index d8d0103..ef9ddd0 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -106,6 +106,9 @@ public interface CatalogStore extends Closeable {
 
   List<TablePartitionProto> getAllPartitions() throws CatalogException;
 
+  void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions
+    , boolean ifNotExists) throws CatalogException;
+
   /**************************** INDEX *******************************/
   void createIndex(IndexDescProto proto) throws CatalogException;
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index 74b6023..bcd9ce9 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -322,37 +322,13 @@ public class MemStore implements CatalogStore {
         if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) {
           throw new DuplicatePartitionException(partitionName);
         } else {
-          CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
-          builder.setPartitionName(partitionName);
-          builder.setPath(partitionDesc.getPath());
-
-          if (partitionDesc.getPartitionKeysCount() > 0) {
-            for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) {
-              CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder();
-              keyBuilder.setColumnName(eachKey.getColumnName());
-              keyBuilder.setPartitionValue(eachKey.getPartitionValue());
-              builder.addPartitionKeys(keyBuilder.build());
-            }
-          }
-
-          Map<String, CatalogProtos.PartitionDescProto> protoMap = null;
-          if (!partitions.containsKey(tableName)) {
-            protoMap = Maps.newHashMap();
-          } else {
-            protoMap = partitions.get(tableName);
-          }
-          protoMap.put(partitionName, builder.build());
-          partitions.put(tableName, protoMap);
+          addPartition(partitionDesc, tableName, partitionName);
         }
         break;
       case DROP_PARTITION:
         partitionDesc = alterTableDescProto.getPartitionDesc();
         partitionName = partitionDesc.getPartitionName();
-        if(!partitions.containsKey(tableName)) {
-          throw new UndefinedPartitionException(partitionName);
-        } else {
-          partitions.get(tableName).remove(partitionName);
-        }
+        dropPartition(databaseName, tableName, partitionName);
         break;
       case SET_PROPERTY:
         KeyValueSet properties = new KeyValueSet(tableDescProto.getMeta().getParams());
@@ -372,6 +348,37 @@ public class MemStore implements CatalogStore {
     }
   }
 
+  private void addPartition(CatalogProtos.PartitionDescProto partitionDesc, String tableName, String partitionName) {
+    CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+    builder.setPartitionName(partitionName);
+    builder.setPath(partitionDesc.getPath());
+
+    if (partitionDesc.getPartitionKeysCount() > 0) {
+      for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) {
+        CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder();
+        keyBuilder.setColumnName(eachKey.getColumnName());
+        keyBuilder.setPartitionValue(eachKey.getPartitionValue());
+        builder.addPartitionKeys(keyBuilder.build());
+      }
+    }
+
+    Map<String, CatalogProtos.PartitionDescProto> protoMap = null;
+    if (!partitions.containsKey(tableName)) {
+      protoMap = Maps.newHashMap();
+    } else {
+      protoMap = partitions.get(tableName);
+    }
+    protoMap.put(partitionName, builder.build());
+    partitions.put(tableName, protoMap);
+  }
+
+  private void dropPartition(String databaseName, String tableName, String partitionName) {
+    if(!partitions.containsKey(tableName)) {
+      throw new UndefinedPartitionException(partitionName);
+    } else {
+      partitions.get(tableName).remove(partitionName);
+    }
+  }
 
   private int getIndexOfColumnToBeRenamed(List<CatalogProtos.ColumnProto> fieldList, String columnName) {
     int fieldCount = fieldList.size();
@@ -608,6 +615,23 @@ public class MemStore implements CatalogStore {
     return protos;
   }
 
+  @Override
+  public void addPartitions(String databaseName, String tableName, List<CatalogProtos.PartitionDescProto> partitions
+    , boolean ifNotExists) throws CatalogException {
+    for(CatalogProtos.PartitionDescProto partition: partitions) {
+      String partitionName = partition.getPartitionName();
+
+      if (this.partitions.containsKey(tableName) && this.partitions.get(tableName).containsKey(partitionName)) {
+        if (ifNotExists) {
+          dropPartition(databaseName, tableName, partitionName);
+        } else {
+          throw new DuplicatePartitionException(partitionName);
+        }
+      }
+      addPartition(partition, tableName, partitionName);
+    }
+  }
+
   /* (non-Javadoc)
    * @see CatalogStore#createIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
    */

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
index 7ed9118..500ff71 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
@@ -19,6 +19,7 @@
 <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
       Catalog base version history
+      * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346)
       * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300)
       * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
       * 4 - 2015-03-27: Partition Schema (TAJO-1284)
@@ -26,7 +27,7 @@
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-	<tns:base version="6">
+	<tns:base version="7">
 		<tns:objects>
 			<tns:Object order="0" type="table" name="META">
 				<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
@@ -165,7 +166,8 @@
   				TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE,
   				PARTITION_NAME VARCHAR(767),
   				PATH VARCHAR(1024),
-  				CONSTRAINT C_PARTITION_PK PRIMARY KEY (PARTITION_ID)
+  				CONSTRAINT C_PARTITIONS_PK PRIMARY KEY (PARTITION_ID),
+  				CONSTRAINT C_PARTITIONS_UNIQ UNIQUE (TID, PARTITION_NAME)
 				)]]>
 				</tns:sql>
 			</tns:Object>
@@ -176,13 +178,14 @@
         <tns:sql><![CDATA[
 				CREATE TABLE PARTITION_KEYS (
   				PARTITION_ID INT NOT NULL REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE,
+  				TID INT NOT NULL,
   				COLUMN_NAME VARCHAR(128) NOT NULL,
   				PARTITION_VALUE VARCHAR(255)
 				)]]>
         </tns:sql>
       </tns:Object>
       <tns:Object name="PARTITION_KEYS_IDX" type="index" dependsOn="PARTITION_KEYS" order="21">
-        <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX ON PARTITION_KEYS(PARTITION_ID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql>
+        <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX ON PARTITION_KEYS(TID , COLUMN_NAME, PARTITION_VALUE)]]></tns:sql>
       </tns:Object>
     </tns:objects>
 	</tns:base>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
index 8750d2b..4583489 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
@@ -19,6 +19,7 @@
 <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
       Catalog base version history
+      * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346)
       * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300)
       * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
       * 4 - 2015-03-27: Partition Schema (TAJO-1284)
@@ -26,7 +27,7 @@
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-  <tns:base version="6">
+  <tns:base version="7">
     <tns:objects>
       <tns:Object order="0" type="table" name="META">
         <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
@@ -142,7 +143,7 @@
           TID INT NOT NULL,
           PARTITION_NAME VARCHAR(255) BINARY,
           PATH VARCHAR(4096) BINARY,
-          CONSTRAINT CONST_PARTITION_UNIQUE UNIQUE (PARTITION_NAME),
+          UNIQUE INDEX PARTITION_UNIQUE_IDX (TID, PARTITION_NAME),
           FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
         )]]>
         </tns:sql>
@@ -151,9 +152,10 @@
         <tns:sql><![CDATA[
         CREATE TABLE PARTITION_KEYS (
           PARTITION_ID INT NOT NULL,
+          TID INT NOT NULL,
           COLUMN_NAME VARCHAR(255) BINARY NOT NULL,
-          PARTITION_VALUE VARCHAR(255) BINARY NOT NULL,
-          UNIQUE INDEX PARTITION_KEYS_IDX (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE),
+          PARTITION_VALUE VARCHAR(255) BINARY,
+          INDEX PARTITION_KEYS_IDX (TID, COLUMN_NAME, PARTITION_VALUE),
           FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE
         )]]>
         </tns:sql>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
index 763d0f7..94c4680 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
@@ -19,6 +19,7 @@
 <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
     Catalog base version history
+    * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346)
     * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300)
     * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
     * 4 - 2015-03-27: Partition Schema (TAJO-1284)
@@ -26,7 +27,7 @@
     * 2 - 2014-06-09: First versioning
     * 1-  Before 2013-03-20
   -->
-  <tns:base version="6">
+  <tns:base version="7">
     <tns:objects>
       <tns:Object order="0" type="table" name="META">
         <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
@@ -143,7 +144,7 @@
           TID INT NOT NULL,
           PARTITION_NAME VARCHAR(255) BINARY,
           PATH VARCHAR(4096) BINARY,
-          CONSTRAINT CONST_PARTITION_UNIQUE UNIQUE (PARTITION_NAME),
+          UNIQUE INDEX PARTITION_UNIQUE_IDX (TID, PARTITION_NAME),
           FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
         )]]>
         </tns:sql>
@@ -152,9 +153,10 @@
         <tns:sql><![CDATA[
         CREATE TABLE PARTITION_KEYS (
           PARTITION_ID INT NOT NULL,
+          TID INT NOT NULL,
           COLUMN_NAME VARCHAR(255) BINARY NOT NULL,
-          PARTITION_VALUE VARCHAR(255) BINARY NOT NULL,
-          UNIQUE INDEX PARTITION_KEYS_IDX (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE),
+          PARTITION_VALUE VARCHAR(255) BINARY,
+          INDEX PARTITION_KEYS_IDX (TID, COLUMN_NAME, PARTITION_VALUE),
           FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE
         )]]>
         </tns:sql>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
index 3d8ef30..fb715ae 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
@@ -19,6 +19,7 @@
 <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
       Catalog base version history
+      * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346)
       * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300)
       * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
       * 4 - 2015-03-27: Partition Schema (TAJO-1284)
@@ -26,7 +27,7 @@
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-  <tns:base version="6">
+  <tns:base version="7">
     <tns:objects>
   		<tns:Object order="0" type="table" name="meta">
   			<tns:sql><![CDATA[
@@ -218,10 +219,14 @@
 				END;]]>
         </tns:sql>
       </tns:Object>
-      <tns:Object order="21" type="table" name="PARTITION_KEYS">
+      <tns:Object order="21" type="index" name="PARTITIONS_UNIQUE_IDX" dependsOn="PARTITIONS">
+        <tns:sql><![CDATA[CREATE UNIQUE INDEX PARTITIONS_UNIQUE_IDX on PARTITIONS (TID, PARTITION_NAME)]]></tns:sql>
+      </tns:Object>
+      <tns:Object order="22" type="table" name="PARTITION_KEYS">
         <tns:sql><![CDATA[
           CREATE TABLE PARTITION_KEYS (
             PARTITION_ID INT NOT NULL,
+            TID INT NOT NULL,
             COLUMN_NAME VARCHAR2(255) NOT NULL,
             PARTITION_VALUE VARCHAR(255) NULL,
             FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE
@@ -229,7 +234,7 @@
         </tns:sql>
       </tns:Object>
       <tns:Object order="23" type="index" name="PARTITION_KEYS_IDX" dependsOn="PARTITION_KEYS">
-        <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql>
+        <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (TID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql>
       </tns:Object>
     </tns:objects>
   </tns:base>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
index 0dd72b9..b7b94fc 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
@@ -21,6 +21,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
       Catalog base version history
+      * 7 - 2015-07-30: Add a column and index for partition keys (TAJO-1346)
       * 6 - 2015-07-24: Merge the index branch into the master branch (TAJO-1300)
       * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
       * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
@@ -29,7 +30,7 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-	<tns:base version="6">
+	<tns:base version="7">
 		<tns:objects>
 			<tns:Object name="META" type="table" order="0">
 				<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>
@@ -161,23 +162,26 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.
   				PARTITION_NAME VARCHAR(128),
   				PARTITION_VALUE VARCHAR(1024),
   				PATH VARCHAR(4096),
-  				FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
-  				CONSTRAINT C_PARTITION_UNIQUE UNIQUE (PARTITION_NAME)
+  				FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
 				)]]>
 				</tns:sql>
 			</tns:Object>
-      <tns:Object name="PARTITION_KEYS" type="table" order="16">
+      <tns:Object name="PARTITIONS_UNIQUE_IDX" type="index" order="16" dependsOn="PARTITIONS">
+        <tns:sql><![CDATA[CREATE UNIQUE INDEX PARTITIONS_UNIQUE_IDX on PARTITIONS (TID, PARTITION_NAME)]]></tns:sql>
+      </tns:Object>
+      <tns:Object name="PARTITION_KEYS" type="table" order="17">
         <tns:sql><![CDATA[
           CREATE TABLE PARTITION_KEYS (
             PARTITION_ID INT NOT NULL,
+            TID INT NOT NULL,
             COLUMN_NAME VARCHAR(255) NOT NULL,
             PARTITION_VALUE VARCHAR(255) NULL,
             FOREIGN KEY (PARTITION_ID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE
 				)]]>
         </tns:sql>
       </tns:Object>
-      <tns:Object name="PARTITION_KEYS_IDX" type="index" order="17" dependsOn="PARTITION_KEYS">
-        <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql>
+      <tns:Object name="PARTITION_KEYS_IDX" type="index" order="18" dependsOn="PARTITION_KEYS">
+        <tns:sql><![CDATA[CREATE INDEX PARTITION_KEYS_IDX on PARTITION_KEYS (TID, COLUMN_NAME, PARTITION_VALUE)]]></tns:sql>
       </tns:Object>
 		</tns:objects>
 	</tns:base>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 1206bfa..caa85e8 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
-import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.catalog.exception.UndefinedFunctionException;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -952,11 +951,11 @@ public class TestCatalog {
     List<PartitionKeyProto> partitionKeyList = new ArrayList<PartitionKeyProto>();
     for(int i = 0; i < partitionNames.length; i++) {
       String columnName = partitionNames[i].split("=")[0];
+      String partitionValue = partitionNames[i].split("=")[1];
 
       PartitionKeyProto.Builder builder = PartitionKeyProto.newBuilder();
-      builder.setColumnName(partitionNames[i]);
+      builder.setColumnName(partitionValue);
       builder.setPartitionValue(columnName);
-
       partitionKeyList.add(builder.build());
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 4b4de62..1f7f2fa 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -290,6 +290,10 @@ public class TajoConf extends Configuration {
     PYTHON_CODE_DIR("tajo.function.python.code-dir", ""),
     PYTHON_CONTROLLER_LOG_DIR("tajo.function.python.controller.log-dir", ""),
 
+    // Partition
+    PARTITION_DYNAMIC_BULK_INSERT_BATCH_SIZE("tajo.partition.dynamic.bulk-insert.batch-size", 1000),
+
+
     /////////////////////////////////////////////////////////////////////////////////
     // User Session Configuration
     //

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
index 3f05f44..d50164d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
@@ -63,6 +63,7 @@ public class ErrorMessages {
     ADD_MESSAGE(UNDEFINED_COLUMN, "column '%s' does not exist", 1);
     ADD_MESSAGE(UNDEFINED_FUNCTION, "function does not exist: %s", 1);
     ADD_MESSAGE(UNDEFINED_PARTITION, "partition '%s' does not exist", 1);
+    ADD_MESSAGE(UNDEFINED_PARTITION_KEY, "'%s' column is not the partition key", 1);
     ADD_MESSAGE(UNDEFINED_OPERATOR, "operator does not exist: '%s'", 1);
     ADD_MESSAGE(UNDEFINED_INDEX_FOR_TABLE, "index ''%s' does not exist", 1);
     ADD_MESSAGE(UNDEFINED_INDEX_FOR_COLUMNS, "index does not exist for '%s' columns of '%s' table", 2);
@@ -94,6 +95,10 @@ public class ErrorMessages {
     ADD_MESSAGE(MDC_NO_MATCHED_DATATYPE, "no matched type for %s", 1);
 
     ADD_MESSAGE(UNKNOWN_DATAFORMAT, "Unknown data format: '%s'", 1);
+
+    ADD_MESSAGE(AMBIGUOUS_PARTITION_DIRECTORY, "There is a directory which is assumed to be a partitioned directory" +
+      " : '%s'", 1);
+
   }
 
   private static void ADD_MESSAGE(ResultCode code, String msgFormat) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-common/src/main/proto/errors.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/errors.proto b/tajo-common/src/main/proto/errors.proto
index 5e2ecc0..0bc0069 100644
--- a/tajo-common/src/main/proto/errors.proto
+++ b/tajo-common/src/main/proto/errors.proto
@@ -109,6 +109,7 @@ enum ResultCode {
   UNDEFINED_PARTITION                   = 520; // ?
   UNDEFINED_PARTITION_METHOD            = 521; // ?
   UNDEFINED_OPERATOR                    = 522; // SQLState: 42883 (=UNDEFINED_FUNCTION)
+  UNDEFINED_PARTITION_KEY               = 523; // ?
 
   DUPLICATE_TABLESPACE                  = 531;
   DUPLICATE_DATABASE                    = 532; // SQLState: 42P04
@@ -123,6 +124,7 @@ enum ResultCode {
   AMBIGUOUS_TABLE                       = 541; // ?
   AMBIGUOUS_COLUMN                      = 542; // SQLState: 42702;
   AMBIGUOUS_FUNCTION                    = 543; // SQLState: 42725;
+  AMBIGUOUS_PARTITION_DIRECTORY         = 544; // ?
 
   CANNOT_CAST                           = 601; // SQLState: 42846 - Cast from source type to target type is not supported.
   GROUPING_ERROR                        = 602; // SQLState: 42803

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index b07fb8f..098994d 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -1611,8 +1611,8 @@ alter_table_statement
   : ALTER TABLE table_name RENAME TO table_name
   | ALTER TABLE table_name RENAME COLUMN column_name TO column_name
   | ALTER TABLE table_name ADD COLUMN field_element
-  | ALTER TABLE table_name (if_not_exists)? ADD PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (LOCATION path=Character_String_Literal)?
-  | ALTER TABLE table_name (if_exists)? DROP PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (PURGE)?
+  | ALTER TABLE table_name ADD (if_not_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (LOCATION path=Character_String_Literal)?
+  | ALTER TABLE table_name DROP (if_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (PURGE)?
   | ALTER TABLE table_name SET PROPERTY property_list
   ;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index c50d5be..ffe5d2e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -1886,6 +1886,8 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
         alterTable.setLocation(path);
       }
       alterTable.setPurge(checkIfExist(ctx.PURGE()));
+      alterTable.setIfNotExists(checkIfExist(ctx.if_not_exists()));
+      alterTable.setIfExists(checkIfExist(ctx.if_exists()));
     }
 
     if (checkIfExist(ctx.property_list())) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 33714db..a8a1c78 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -29,6 +29,8 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.InsertNode;
@@ -135,7 +137,9 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
       LOG.info("Path " + lastFileName.getParent() + " already exists!");
     } else {
       fs.mkdirs(lastFileName.getParent());
-      LOG.info("Add subpartition path directory :" + lastFileName.getParent());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Add subpartition path directory :" + lastFileName.getParent());
+      }
     }
 
     if (fs.exists(lastFileName)) {
@@ -146,9 +150,47 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
 
     openAppender(0);
 
+    addPartition(partition);
+
     return appender;
   }
 
+  /**
+   * Add partition information to the execution context for storing to CatalogStore.
+   *
+   * @param partition partition name
+   * @throws IOException
+   */
+  private void addPartition(String partition) throws IOException {
+    PartitionDescProto.Builder builder = PartitionDescProto.newBuilder();
+    builder.setPartitionName(partition);
+
+    String[] partitionKeyPairs = partition.split("/");
+
+    for(int i = 0; i < partitionKeyPairs.length; i++) {
+      String partitionKeyPair = partitionKeyPairs[i];
+      String[] split = partitionKeyPair.split("=");
+
+      PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder();
+      keyBuilder.setColumnName(split[0]);
+      keyBuilder.setPartitionValue(split[1]);
+
+      builder.addPartitionKeys(keyBuilder.build());
+    }
+
+    if (this.plan.getUri() == null) {
+      // In CTAS, the uri would be null, so derive the output path from the table path and the table name.
+      String[] split = CatalogUtil.splitTableName(plan.getTableName());
+      int endIndex = storeTablePath.toString().indexOf(split[1]) + split[1].length();
+      String outputPath = storeTablePath.toString().substring(0, endIndex);
+      builder.setPath(outputPath + "/" + partition);
+    } else {
+      builder.setPath(this.plan.getUri().toString() + "/" + partition);
+    }
+
+    context.addPartition(builder.build());
+  }
+
   public void openAppender(int suffixId) throws IOException {
     Path actualFilePath = lastFileName;
     if (suffixId > 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index a535f94..048bab2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -562,61 +562,70 @@ public class DDLExecutor {
       boolean duplicatedPartition = true;
       try {
         catalog.getPartition(databaseName, simpleTableName, pair.getSecond());
-      } catch (UndefinedPartitionException npe) {
+      } catch (UndefinedPartitionException e) {
         duplicatedPartition = false;
       }
-      if (duplicatedPartition) {
-        throw new DuplicatePartitionException(pair.getSecond());
-      }
 
-      if (alterTable.getLocation() != null) {
-        partitionPath = new Path(alterTable.getLocation());
-      } else {
-        // If location is not specified, the partition's location will be set using the table location.
-        partitionPath = new Path(desc.getUri().toString(), pair.getSecond());
-        alterTable.setLocation(partitionPath.toString());
-      }
+      if (duplicatedPartition && !alterTable.isIfNotExists()) {
+        throw new DuplicatePartitionException(pair.getSecond());
+      } else if (!duplicatedPartition) {
+        if (alterTable.getLocation() != null) {
+          partitionPath = new Path(alterTable.getLocation());
+        } else {
+          // If location is not specified, the partition's location will be set using the table location.
+          partitionPath = new Path(desc.getUri().toString(), pair.getSecond());
+          alterTable.setLocation(partitionPath.toString());
+        }
 
-      FileSystem fs = partitionPath.getFileSystem(context.getConf());
+        FileSystem fs = partitionPath.getFileSystem(context.getConf());
 
-      // If there is a directory which was assumed to be a partitioned directory and users don't input another
-      // location, this will throw exception.
-      Path assumedDirectory = new Path(desc.getUri().toString(), pair.getSecond());
+        // If there is a directory which was assumed to be a partitioned directory and users don't input another
+        // location, this will throw exception.
+        Path assumedDirectory = new Path(desc.getUri().toString(), pair.getSecond());
 
-      if (fs.exists(assumedDirectory) && !assumedDirectory.equals(partitionPath)) {
-        throw new AlreadyExistsAssumedPartitionDirectoryException(assumedDirectory.toString());
-      }
+        if (fs.exists(assumedDirectory) && !assumedDirectory.equals(partitionPath)) {
+          throw new AmbiguousPartitionDirectoryExistException(assumedDirectory.toString());
+        }
 
-      catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(),
-        alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.ADD_PARTITION));
+        catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(),
+          alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.ADD_PARTITION));
 
-      // If the partition's path doesn't exist, this would make the directory by force.
-      if (!fs.exists(partitionPath)) {
-        fs.mkdirs(partitionPath);
+        // If the partition's path doesn't exist, this would make the directory by force.
+        if (!fs.exists(partitionPath)) {
+          fs.mkdirs(partitionPath);
+        }
       }
+
       break;
     case DROP_PARTITION:
       ensureColumnPartitionKeys(qualifiedName, alterTable.getPartitionColumns());
       pair = CatalogUtil.getPartitionKeyNamePair(alterTable.getPartitionColumns(), alterTable.getPartitionValues());
-      partitionDescProto = catalog.getPartition(databaseName, simpleTableName, pair.getSecond());
 
-      if (partitionDescProto == null) {
-        throw new NoSuchPartitionException(tableName, pair.getSecond());
+      boolean undefinedPartition = false;
+      try {
+        partitionDescProto = catalog.getPartition(databaseName, simpleTableName, pair.getSecond());
+      } catch (UndefinedPartitionException e) {
+        undefinedPartition = true;
       }
 
-      catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(),
-        alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.DROP_PARTITION));
+      if (undefinedPartition && !alterTable.isIfExists()) {
+        throw new UndefinedPartitionException(pair.getSecond());
+      } else if (!undefinedPartition) {
+        catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(),
+          alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.DROP_PARTITION));
 
-      // When dropping partition on an managed table, the data will be delete from file system.
-      if (!desc.isExternal()) {
-        deletePartitionPath(partitionDescProto);
-      } else {
-        // When dropping partition on an external table, the data in the table will NOT be deleted from the file
-        // system. But if PURGE is specified, the partition data will be deleted.
-        if (alterTable.isPurge()) {
+        // When dropping partition on an managed table, the data will be delete from file system.
+        if (!desc.isExternal()) {
           deletePartitionPath(partitionDescProto);
+        } else {
+          // When dropping partition on an external table, the data in the table will NOT be deleted from the file
+          // system. But if PURGE is specified, the partition data will be deleted.
+          if (alterTable.isPurge()) {
+            deletePartitionPath(partitionDescProto);
+          }
         }
       }
+
       break;
     default:
       //TODO
@@ -634,7 +643,7 @@ public class DDLExecutor {
   private boolean ensureColumnPartitionKeys(String tableName, String[] columnNames) {
     for(String columnName : columnNames) {
       if (!ensureColumnPartitionKeys(tableName, columnName)) {
-        throw new NoSuchPartitionKeyException(tableName, columnName);
+        throw new UndefinedPartitionKeyException(columnName);
       }
     }
     return true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index e3629c7..f351143 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -34,8 +34,8 @@ import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
@@ -331,6 +331,17 @@ public class Query implements EventHandler<QueryEvent> {
     return queryHistory;
   }
 
+  public List<PartitionDescProto> getPartitions() {
+    List<PartitionDescProto> partitions = new ArrayList<PartitionDescProto>();
+    for(Stage eachStage : getStages()) {
+      if (!eachStage.getPartitions().isEmpty()) {
+        partitions.addAll(eachStage.getPartitions());
+      }
+    }
+
+    return partitions;
+  }
+
   public List<String> getDiagnostics() {
     readLock.lock();
     try {
@@ -493,6 +504,35 @@ public class Query implements EventHandler<QueryEvent> {
         QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
         hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
 
+        TableDesc desc = query.getResultDesc();
+
+        // If there are partitions
+        List<PartitionDescProto> partitions = query.getPartitions();
+        if (partitions!= null && !partitions.isEmpty()) {
+
+          String databaseName, simpleTableName;
+
+          if (CatalogUtil.isFQTableName(desc.getName())) {
+            String[] split = CatalogUtil.splitFQTableName(desc.getName());
+            databaseName = split[0];
+            simpleTableName = split[1];
+          } else {
+            databaseName = queryContext.getCurrentDatabase();
+            simpleTableName = desc.getName();
+          }
+
+          // Store partitions to CatalogStore via catalog.addPartitions.
+          boolean result = catalog.addPartitions(databaseName, simpleTableName, partitions, true);
+          if (result) {
+            LOG.info(String.format("Complete adding for partition %s", partitions.size()));
+          } else {
+            LOG.info(String.format("Incomplete adding for partition %s", partitions.size()));
+          }
+        } else {
+          LOG.info("Can't find partitions for adding.");
+        }
+
+
       } catch (Exception e) {
         query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
         return QueryState.QUERY_ERROR;


Mime
View raw message