tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject tajo git commit: TAJO-1673: Implement recover partitions.
Date Thu, 24 Sep 2015 05:45:33 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.0 a9bf62008 -> fcc0c030c


TAJO-1673: Implement recover partitions.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fcc0c030
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fcc0c030
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fcc0c030

Branch: refs/heads/branch-0.11.0
Commit: fcc0c030c5fcd3f72d46b225decd75ee668bfd7c
Parents: a9bf620
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Thu Sep 24 14:45:01 2015 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Thu Sep 24 14:45:01 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/tajo/algebra/AlterTableOpType.java   |   2 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |  19 +++
 .../tajo/engine/query/TestAlterTable.java       |  95 +++++++++++++
 .../alter_table_drop_partition1.sql             |   2 +-
 .../alter_table_drop_partition2.sql             |   2 +-
 .../create_partitioned_table2.sql               |   2 +
 .../alter_table_repair_partition_1.sql          |   1 +
 .../alter_table_repair_partition_1.result       |   8 ++
 .../apache/tajo/master/exec/DDLExecutor.java    | 138 +++++++++++++++++--
 .../org/apache/tajo/parser/sql/SQLAnalyzer.java |   6 +
 .../main/sphinx/sql_language/alter_table.rst    |  19 ++-
 .../rewrite/rules/PartitionedTableRewriter.java |   4 +-
 .../plan/serder/LogicalNodeDeserializer.java    |   3 +
 .../tajo/plan/serder/LogicalNodeSerializer.java |   4 +
 tajo-plan/src/main/proto/Plan.proto             |   1 +
 .../org/apache/tajo/parser/sql/SQLLexer.g4      |   1 +
 .../org/apache/tajo/parser/sql/SQLParser.g4     |   2 +
 18 files changed, 296 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f2f1352..8bc7409 100644
--- a/CHANGES
+++ b/CHANGES
@@ -572,6 +572,8 @@ Release 0.11.0 - unreleased
   
   TASKS
 
