tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject tajo git commit: TAJO-1798: Dynamic partitioning occasionally fails.
Date Wed, 26 Aug 2015 09:01:20 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.0 13f42cf40 -> 8b318f6ea


TAJO-1798: Dynamic partitioning occasionally fails.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8b318f6e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8b318f6e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8b318f6e

Branch: refs/heads/branch-0.11.0
Commit: 8b318f6ea8b61e2e143341a7d07974bc675a6c7c
Parents: 13f42cf
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Wed Aug 26 17:59:20 2015 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Wed Aug 26 17:59:20 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tajo/engine/query/TestTablePartitions.java  | 46 ++++++++++++++--
 .../java/org/apache/tajo/querymaster/Query.java | 56 +++++++++++---------
 .../java/org/apache/tajo/querymaster/Stage.java | 13 +++--
 .../apache/tajo/querymaster/TaskAttempt.java    | 17 +++---
 5 files changed, 91 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8b318f6e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 426779b..9fc3de0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -232,6 +232,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1798: Dynamic partitioning occasionally fails. (jaehwa)
+    
     TAJO-1799: Fix incorrect event handler when kill-query failed. (jinho)
 
     TAJO-1783: Query result is not returned by invalid output path. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/8b318f6e/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 6eb2841..952e26a 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -23,10 +23,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -1298,4 +1295,45 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(numRows, new Long(rowCount));
     }
   }
