tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject tajo git commit: TAJO-1901: Repair partition throws ArrayIndexOutOfBoundsException occasionally.
Date Sun, 04 Oct 2015 22:36:05 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.0 b8e947313 -> 0476ab844


TAJO-1901: Repair partition throws ArrayIndexOutOfBoundsException occasionally.

Signed-off-by: Hyunsik Choi <hyunsik@apache.org>

Closes #796


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0476ab84
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0476ab84
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0476ab84

Branch: refs/heads/branch-0.11.0
Commit: 0476ab844b71a05e294c21d58841b6c88ad38d72
Parents: b8e9473
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Sun Oct 4 15:31:38 2015 -0700
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Sun Oct 4 15:35:50 2015 -0700

----------------------------------------------------------------------
 CHANGES                                         |   5 +-
 .../tajo/engine/query/TestAlterTable.java       | 343 ++++++++++++++++++-
 .../apache/tajo/master/exec/DDLExecutor.java    |  32 +-
 .../rewrite/rules/PartitionedTableRewriter.java |   2 +-
 4 files changed, 364 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f37969b..05e98c6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,7 +1,5 @@
 Tajo Change Log 
 
-Release 0.12.0 - unreleased
-
 Release 0.11.0 - unreleased
 
   NEW FEATURES
@@ -287,6 +285,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1901: Repair partition throws ArrayIndexOutOfBoundsException 
+    occasionally. (Contributed by jaehwa, committed by hyunsik)
+
     TAJO-1902: Add line delimiter for repair partition in TajoDump. (jaehwa)
 
     TAJO-1889: UndefinedColumnException when a query with table subquery is 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