+    TAJO-1673: Implement recover partitions. (jaehwa)
+
     TAJO-1872: Increase the minimum split size and add a classpath to hadoop 
     tools. (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
index 679ab4b..89daef0 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
@@ -18,5 +18,5 @@
 package org.apache.tajo.algebra;
 
 public enum AlterTableOpType {
-  RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, SET_PROPERTY
+  RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, SET_PROPERTY, REPAIR_PARTITION
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 38d02aa..6e61657 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -1261,6 +1261,25 @@ public class TestLogicalPlanner {
     return root.getChild();
   }
 
+  @Test
+  public final void testAlterTableRepairPartiton() throws TajoException {
+    QueryContext qc = createQueryContext();
+
+    String sql = "ALTER TABLE table1 REPAIR PARTITION";
+    Expr expr = sqlAnalyzer.parse(sql);
+    LogicalPlan rootNode = planner.createPlan(qc, expr);
+    LogicalNode plan = rootNode.getRootBlock().getRoot();
+    testJsonSerDerObject(plan);
+    assertEquals(NodeType.ROOT, plan.getType());
+    LogicalRootNode root = (LogicalRootNode) plan;
+    assertEquals(NodeType.ALTER_TABLE, root.getChild().getType());
+
+    AlterTableNode msckNode = root.getChild();
+
+    assertEquals(msckNode.getAlterTableOpType(), AlterTableOpType.REPAIR_PARTITION);
+    assertEquals(msckNode.getTableName(), "table1");
+  }
+
   String [] ALTER_PARTITIONS = {
     "ALTER TABLE partitioned_table ADD PARTITION (col1 = 1 , col2 = 2) LOCATION 'hdfs://xxx"
+
       ".com/warehouse/partitioned_table/col1=1/col2=2'", //0

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
index 8339ea7..d10c0f2 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
@@ -25,6 +25,10 @@ import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.UndefinedDatabaseException;
+import org.apache.tajo.exception.UndefinedPartitionException;
+import org.apache.tajo.exception.UndefinedPartitionMethodException;
+import org.apache.tajo.exception.UndefinedTableException;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -110,5 +114,96 @@ public class TestAlterTable extends QueryTestCaseBase {
     assertNotNull(partitions);
     assertEquals(partitions.size(), 0);
     assertFalse(fs.exists(partitionPath));
+
+    catalog.dropTable(tableName);
+  }
+
+  @Test
+  public final void testAlterTableRepairPartition() throws Exception {
+    executeDDL("create_partitioned_table2.sql", null);
+
+    String simpleTableName = "partitioned_table2";
+    String tableName = CatalogUtil.buildFQName(getCurrentDatabase(), simpleTableName);
+    assertTrue(catalog.existsTable(tableName));
+
+    TableDesc tableDesc = catalog.getTableDesc(tableName);
+    assertEquals(tableDesc.getName(), tableName);
+    assertEquals(tableDesc.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+    assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns().size(),
2);
+    assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(),
"col1");
+    assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(1).getSimpleName(),
"col2");
+
+    ResultSet res = executeString(
+      "insert overwrite into " + simpleTableName + " select l_quantity, l_returnflag, l_orderkey,
l_partkey " +
+      " from default.lineitem");
+    res.close();
+
+    res = executeString("select * from " + simpleTableName + " order by col1, col2, col3,
col4");
+    String result = resultSetToString(res);
+    String expectedResult = "col3,col4,col1,col2\n" +
+      "-------------------------------\n" +
+      "17.0,N,1,1\n" +
+      "36.0,N,1,1\n" +
+      "38.0,N,2,2\n" +
+      "45.0,R,3,2\n" +
+      "49.0,R,3,3\n";
+
+    res.close();
+    assertEquals(expectedResult, result);
+
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    Path tablePath = new Path(tableDesc.getUri());
+    FileSystem fs = tablePath.getFileSystem(conf);
+    assertTrue(fs.exists(new Path(tableDesc.getUri())));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3")));
+
+    // Remove all partitions
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 1 , col2 =
1)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 2 , col2 =
2)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 =
2)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 =
3)").close();
+
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 0);
+
+    assertTrue(fs.exists(new Path(tableDesc.getUri())));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3")));
+
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Remove just one of the existing partitions
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 =
3)").close();
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Remove a partition directory from filesystem
+    fs.delete(new Path(tablePath.toUri() + "/col1=3/col2=3"), true);
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Add abnormal directories
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col10=1/col20=1")));
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col1=")));
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/test")));
+    assertEquals(6, fs.listStatus(new Path(tablePath.toUri())).length);
+
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+    catalog.dropTable(tableName);
+  }
+
+  private void verifyPartitionCount(String databaseName, String tableName, int expectedCount)
+    throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException,
+    UndefinedPartitionException {
+    List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions(databaseName,
tableName);
+    assertNotNull(partitions);
+    assertEquals(partitions.size(), expectedCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
index b5d672f..cc4d6dd 100644
--- a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
@@ -1 +1 @@
-ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file
+ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2) PURGE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
index 0d4c932..452164b 100644
--- a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
@@ -1 +1 @@
-ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file
+ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2) PURGE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
new file mode 100644
index 0000000..0fc8094
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
@@ -0,0 +1,2 @@
+create table partitioned_table2 (col3 float8, col4 text) USING text  WITH ('text.delimiter'='|')
+PARTITION by column(col1 int4, col2 int4)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
new file mode 100644
index 0000000..b65b0e6
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
@@ -0,0 +1 @@
+ALTER TABLE table1 REPAIR PARTITION
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
new file mode 100644
index 0000000..daca3b3
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
@@ -0,0 +1,8 @@
+{
+  "OldTableName": "table1",
+  "AlterTableType": "REPAIR_PARTITION",
+  "IsPurge": false,
+  "IfNotExists": false,
+  "IfExists": false,
+  "OpType": "AlterTable"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 15abf9e..a67a625 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -23,25 +23,31 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.algebra.AlterTableOpType;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.*;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.FileTablespace;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -529,24 +535,138 @@ public class DDLExecutor {
         catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(),
             alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.DROP_PARTITION));
 
-        // When dropping partition on an managed table, the data will be delete from file
system.
-        if (!desc.isExternal()) {
+        // When dropping a partition, its data is deleted from the file system only if the 'PURGE' option is specified.
+        if (alterTable.isPurge()) {
           deletePartitionPath(partitionDescProto);
-        } else {
-          // When dropping partition on an external table, the data in the table will NOT
be deleted from the file
-          // system. But if PURGE is specified, the partition data will be deleted.
-          if (alterTable.isPurge()) {
-            deletePartitionPath(partitionDescProto);
-          }
         }
       }
-
+      break;
+    case REPAIR_PARTITION:
+      repairPartition(context, queryContext, alterTable);
       break;
     default:
       throw new InternalError("alterTable cannot handle such query: \n" + alterTable.toJson());
     }
   }
 
