tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject [1/2] tajo git commit: TAJO-1346: Create dynamic partitions to CatalogStore by running insert query or CTAS query. (jaehwa)
Date Fri, 31 Jul 2015 03:06:59 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 00ccb8ba9 -> d80c32b28


http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 4256582..9279f64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -31,6 +31,8 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -59,6 +61,7 @@ import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.StageHistory;
 import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.worker.FetchImpl;
@@ -483,6 +486,18 @@ public class Stage implements EventHandler<StageEvent> {
     return stageHistory;
   }
 
+  public List<PartitionDescProto> getPartitions() {
+    List<PartitionDescProto> partitions = TUtil.newList();
+
+    for(Task eachTask : getTasks()) {
+      if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty())
{
+        partitions.addAll(eachTask.getLastAttempt().getPartitions());
+      }
+    }
+
+    return partitions;
+  }
+
   /**
    * It finalizes this stage. It is only invoked when the stage is finalizing.
    */

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
index 6c48d3b..f5fcfa7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.ResourceProtos.TaskCompletionReport;
 import org.apache.tajo.ResourceProtos.ShuffleFileOutput;
@@ -35,6 +36,7 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Task.PullHost;
+import org.apache.tajo.util.TUtil;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -67,6 +69,8 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent>
{
   private CatalogProtos.TableStatsProto inputStats;
   private CatalogProtos.TableStatsProto resultStats;
 
+  private List<PartitionDescProto> partitions;
+
   protected static final StateMachineFactory
       <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
       stateMachineFactory = new StateMachineFactory
@@ -190,6 +194,8 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent>
{
     this.writeLock = readWriteLock.writeLock();
 
     stateMachine = stateMachineFactory.make(this);
+
+    this.partitions = TUtil.newList();
   }
 
   public TaskAttemptState getState() {
@@ -252,6 +258,14 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent>
{
     return new TableStats(resultStats);
   }
 
+  public List<PartitionDescProto> getPartitions() {
+    return partitions;
+  }
+
+  public void setPartitions(List<PartitionDescProto> partitions) {
+    this.partitions = partitions;
+  }
+
   private void fillTaskStatistics(TaskCompletionReport report) {
     this.progress = 1.0f;
 
@@ -392,6 +406,10 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent>
{
       TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
 
       try {
+        if (report.getPartitionsCount() > 0) {
+          taskAttempt.setPartitions(report.getPartitionsList());
+        }
+
         taskAttempt.fillTaskStatistics(report);
         taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 5d7a53a..5996118 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
@@ -85,6 +86,8 @@ public class TaskAttemptContext {
 
   private EvalContext evalContext = new EvalContext();
 
+  private List<PartitionDescProto> partitions;
+
   public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext,
                             final TaskAttemptId taskId,
                             final FragmentProto[] fragments,
@@ -116,6 +119,8 @@ public class TaskAttemptContext {
     this.state = TaskAttemptState.TA_PENDING;
 
     this.partitionOutputVolume = Maps.newHashMap();
+
+    this.partitions = TUtil.newList();
   }
 
   @VisibleForTesting
@@ -414,4 +419,18 @@ public class TaskAttemptContext {
   public EvalContext getEvalContext() {
     return evalContext;
   }
+
+  public List<PartitionDescProto> getPartitions() {
+    return partitions;
+  }
+
+  public void setPartitions(List<PartitionDescProto> partitions) {
+    this.partitions = partitions;
+  }
+
+  public void addPartition(PartitionDescProto partition) {
+    if (!partitions.contains(partition)) {
+      partitions.add(partition);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 92c682c..7b8d06f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -329,6 +329,10 @@ public class TaskImpl implements Task {
       builder.setResultStats(new TableStats().getProto());
     }
 
+    if (!context.getPartitions().isEmpty()) {
+      builder.addAllPartitions(context.getPartitions());
+    }
+
     Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
     if (it.hasNext()) {
       do {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/main/proto/ResourceProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto
index 97bf05e..e789b81 100644
--- a/tajo-core/src/main/proto/ResourceProtos.proto
+++ b/tajo-core/src/main/proto/ResourceProtos.proto
@@ -108,6 +108,7 @@ message TaskCompletionReport {
   optional TableStatsProto inputStats = 3;
   optional TableStatsProto resultStats = 4;
   repeated ShuffleFileOutput shuffleFileOutputs = 5;
+  repeated PartitionDescProto partitions = 6;
 }
 
 message TaskFatalErrorReport {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index 0e37b47..bb14aec 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -379,6 +379,7 @@ public class TestSQLAnalyzer {
     assertEquals("1", value1.getValue());
     LiteralValue value2 = (LiteralValue)alterTable.getValues()[1];
     assertEquals("2", value2.getValue());
+    assertFalse(alterTable.isIfNotExists());
   }
 
   @Test
@@ -397,6 +398,7 @@ public class TestSQLAnalyzer {
     LiteralValue value2 = (LiteralValue)alterTable.getValues()[1];
     assertEquals("2", value2.getValue());
     assertEquals(alterTable.getLocation(), "hdfs://xxx.com/warehouse/table1/col1=1/col2=2");
+    assertFalse(alterTable.isIfNotExists());
   }
 
   @Test
@@ -418,6 +420,7 @@ public class TestSQLAnalyzer {
     LiteralValue value3 = (LiteralValue)alterTable.getValues()[2];
     assertEquals("11", value3.getValue());
     assertEquals(alterTable.getLocation(), "hdfs://xxx.com/warehouse/table1/col1=2015/col2=01/col3=11");
+    assertFalse(alterTable.isIfNotExists());
   }
 
   @Test
@@ -432,6 +435,22 @@ public class TestSQLAnalyzer {
     assertEquals("col1", alterTable.getColumns()[0].getName());
     LiteralValue value1 = (LiteralValue)alterTable.getValues()[0];
     assertEquals("TAJO", value1.getValue());
+    assertFalse(alterTable.isIfNotExists());
+  }
+
+  @Test
+  public void testAlterTableAddPartition5() throws IOException {
+    String sql = FileUtil.readTextFileFromResource("queries/default/alter_table_add_partition_5.sql");
+    Expr expr = parseQuery(sql);
+    assertEquals(OpType.AlterTable, expr.getType());
+    AlterTable alterTable = (AlterTable)expr;
+    assertEquals(alterTable.getAlterTableOpType(), AlterTableOpType.ADD_PARTITION);
+    assertEquals(1, alterTable.getColumns().length);
+    assertEquals(1, alterTable.getValues().length);
+    assertEquals("col1", alterTable.getColumns()[0].getName());
+    LiteralValue value1 = (LiteralValue)alterTable.getValues()[0];
+    assertEquals("TAJO", value1.getValue());
+    assertTrue(alterTable.isIfNotExists());
   }
 
   @Test
@@ -450,6 +469,7 @@ public class TestSQLAnalyzer {
     LiteralValue value2 = (LiteralValue)alterTable.getValues()[1];
     assertEquals("2", value2.getValue());
     assertFalse(alterTable.isPurge());
+    assertFalse(alterTable.isIfExists());
   }
 
   @Test
@@ -471,6 +491,7 @@ public class TestSQLAnalyzer {
     LiteralValue value3 = (LiteralValue)alterTable.getValues()[2];
     assertEquals("11", value3.getValue());
     assertFalse(alterTable.isPurge());
+    assertFalse(alterTable.isIfExists());
   }
 
   @Test
@@ -486,6 +507,23 @@ public class TestSQLAnalyzer {
     LiteralValue value1 = (LiteralValue)alterTable.getValues()[0];
     assertEquals("TAJO", value1.getValue());
     assertTrue(alterTable.isPurge());
+    assertFalse(alterTable.isIfExists());
+  }
+
+  @Test
+  public void testAlterTableDropPartition4() throws IOException {
+    String sql = FileUtil.readTextFileFromResource("queries/default/alter_table_drop_partition_4.sql");
+    Expr expr = parseQuery(sql);
+    assertEquals(OpType.AlterTable, expr.getType());
+    AlterTable alterTable = (AlterTable)expr;
+    assertEquals(alterTable.getAlterTableOpType(), AlterTableOpType.DROP_PARTITION);
+    assertEquals(1, alterTable.getColumns().length);
+    assertEquals(1, alterTable.getValues().length);
+    assertEquals("col1", alterTable.getColumns()[0].getName());
+    LiteralValue value1 = (LiteralValue)alterTable.getValues()[0];
+    assertEquals("TAJO", value1.getValue());
+    assertTrue(alterTable.isPurge());
+    assertTrue(alterTable.isIfExists());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
index 8cdaf80..8339ea7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
@@ -31,7 +31,6 @@ import org.junit.experimental.categories.Category;
 import java.sql.ResultSet;
 import java.util.List;
 
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
@@ -87,6 +86,7 @@ public class TestAlterTable extends QueryTestCaseBase {
     assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(1).getSimpleName(),
"col4");
 
     executeDDL("alter_table_add_partition1.sql", null);
+    executeDDL("alter_table_add_partition2.sql", null);
 
     List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions("TestAlterTable",
"partitioned_table");
     assertNotNull(partitions);
@@ -104,6 +104,7 @@ public class TestAlterTable extends QueryTestCaseBase {
     assertTrue(partitionPath.toString().indexOf("col3=1/col4=2") > 0);
 
     executeDDL("alter_table_drop_partition1.sql", null);
+    executeDDL("alter_table_drop_partition2.sql", null);
 
     partitions = catalog.getPartitions("TestAlterTable", "partitioned_table");
     assertNotNull(partitions);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index d18db71..6eb2841 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -19,10 +19,7 @@
 package org.apache.tajo.engine.query;
 
 import com.google.common.collect.Maps;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DeflateCodec;
@@ -34,12 +31,13 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.exception.ReturnStateUtil;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
@@ -123,6 +121,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
     assertEquals(1, channel.getShuffleKeys().length);
 
+    TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
+      tableDesc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
@@ -169,6 +171,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
     assertEquals(1, channel.getShuffleKeys().length);
 
+    TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
+      tableDesc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
@@ -195,6 +201,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
     res.close();
 
+    TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
+      tableDesc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -234,6 +244,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1));
       assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2));
     }
+
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName,
+      new String[]{"key"}, desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
@@ -338,6 +352,9 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertResultSet(res, "case13.result");
     res.close();
 
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
@@ -411,6 +428,13 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
     }
 
+    res = executeString("SELECT col1, col2, col3 FROM " + tableName);
+    String result = resultSetToString(res);
+    res.close();
+
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1",
"col2", "col3"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
@@ -438,6 +462,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res.close();
 
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1",
"col2", "col3"},
+      desc.getStats().getNumRows());
+
     Path path = new Path(desc.getUri());
 
     FileSystem fs = FileSystem.get(conf);
@@ -479,6 +507,11 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res.close();
 
     desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+
+    // TODO: When inserting into already exists partitioned table, table status need to change
correctly.
+//    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1",
"col2", "col3"},
+//      desc.getStats().getNumRows());
+
     path = new Path(desc.getUri());
 
     verifyDirectoriesForThreeColumns(fs, path, 2);
@@ -632,6 +665,9 @@ public class TestTablePartitions extends QueryTestCaseBase {
       }
     }
 
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -688,6 +724,9 @@ public class TestTablePartitions extends QueryTestCaseBase {
       }
     }
 
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1",
"col2"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -783,6 +822,9 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res.close();
     assertEquals(3, i);
 
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1",
"col2", "col3"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -851,6 +893,9 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertFalse(res.next());
     res.close();
 
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1",
"col2", "col3"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -874,6 +919,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertResultSet(res, "case14.result");
     res.close();
 
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -900,6 +949,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertResultSet(res, "case15.result");
       res.close();
 
+      TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+      verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"},
+        desc.getStats().getNumRows());
+
       executeString("DROP TABLE " + tableName + " PURGE").close();
     }
   }