index 9a30012..58ceb74 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
@@ -23,22 +23,35 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.exception.UndefinedDatabaseException;
-import org.apache.tajo.exception.UndefinedPartitionException;
-import org.apache.tajo.exception.UndefinedPartitionMethodException;
-import org.apache.tajo.exception.UndefinedTableException;
+import org.apache.tajo.exception.*;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.List;
 
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
 public class TestAlterTable extends QueryTestCaseBase {
+
+  @Before
+  public void setUp() throws Exception {
+    executeString("create database " + getCurrentDatabase()).close();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    executeString("drop database " + getCurrentDatabase()).close();
+  }
+
   @Test
   public final void testAlterTableName() throws Exception {
     List<String> createdNames = executeDDL("table1_ddl.sql", "table1.tbl", "ABC");
@@ -58,7 +71,7 @@ public class TestAlterTable extends QueryTestCaseBase {
   public final void testAlterTableAddNewColumn() throws Exception {
     List<String> createdNames = executeDDL("table1_ddl.sql", "table1.tbl", "EFG");
     executeDDL("alter_table_add_new_column_ddl.sql", null);
-    assertColumnExists(createdNames.get(0),"cool");
+    assertColumnExists(createdNames.get(0), "cool");
   }
 
   @Test
@@ -199,6 +212,290 @@ public class TestAlterTable extends QueryTestCaseBase {
     catalog.dropTable(tableName);
   }
 
+  @Test
+  public final void testRepairPartitionWithDatabaseNameIncludeTableName() throws Exception {
+    String databaseName = "test_repair_partition";
+    String tableName = "part";
+    String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName);
+
+    executeString("create database " + databaseName).close();
+    executeString("create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) "
+      + " as select l_orderkey, l_partkey, l_quantity from default.lineitem").close();
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName);
+    assertNotNull(tableDesc);
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    String result = resultSetToString(res);
+    String expectedResult = "col1,col2,key\n" +
+      "-------------------------------\n" +
+      "1,1,36.0\n" +
+      "1,1,17.0\n" +
+      "2,2,38.0\n" +
+      "3,3,49.0\n" +
+      "3,2,45.0\n";
+    res.close();
+    assertEquals(expectedResult, result);
+
+    // Remove all partitions
+    dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns());
+
+    verifyPartitionCount(databaseName, tableName, 0);
+
+    executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close();
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    result = resultSetToString(res);
+    res.close();
+    assertEquals(expectedResult, result);
+
+    executeString("DROP TABLE " + canonicalTableName + " PURGE").close();
+    executeString("DROP database " + databaseName).close();
+  }
+
+  @Test
+  public void testRepairPartitionWithAbnormalDirectories()  throws Exception {
+    String databaseName = getCurrentDatabase().toLowerCase();
+    String tableName = "testRepairPartitionWithAbnormalDirectories".toLowerCase();
+    String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName);
+
+    executeString("create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) "
+      + " as select l_orderkey, l_partkey, l_quantity from default.lineitem").close();
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName);
+    assertNotNull(tableDesc);
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    String result = resultSetToString(res);
+    String expectedResult = "col1,col2,key\n" +
+      "-------------------------------\n" +
+      "1,1,36.0\n" +
+      "1,1,17.0\n" +
+      "2,2,38.0\n" +
+      "3,3,49.0\n" +
+      "3,2,45.0\n";
+    res.close();
+    assertEquals(expectedResult, result);
+
+    // Remove all partitions
+    dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns());
+
+    verifyPartitionCount(databaseName, tableName, 0);
+
+    // Make abnormal directories
+    FileSystem fs = FileSystem.get(conf);
+    Path path = new Path(tableDesc.getUri().getPath(), "key=100.0");
+    fs.mkdirs(path);
+    path = new Path(tableDesc.getUri().getPath(), "key=110.0");
+    fs.mkdirs(path);
+    path = new Path(tableDesc.getUri().getPath(), "key=");
+    fs.mkdirs(path);
+    path = new Path(tableDesc.getUri().getPath(), "col1=a");
+    fs.mkdirs(path);
+    assertEquals(9, fs.listStatus(path.getParent()).length);
+
+    executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close();
+
+    verifyPartitionCount(databaseName, tableName, 7);
+
+    res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    result = resultSetToString(res);
+    res.close();
+    assertEquals(expectedResult, result);
+
+    executeString("DROP TABLE " + canonicalTableName + " PURGE").close();
+  }
+
+  @Test
+  public void testRepairPartitionWithDatePartitionColumn()  throws Exception {
+    String databaseName = getCurrentDatabase().toLowerCase();
+    String tableName = "testRepairPartitionWithDatePartitionColumn".toLowerCase();
+    String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName);
+
+    executeString(
+      "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key date) "
+        + " as select l_orderkey, l_partkey, l_shipdate::date from default.lineitem").close();
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName);
+    assertNotNull(tableDesc);
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    String result = resultSetToString(res);
+    String expectedResult = "col1,col2,key\n" +
+      "-------------------------------\n" +
+      "1,1,1996-04-12\n" +
+      "1,1,1996-03-13\n" +
+      "2,2,1997-01-28\n" +
+      "3,3,1993-11-09\n" +
+      "3,2,1994-02-02\n";
+    res.close();
+    assertEquals(expectedResult, result);
+
+    // Remove all partitions
+    dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns());
+
+    verifyPartitionCount(databaseName, tableName, 0);
+
+    executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close();
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    result = resultSetToString(res);
+    res.close();
+    assertEquals(expectedResult, result);
+
+    executeString("DROP TABLE " + canonicalTableName + " PURGE").close();
+  }
+
+  @Test
+  public void testRepairPartitionWithTimestampPartitionColumn()  throws Exception {
+    String databaseName = getCurrentDatabase().toLowerCase();
+    String tableName = "testRepairPartitionWithTimestampPartitionColumn".toLowerCase();
+    String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName);
+
+    executeString(
+      "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key timestamp) "
+        + " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from default.lineitem");
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName);
+    assertNotNull(tableDesc);
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    String result = resultSetToString(res);
+    String expectedResult = "col1,col2,key\n" +
+      "-------------------------------\n" +
+      "1,1,1996-04-12 00:00:00\n" +
+      "1,1,1996-03-13 00:00:00\n" +
+      "2,2,1997-01-28 00:00:00\n" +
+      "3,3,1993-11-09 00:00:00\n" +
+      "3,2,1994-02-02 00:00:00\n";
+    res.close();
+    assertEquals(expectedResult, result);
+
+    // Remove all partitions
+    dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns());
+
+    verifyPartitionCount(databaseName, tableName, 0);
+
+    executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close();
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    result = resultSetToString(res);
+    res.close();
+    assertEquals(expectedResult, result);
+
+    executeString("DROP TABLE " + canonicalTableName + " PURGE").close();
+  }
+
+  @Test
+  public void testRepairPartitionWithTimesPartitionColumn()  throws Exception {
+    String databaseName = getCurrentDatabase().toLowerCase();
+    String tableName = "testRepairPartitionWithTimesPartitionColumn".toLowerCase();
+    String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName);
+
+    executeString(
+      "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key time) "
+        + " as select l_orderkey, l_partkey " +
+        " , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " +
+        " WHEN '1997-01-28' THEN cast ('12:10:20' as time) " +
+        " WHEN '1994-02-02' THEN cast ('12:10:30' as time) " +
+        " ELSE cast ('00:00:00' as time) END " +
+        " from default.lineitem");
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName);
+    assertNotNull(tableDesc);
+
+    ResultSet res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    String result = resultSetToString(res);
+    String expectedResult = "col1,col2,key\n" +
+      "-------------------------------\n" +
+      "1,1,11:20:40\n" +
+      "1,1,00:00:00\n" +
+      "2,2,12:10:20\n" +
+      "3,3,00:00:00\n" +
+      "3,2,12:10:30\n";
+    res.close();
+    assertEquals(expectedResult, result);
+
+    verifyPartitionCount(databaseName, tableName, 4);
+
+    // Remove all partitions
+    dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns());
+
+    verifyPartitionCount(databaseName, tableName, 0);
+
+    executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close();
+
+    verifyPartitionCount(databaseName, tableName, 4);
+
+    res = executeString("SELECT * FROM " + canonicalTableName + " ORDER BY col1, col2 desc, key desc;");
+    result = resultSetToString(res);
+    res.close();
+    assertEquals(expectedResult, result);
+
+    executeString("DROP TABLE " + canonicalTableName + " PURGE").close();
+  }
+
+
+  @Test
+  public void testRepairPartitionWithMutiplePartitionColumn()  throws Exception {
+    String databaseName = getCurrentDatabase().toLowerCase();
+    String tableName = "testRepairPartitionWithMutiplePartitionColumn".toLowerCase();
+    String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName);
+
+    executeString("create table " + canonicalTableName + " (col4 text) "
+      + " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag, l_orderkey, l_partkey, " +
+      "l_quantity from default.lineitem");
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName);
+    assertNotNull(tableDesc);
+
+    ResultSet res = executeString("SELECT * FROM " + canonicalTableName
+      + " ORDER BY col1, col2 desc, col3 desc, col4;");
+    String result = resultSetToString(res);
+    String expectedResult = "col4,col1,col2,col3\n" +
+      "-------------------------------\n" +
+      "N,1,1,36.0\n" +
+      "N,1,1,17.0\n" +
+      "N,2,2,38.0\n" +
+      "R,3,3,49.0\n" +
+      "R,3,2,45.0\n";
+    res.close();
+    assertEquals(expectedResult, result);
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    // Remove all partitions
+    dropPartitions(databaseName, tableName, tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns());
+
+    verifyPartitionCount(databaseName, tableName, 0);
+
+    executeString("ALTER TABLE " + canonicalTableName + " REPAIR PARTITION").close();
+
+    verifyPartitionCount(databaseName, tableName, 5);
+
+    res = executeString("SELECT * FROM " + canonicalTableName
+      + " ORDER BY col1, col2 desc, col3 desc, col4;");    result = resultSetToString(res);
+    res.close();
+    assertEquals(expectedResult, result);
+
+    executeString("DROP TABLE " + canonicalTableName + " PURGE").close();
+  }
+
+
   private void verifyPartitionCount(String databaseName, String tableName, int expectedCount)
     throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException,
     UndefinedPartitionException {
@@ -206,4 +503,40 @@ public class TestAlterTable extends QueryTestCaseBase {
     assertNotNull(partitions);
     assertEquals(partitions.size(), expectedCount);
   }
+
+  private void dropPartitions(String databaseName, String tableName, List<Column> colums)
+    throws Exception {
+    String canonicalTableName = CatalogUtil.getCanonicalTableName(databaseName, tableName);
+    List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitionsOfTable(databaseName, tableName);
+
+    StringBuilder sb = new StringBuilder();
+    for (CatalogProtos.PartitionDescProto partition : partitions) {
+
+      sb.delete(0, sb.length());
+      sb.append("ALTER TABLE ").append(canonicalTableName).append(" DROP PARTITION (");
+
+      String[] splitPartitionName = partition.getPartitionName().split(File.separator);
+      for(int i = 0; i < splitPartitionName.length; i++) {
+        String[] partitionColumnValue = splitPartitionName[i].split("=");
+        if (i > 0) {
+          sb.append(",");
+        }
+
+        switch (colums.get(i).getDataType().getType()) {
+          case TEXT:
+          case TIME:
+          case TIMESTAMP:
+          case DATE:
+            sb.append(partitionColumnValue[0]).append("='").append(partitionColumnValue[1]).append("'");
+            break;
+          default:
+            sb.append(partitionColumnValue[0]).append("=").append(partitionColumnValue[1]);
+            break;
+        }
+      }
+      sb.append(")");
+      executeString(sb.toString()).close();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 013570e..588ea1f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -50,6 +50,7 @@ import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.TUtil;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -625,13 +626,25 @@ public class DDLExecutor {
     // Find missing partitions from CatalogStore
     List<PartitionDescProto> targetPartitions = TUtil.newList();
     for(Path filteredPath : filteredPaths) {
-      PartitionDescProto targetPartition = getPartitionDesc(simpleTableName, filteredPath);
-      if (!existingPartitionNames.contains(targetPartition.getPartitionName())) {
+
+      int startIdx = filteredPath.toString().indexOf(PartitionedTableRewriter.getColumnPartitionPathPrefix
+        (partitionColumns));
+
+      // if there is partition column in the path
+      if (startIdx > -1) {
+        PartitionDescProto targetPartition = getPartitionDesc(tablePath, filteredPath);
+        if (!existingPartitionNames.contains(targetPartition.getPartitionName())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName());
+          }
+          targetPartitions.add(targetPartition);
+        }
+      } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName());
+          LOG.debug("Invalid partition path:" + filteredPath.toString());
         }
-        targetPartitions.add(targetPartition);
       }
+
     }
 
     catalog.addPartitions(databaseName, simpleTableName, targetPartitions, true);