+  /**
+   * Run ALTER TABLE table_name REPAIR PARTITION statement.
+   * This adds to the catalog every partition directory under the table path that is not yet registered.
+   *
+   *
+   * @param context
+   * @param queryContext
+   * @param alterTable
+   * @throws IOException
+   */
+  public void repairPartition(TajoMaster.MasterContext context, final QueryContext queryContext,
+                         final AlterTableNode alterTable) throws IOException, TajoException
{
+    final CatalogService catalog = context.getCatalog();
+    final String tableName = alterTable.getTableName();
+
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String[] split = CatalogUtil.splitFQTableName(tableName);
+      databaseName = split[0];
+      simpleTableName = split[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+
+    if (!catalog.existsTable(databaseName, simpleTableName)) {
+      throw new UndefinedTableException(alterTable.getTableName());
+    }
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
+
+    if(tableDesc.getPartitionMethod() == null) {
+      throw new UndefinedPartitionMethodException(simpleTableName);
+    }
+
+    Path tablePath = new Path(tableDesc.getUri());
+    FileSystem fs = tablePath.getFileSystem(context.getConf());
+
+    PartitionMethodDesc partitionDesc = tableDesc.getPartitionMethod();
+    Schema partitionColumns = partitionDesc.getExpressionSchema();
+
+    // Build one path filter per partition column level; together they accept all valid partition paths.
+    PathFilter[] filters = PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns);
+
+    // List the first-level partition directories directly under the table path.
+    Path [] filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(tablePath,
filters[0]));
+
+    // Descend one level per remaining partition column, keeping only paths accepted by the i-th level filter.
+    for (int i = 1; i < partitionColumns.size(); i++) {
+      filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(filteredPaths, filters[i]));
+    }
+
+    // Log catalog partitions whose directories are missing from the filesystem (they are left in the catalog).
+    List<PartitionDescProto> existingPartitions = catalog.getPartitions(databaseName,
simpleTableName);
+    List<String> existingPartitionNames = TUtil.newList();
+    Path existingPartitionPath = null;
+
+    for(PartitionDescProto existingPartition : existingPartitions) {
+      existingPartitionPath = new Path(existingPartition.getPath());
+      existingPartitionNames.add(existingPartition.getPartitionName());
+      if (!fs.exists(existingPartitionPath) && LOG.isDebugEnabled()) {
+        LOG.debug("Partitions missing from Filesystem:" + existingPartition.getPartitionName());
+      }
+    }
+
+    // Collect partition directories that are not yet registered in the CatalogStore.
+    List<PartitionDescProto> targetPartitions = TUtil.newList();
+    for(Path filteredPath : filteredPaths) {
+      PartitionDescProto targetPartition = getPartitionDesc(simpleTableName, filteredPath);
+      if (!existingPartitionNames.contains(targetPartition.getPartitionName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName());
+        }
+        targetPartitions.add(targetPartition);
+      }
+    }
+
+    catalog.addPartitions(databaseName, simpleTableName, targetPartitions, true);
+
+    if (LOG.isDebugEnabled()) {
+      for(PartitionDescProto targetPartition: targetPartitions) {
+        LOG.debug("Repair: Added partition to CatalogStore " + tableName + ":" + targetPartition.getPartitionName());
+      }
+    }
+
+    LOG.info("Total added partitions to CatalogStore: " + targetPartitions.size());
+  }
+
+  private PartitionDescProto getPartitionDesc(String tableName, Path path) throws IOException
{
+    String partitionPath = path.toString();
+
+    String partitionName = StringUtils.unescapePathName(partitionPath);
+    int startIndex = partitionPath.indexOf(tableName);
+    partitionName = partitionName.substring(startIndex + tableName.length() + 1, partitionPath.length());
+
+    CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+    builder.setPartitionName(partitionName);
+
+    String[] partitionKeyPairs = partitionName.split("/");
+
+    for(int i = 0; i < partitionKeyPairs.length; i++) {
+      String partitionKeyPair = partitionKeyPairs[i];
+      String[] split = partitionKeyPair.split("=");
+
+      PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder();
+      keyBuilder.setColumnName(split[0]);
+      keyBuilder.setPartitionValue(split[1]);
+
+      builder.addPartitionKeys(keyBuilder.build());
+    }
+
+    builder.setPath(partitionPath);
+
+    return builder.build();
+  }
+
+
   private void deletePartitionPath(CatalogProtos.PartitionDescProto partitionDescProto) throws
IOException {
     Path partitionPath = new Path(partitionDescProto.getPath());
     FileSystem fs = partitionPath.getFileSystem(context.getConf());

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
index 6190cdc..d5eafb3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
@@ -1947,6 +1947,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     final int PARTITION_MASK = 00000020;
     final int SET_MASK = 00000002;
     final int PROPERTY_MASK = 00010000;
+    final int REPAIR_MASK = 00000003;
 
     int val = 00000000;
 
@@ -1978,6 +1979,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
           case PROPERTY:
             val = val | PROPERTY_MASK;
             break;
+          case REPAIR:
+            val = val | REPAIR_MASK;
+            break;
           default:
             break;
         }