@@ -982,6 +1035,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertResultSet(res);
     res.close();
 
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -1009,6 +1066,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertResultSet(res);
     res.close();
 
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"},
+      desc.getStats().getNumRows());
+
     executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
@@ -1085,6 +1146,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
       }
       assertEquals(data.size(), numRows);
 
+      TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, "test_partition");
+      verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, "test_partition", new String[]{"col1"},
+        desc.getStats().getNumRows());
+
     } finally {
       testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname,
           TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal);
@@ -1178,4 +1243,59 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
   }
 
+  /**
+   * Verify added partitions to a table. This would check each partition's directory using
record of table.
+   *
+   *
+   * @param databaseName
+   * @param tableName
+   * @param partitionColumns
+   * @param numRows
+   * @throws Exception
+   */
+  private void verifyPartitionDirectoryFromCatalog(String databaseName, String tableName,
+                                                   String[] partitionColumns, Long numRows)
throws Exception {
+    int rowCount = 0;
+
+    // Get all partition column values
+    StringBuilder query = new StringBuilder();
+    query.append("SELECT");
+    for (int i = 0; i < partitionColumns.length; i++) {
+      String partitionColumn = partitionColumns[i];
+      if (i > 0) {
+        query.append(",");
+      }
+      query.append(" ").append(partitionColumn);
+    }
+    query.append(" FROM ").append(tableName);
+    ResultSet res = executeString(query.toString());
+
+    StringBuilder partitionName = new StringBuilder();
+    PartitionDescProto partitionDescProto = null;
+
+    // Check whether that partition's directory exist or doesn't exist.
+    while(res.next()) {
+      partitionName.delete(0, partitionName.length());
+
+      for (int i = 0; i < partitionColumns.length; i++) {
+        String partitionColumn = partitionColumns[i];
+        if (i > 0) {
+          partitionName.append("/");
+        }
+        partitionName.append(partitionColumn).append("=").append(res.getString(partitionColumn));
+      }
+      partitionDescProto = catalog.getPartition(databaseName, tableName, partitionName.toString());
+      assertNotNull(partitionDescProto);
+      assertTrue(partitionDescProto.getPath().indexOf(tableName + "/" + partitionName.toString())
> 0);
+
+      rowCount++;
+    }
+
+    res.close();
+
+    // Check row count.
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      assertEquals(numRows, new Long(rowCount));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql
b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql
new file mode 100644
index 0000000..315ac47
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_add_partition2.sql
@@ -0,0 +1 @@
+ALTER TABLE partitioned_table ADD IF NOT EXISTS PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
new file mode 100644
index 0000000..0d4c932
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
@@ -0,0 +1 @@
+ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql
b/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql
new file mode 100644
index 0000000..127d999
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/default/alter_table_add_partition_5.sql
@@ -0,0 +1 @@
+ALTER TABLE table1 ADD IF NOT EXISTS PARTITION (col1 = 'TAJO' )
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql
b/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql
new file mode 100644
index 0000000..44c7977
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/default/alter_table_drop_partition_4.sql
@@ -0,0 +1 @@
+ALTER TABLE table1 DROP IF EXISTS PARTITION (col1 = 'TAJO' ) PURGE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result
b/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result
index d01038a..b09f134 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result
@@ -1,5 +1,5 @@
 OK
-ERROR: "key2" column is not the partition key of "default.testaltertableaddpartition".
+ERROR: 'key2' column is not the partition key
 OK
 OK
 ERROR: partition 'key=0.1' does not exist

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-dist/src/main/conf/catalog-site.xml.template
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/catalog-site.xml.template b/tajo-dist/src/main/conf/catalog-site.xml.template
index 365de6b..9db714e 100644
--- a/tajo-dist/src/main/conf/catalog-site.xml.template
+++ b/tajo-dist/src/main/conf/catalog-site.xml.template
@@ -49,7 +49,7 @@
  </property>
  <property>
   <name>tajo.catalog.jdbc.uri</name>
-  <value>jdbc:mysql://<host name>:<mysql port>/<database name for tajo>?createDatabaseIfNotExist=true</value>
+  <value>jdbc:mysql://<host name>:<mysql port>/<database name for tajo>?rewriteBatchedStatements=true</value>
  </property>
  -->
 
@@ -61,7 +61,7 @@
  </property>
  <property>
   <name>tajo.catalog.jdbc.uri</name>
-  <value>jdbc:mariadb://<mariadb host name>:<mariadb port>/<database
name for tajo>?createDatabaseIfNotExist=true</value>
+  <value>jdbc:mariadb://<mariadb host name>:<mariadb port>/<database
name for tajo>?rewriteBatchedStatements=true</value>
  </property>
  -->
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
index b9b55fd..ffc34d1 100644
--- a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
+++ b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
@@ -79,7 +79,7 @@ ADD PARTITION
   ALTER TABLE table1 ADD PARTITION (col1 = 1 , col2 = 2)
   ALTER TABLE table1 ADD PARTITION (col1 = 1 , col2 = 2) LOCATION 'hdfs://xxx.com/warehouse/table1/col1=1/col2=2'
 
-You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table. The location must
be a directory inside of which data files reside. If the location doesn't exist on the file
system, Tajo will make the location by force. ``ADD PARTITION`` changes the table metadata,
but does not load data. If the data does not exist in the partition's location, queries will
not return any results.
+You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table. The location must
be a directory inside of which data files reside. If the location doesn't exist on the file
system, Tajo will make the location by force. ``ADD PARTITION`` changes the table metadata,
but does not load data. If the data does not exist in the partition's location, queries will
not return any results. An error is thrown if the partition for the table already exists.
You can use ``IF NOT EXISTS`` to skip the error.
 
 ========================
  DROP PARTITION
@@ -89,12 +89,11 @@ You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table.
The loca
 
 .. code-block:: sql
 
-  ALTER TABLE <table_name> [IF NOT EXISTS] DROP PARTITION (<partition column>
= <partition value>, ...) [PURGE]
+  ALTER TABLE <table_name> [IF EXISTS] DROP PARTITION (<partition column> = <partition
value>, ...) [PURGE]
 
   For example:
   ALTER TABLE table1 DROP PARTITION (col1 = 1 , col2 = 2)
   ALTER TABLE table1 DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' )
   ALTER TABLE table1 DROP PARTITION (col1 = 'TAJO' ) PURGE
 
-You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This removes
the data for a managed table
- and this doesn't remove the data for an external table. But if ``PURGE`` is specified for
an external table, the partition data will be removed. The metadata is completely lost in
all cases.
\ No newline at end of file
+You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This removes
the data for a managed table and this doesn't remove the data for an external table. But if
``PURGE`` is specified for an external table, the partition data will be removed. The metadata
is completely lost in all cases. An error is thrown if the partition for the table doesn't
exists. You can use ``IF EXISTS`` to skip the error.

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index c9e101b..c0727bb 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -2149,6 +2149,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     }
 
     alterTableNode.setPurge(alterTable.isPurge());