@@ -645,12 +658,11 @@ public class DDLExecutor {
     LOG.info("Total added partitions to CatalogStore: " + targetPartitions.size());
   }
 
-  private PartitionDescProto getPartitionDesc(String tableName, Path path) throws IOException {
-    String partitionPath = path.toString();
+  private PartitionDescProto getPartitionDesc(Path tablePath, Path partitionPath) throws IOException {
+    String partitionName = StringUtils.unescapePathName(partitionPath.toString());
 
-    String partitionName = StringUtils.unescapePathName(partitionPath);
-    int startIndex = partitionPath.indexOf(tableName);
-    partitionName = partitionName.substring(startIndex + tableName.length() + 1, partitionPath.length());
+    int startIndex = partitionName.indexOf(tablePath.toString()) + tablePath.toString().length();
+    partitionName = partitionName.substring(startIndex +  File.separator.length());
 
     CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
     builder.setPartitionName(partitionName);
@@ -668,7 +680,7 @@ public class DDLExecutor {
       builder.addPartitionKeys(keyBuilder.build());
     }
 
-    builder.setPath(partitionPath);
+    builder.setPath(partitionPath.toString());
 
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0476ab84/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index 5b1a1f1..fc0b1bb 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -479,7 +479,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
    * @param partitionColumn the schema of column partition
    * @return The first part string of column partition path.
    */
-  private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
+  public static String getColumnPartitionPathPrefix(Schema partitionColumn) {
     StringBuilder sb = new StringBuilder();
     sb.append(partitionColumn.getColumn(0).getSimpleName()).append("=");
     return sb.toString();


Mime
View raw message