@@ -1989,6 +1993,8 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   private AlterTableOpType evaluateAlterTableOperationTye(final int value) {
 
     switch (value) {
+      case 19:
+        return AlterTableOpType.REPAIR_PARTITION;
       case 65:
         return AlterTableOpType.RENAME_TABLE;
       case 73:

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
index ffc34d1..959ebcc 100644
--- a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
+++ b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
@@ -96,4 +96,21 @@ You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table.
The loca
   ALTER TABLE table1 DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' )
   ALTER TABLE table1 DROP PARTITION (col1 = 'TAJO' ) PURGE
 
-You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This removes
the data for a managed table and this doesn't remove the data for an external table. But if
``PURGE`` is specified for an external table, the partition data will be removed. The metadata
is completely lost in all cases. An error is thrown if the partition for the table doesn't
exists. You can use ``IF EXISTS`` to skip the error.
+You can use ``ALTER TABLE DROP PARTITION`` to drop a partition from a table. By default, this doesn't remove the partition data; if ``PURGE`` is specified, the partition data is removed as well. The partition metadata is removed in all cases. An error is thrown if the partition doesn't exist in the table. You can use ``IF EXISTS`` to skip the error.
+
+========================
+REPAIR PARTITION
+========================
+
+Tajo stores a list of partitions for each table in its catalog. If partition directories are added to the distributed file system manually, the catalog is not aware of them. Running the ``ALTER TABLE REPAIR PARTITION`` statement registers any such partition directories that are missing from the catalog.
+
+*Synopsis*
+
+.. code-block:: sql
+
+  ALTER TABLE <table_name> REPAIR PARTITION
+
+.. note::
+
+  If a partition is registered in the catalog but its directory no longer exists in the file system, Tajo does not recover it, and its catalog entry is left unchanged.
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index b5cd42b..5123fc4 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -199,7 +199,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule
{
    * @param partitionColumns The partition columns schema
    * @return The array of path filter, accpeting all partition paths.
    */
-  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+  public static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
     Column target;
     PathFilter [] filters = new PathFilter[partitionColumns.size()];
     List<EvalNode> accumulatedFilters = Lists.newArrayList();
@@ -214,7 +214,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule
{
     return filters;
   }
 
-  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+  public static Path [] toPathArray(FileStatus[] fileStatuses) {
     Path [] paths = new Path[fileStatuses.length];
     for (int j = 0; j < fileStatuses.length; j++) {
       paths[j] = fileStatuses[j].getPath();

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 608fa4c..c75c3fd 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -650,6 +650,9 @@ public class LogicalNodeDeserializer {
       alterTable.setPurge(alterPartition.getPurge());
       alterTable.setIfExists(alterPartition.getIfExists());
       break;
+    case REPAIR_PARTITION:
+      alterTable.setTableName(alterTableProto.getTableName());
+      break;
     default:
       throw new TajoRuntimeException(
           new NotImplementedException("Unknown SET type in ALTER TABLE: " + alterTableProto.getSetType().name()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index a0f1fcc..3cf7d9e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -633,6 +633,10 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
       partitionBuilder.setPurge(node.isPurge());
       alterTableBuilder.setAlterPartition(partitionBuilder);
       break;
+    case REPAIR_PARTITION:
+      alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.REPAIR_PARTITION);
+      alterTableBuilder.setTableName(node.getTableName());
+      break;
     default:
       throw new TajoRuntimeException(
           new NotImplementedException("Unknown SET type in ALTER TABLE: " + node.getAlterTableOpType().name()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 8a8ecb1..fa1deeb 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -302,6 +302,7 @@ message AlterTableNode {
     SET_PROPERTY = 3;
     ADD_PARTITION = 4;
     DROP_PARTITION = 5;
+    REPAIR_PARTITION = 6;    
   }
 
   message RenameTable {

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
----------------------------------------------------------------------
diff --git a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
index 896f627..ee61320 100644
--- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
+++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
@@ -290,6 +290,7 @@ RANK : R A N K;
 RECORD : R E C O R D;
 REGEXP : R E G E X P;
 RENAME : R E N A M E;
+REPAIR : R E P A I R;
 RESET : R E S E T;
 RLIKE : R L I K E;
 ROLLUP : R O L L U P;

http://git-wip-us.apache.org/repos/asf/tajo/blob/fcc0c030/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
index c125352..e2693ea 100644
--- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
+++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
@@ -308,6 +308,7 @@ nonreserved_keywords
   | RECORD
   | REGEXP
   | RENAME
+  | REPAIR
   | RESET
   | RLIKE
   | ROLLUP
@@ -1624,6 +1625,7 @@ alter_table_statement
   | ALTER TABLE table_name ADD (if_not_exists)? PARTITION LEFT_PAREN partition_column_value_list
RIGHT_PAREN (LOCATION path=Character_String_Literal)?
   | ALTER TABLE table_name DROP (if_exists)? PARTITION LEFT_PAREN partition_column_value_list
RIGHT_PAREN (PURGE)?
   | ALTER TABLE table_name SET PROPERTY property_list
+  | ALTER TABLE table_name REPAIR PARTITION
   ;
 
 partition_column_value_list


Mime
View raw message