+    alterTableNode.setIfNotExists(alterTable.isIfNotExists());
+    alterTableNode.setIfExists(alterTable.isIfExists());
     alterTableNode.setAlterTableOpType(alterTable.getAlterTableOpType());
     return alterTableNode;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
index ecb173a..4e25baa 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
@@ -51,6 +51,10 @@ public class AlterTableNode extends LogicalNode {
   private String location;
   @Expose
   private boolean isPurge;
+  @Expose
+  private boolean ifNotExists;
+  @Expose
+  private boolean ifExists;
 
   public AlterTableNode(int pid) {
     super(pid, NodeType.ALTER_TABLE);
@@ -158,6 +162,22 @@ public class AlterTableNode extends LogicalNode {
     this.isPurge = isPurge;
   }
 
+  public boolean isIfNotExists() {
+    return ifNotExists;
+  }
+
+  public void setIfNotExists(boolean ifNotExists) {
+    this.ifNotExists = ifNotExists;
+  }
+
+  public boolean isIfExists() {
+    return ifExists;
+  }
+
+  public void setIfExists(boolean ifExists) {
+    this.ifExists = ifExists;
+  }
+
   @Override
   public PlanString getPlanString() {
     return new PlanString(this);
@@ -166,7 +186,7 @@ public class AlterTableNode extends LogicalNode {
   @Override
   public int hashCode() {
     return Objects.hashCode(tableName, addNewColumn, alterTableOpType, columnName, newColumnName,
newTableName,
-      tableName, properties, partitionColumns, partitionValues, location, isPurge);
+      tableName, properties, partitionColumns, partitionValues, location, isPurge, ifNotExists,
ifExists);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 25897f2..d298cc8 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -633,6 +633,7 @@ public class LogicalNodeDeserializer {
       if (alterPartition.getLocation() != null) {
         alterTable.setLocation(alterPartition.getLocation());
       }
+      alterTable.setIfNotExists(alterPartition.getIfNotExists());
       break;
     case DROP_PARTITION:
       alterPartition = alterTableProto.getAlterPartition();
@@ -640,6 +641,8 @@ public class LogicalNodeDeserializer {
         .getColumnNamesCount()]));
       alterTable.setPartitionValues(alterPartition.getPartitionValuesList().toArray(new String[alterPartition
         .getPartitionValuesCount()]));
+      alterTable.setPurge(alterPartition.getPurge());
+      alterTable.setIfExists(alterPartition.getIfExists());
       break;
     default:
       throw new UnimplementedException("Unknown SET type in ALTER TABLE: " + alterTableProto.getSetType().name());

http://git-wip-us.apache.org/repos/asf/tajo/blob/d80c32b2/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 4019322..5acbbee 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -320,6 +320,8 @@ message AlterTableNode {
     repeated string partitionValues = 21;
     optional string location = 3;
     optional bool purge = 4;
+    optional bool ifNotExists = 5;
+    optional bool ifExists = 6;
   }
 
   required string tableName = 1;


Mime
View raw message