+
+  @Test
+  public final void testDuplicatedPartitions() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testDuplicatedPartitions");
+
+    try {
+      executeString("CREATE TABLE lineitem2 as select * from lineitem").close();
+
+      // Execute UNION ALL statement for creating multiple output files.
+      if (nodeType == NodeType.INSERT) {
+        executeString(
+          "create table " + tableName + " (col1 int4, col2 int4) partition by column(key text) ").close();
+
+        executeString(
+          "insert overwrite into " + tableName
+            + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all"
+            + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b"
+        ).close();
+      } else {
+        executeString(
+          "create table " + tableName + "(col1 int4, col2 int4) partition by column(key text) as "
+            + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all"
+            + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b"
+        ).close();
+      }
+
+      // If duplicated partitions had been removed, partitions just will contain 'KEY=N' partition and 'KEY=R'
+      // partition. In previous Query and Stage, duplicated partitions were not deleted because they had been in List.
+      // If you want to verify duplicated partitions, you need to use List instead of Set with DerbyStore.
+      List<PartitionDescProto> partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, tableName);
+      assertEquals(2, partitions.size());
+
+      PartitionDescProto firstPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=N");
+      assertNotNull(firstPartition);
+      PartitionDescProto secondPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=R");
+      assertNotNull(secondPartition);
+    } finally {
+      executeString("DROP TABLE lineitem2 PURGE");
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8b318f6e/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index b09d5fd..9560353 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.querymaster;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
@@ -333,14 +334,17 @@ public class Query implements EventHandler<QueryEvent> {
   }
 
   public List<PartitionDescProto> getPartitions() {
-    List<PartitionDescProto> partitions = new ArrayList<PartitionDescProto>();
+    Set<PartitionDescProto> partitions = TUtil.newHashSet();
     for(Stage eachStage : getStages()) {
-      if (!eachStage.getPartitions().isEmpty()) {
-        partitions.addAll(eachStage.getPartitions());
-      }
+      partitions.addAll(eachStage.getPartitions());
     }
+    return Lists.newArrayList(partitions);
+  }
 
-    return partitions;
+  public void clearPartitions() {
+    for(Stage eachStage : getStages()) {
+      eachStage.clearPartitions();
+    }
   }
 
   public List<String> getDiagnostics() {
@@ -505,30 +509,30 @@ public class Query implements EventHandler<QueryEvent> {
         QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
 
-        TableDesc desc = query.getResultDesc();
-
-        // If there is partitions
-        List<PartitionDescProto> partitions = query.getPartitions();
-        if (partitions!= null && !partitions.isEmpty()) {
-
-          String databaseName, simpleTableName;
-
-          if (CatalogUtil.isFQTableName(desc.getName())) {
-            String[] split = CatalogUtil.splitFQTableName(desc.getName());
-            databaseName = split[0];
-            simpleTableName = split[1];
+        // Add dynamic partitions to catalog for partition table.
+        if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) {
+          List<PartitionDescProto> partitions = query.getPartitions();
+          if (partitions != null) {
+            String databaseName, simpleTableName;
+
+            if (CatalogUtil.isFQTableName(tableDesc.getName())) {
+              String[] split = CatalogUtil.splitFQTableName(tableDesc.getName());
+              databaseName = split[0];
+              simpleTableName = split[1];
+            } else {
+              databaseName = queryContext.getCurrentDatabase();
+              simpleTableName = tableDesc.getName();
+            }
+
+            // Store partitions to CatalogStore using alter table statement.
+            catalog.addPartitions(databaseName, simpleTableName, partitions, true);
+            LOG.info("Added partitions to catalog (total=" + partitions.size() + ")");
           } else {
-            databaseName = queryContext.getCurrentDatabase();
-            simpleTableName = desc.getName();
+            LOG.info("Can't find partitions for adding.");
           }
-
-          // Store partitions to CatalogStore using alter table statement.
-          catalog.addPartitions(databaseName, simpleTableName, partitions, true);
-        } else {
-          LOG.info("Can't find partitions for adding.");
+          query.clearPartitions();
         }
-
-      } catch (Exception e) {
+      } catch (Throwable e) {
         query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
         return QueryState.QUERY_ERROR;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8b318f6e/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index cf5cdbd..f6c9cdb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -487,9 +487,8 @@ public class Stage implements EventHandler<StageEvent> {
     return stageHistory;
   }
 
-  public List<PartitionDescProto> getPartitions() {
-    List<PartitionDescProto> partitions = TUtil.newList();
-
+  public Set<PartitionDescProto> getPartitions() {
+    Set<PartitionDescProto> partitions = TUtil.newHashSet();
     for(Task eachTask : getTasks()) {
      if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) {
         partitions.addAll(eachTask.getLastAttempt().getPartitions());
@@ -499,6 +498,14 @@ public class Stage implements EventHandler<StageEvent> {
     return partitions;
   }
 
+  public void clearPartitions() {
+    for(Task eachTask : getTasks()) {
+      if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) {
+        eachTask.getLastAttempt().getPartitions().clear();
+      }
+    }
+  }
+
   /**
    * It finalizes this stage. It is only invoked when the stage is finalizing.
    */

http://git-wip-us.apache.org/repos/asf/tajo/blob/8b318f6e/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
index f5fcfa7..cda62a4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -38,9 +38,7 @@ import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Task.PullHost;
 import org.apache.tajo.util.TUtil;
 
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -69,7 +67,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
   private CatalogProtos.TableStatsProto inputStats;
   private CatalogProtos.TableStatsProto resultStats;
 
-  private List<PartitionDescProto> partitions;
+  private Set<PartitionDescProto> partitions;
 
   protected static final StateMachineFactory
       <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
@@ -194,8 +192,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
     this.writeLock = readWriteLock.writeLock();
 
     stateMachine = stateMachineFactory.make(this);
-
-    this.partitions = TUtil.newList();
+    this.partitions = TUtil.newHashSet();
   }
 
   public TaskAttemptState getState() {
@@ -258,12 +255,12 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
     return new TableStats(resultStats);
   }
 
-  public List<PartitionDescProto> getPartitions() {
+  public Set<PartitionDescProto> getPartitions() {
     return partitions;
   }
 
-  public void setPartitions(List<PartitionDescProto> partitions) {
-    this.partitions = partitions;
+  public void addPartitions(List<PartitionDescProto> partitions) {
+    this.partitions.addAll(partitions);
   }
 
   private void fillTaskStatistics(TaskCompletionReport report) {
@@ -407,7 +404,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
 
       try {
         if (report.getPartitionsCount() > 0) {
-          taskAttempt.setPartitions(report.getPartitionsList());
+          taskAttempt.addPartitions(report.getPartitionsList());
         }
 
         taskAttempt.fillTaskStatistics(report);


Mime
View raw message