tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [3/5] tajo git commit: TAJO-1309: Add missing break point in physical operator. (jinho)
Date Mon, 26 Jan 2015 04:17:17 GMT
TAJO-1309: Add missing break point in physical operator. (jinho)

Closes #355


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/17c6dff4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/17c6dff4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/17c6dff4

Branch: refs/heads/index_support
Commit: 17c6dff4e258d93e0ffaa1cc07368b2b5d8b8aa4
Parents: 0024c75
Author: jhkim <jhkim@apache.org>
Authored: Fri Jan 23 10:24:59 2015 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Fri Jan 23 10:24:59 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 ++
 .../tajo/cli/tsql/commands/HelpCommand.java     |  6 +++---
 .../engine/planner/physical/BNLJoinExec.java    |  3 ++-
 .../planner/physical/ExternalSortExec.java      |  2 +-
 .../planner/physical/HashAggregateExec.java     |  2 +-
 .../HashBasedColPartitionStoreExec.java         |  2 +-
 .../planner/physical/HashFullOuterJoinExec.java |  4 ++--
 .../engine/planner/physical/HashJoinExec.java   |  4 ++--
 .../planner/physical/HashLeftAntiJoinExec.java  |  4 ++--
 .../planner/physical/HashLeftOuterJoinExec.java |  4 ++--
 .../planner/physical/HashLeftSemiJoinExec.java  |  2 +-
 .../physical/HashShuffleFileWriteExec.java      |  2 +-
 .../engine/planner/physical/HavingExec.java     |  2 +-
 .../engine/planner/physical/MemSortExec.java    |  2 +-
 .../physical/MergeFullOuterJoinExec.java        |  3 ++-
 .../engine/planner/physical/MergeJoinExec.java  |  3 ++-
 .../engine/planner/physical/NLJoinExec.java     |  3 ++-
 .../planner/physical/NLLeftOuterJoinExec.java   |  3 ++-
 .../physical/PartitionMergeScanExec.java        |  5 ++---
 .../physical/RangeShuffleFileWriteExec.java     |  2 +-
 .../physical/RightOuterMergeJoinExec.java       |  3 ++-
 .../engine/planner/physical/SelectionExec.java  |  2 +-
 .../engine/planner/physical/SeqScanExec.java    |  2 +-
 .../SortBasedColPartitionStoreExec.java         |  2 +-
 .../engine/planner/physical/StoreTableExec.java |  2 +-
 .../org/apache/tajo/master/QueryInProgress.java | 10 ++++++---
 .../apache/tajo/master/TajoContainerProxy.java  | 10 +++++----
 .../tajo/worker/TajoResourceAllocator.java      |  6 +++---
 .../main/java/org/apache/tajo/worker/Task.java  | 22 ++++++++------------
 .../apache/tajo/worker/TaskAttemptContext.java  |  2 +-
 30 files changed, 65 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6ba73fe..4e14c93 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.10.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1309: Add missing break point in physical operator. (jinho)
+
     TAJO-1307: HBaseStorageManager need to support for users to use
     hbase-site.xml file. (jaehwa)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
index 5d41e41..ce56d12 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java
@@ -18,11 +18,11 @@
 
 package org.apache.tajo.cli.tsql.commands;
 
-import java.io.PrintWriter;
-
 import org.apache.tajo.cli.tsql.TajoCli;
 import org.apache.tajo.util.VersionInfo;
 
+import java.io.PrintWriter;
+
 public class HelpCommand extends TajoShellCommand {
   private String targetDocVersion = "";
 
@@ -79,7 +79,7 @@ public class HelpCommand extends TajoShellCommand {
       sout.println();
 
       sout.println("Variables");
-      sout.println("  \\set [[NAME] [VALUE]  set session variable or list session variables");
+      sout.println("  \\set [NAME] [VALUE]  set session variable or list session variables");
       sout.println("  \\unset NAME           unset session variable");
       sout.println();
       sout.println();

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 117b04c..14cf567 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -128,7 +128,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
       rightEnd = true;
     }
 
-    while (true) {
+    while (!context.isStopped()) {
       if (!rightIterator.hasNext()) { // if leftIterator ended
         if (leftIterator.hasNext()) { // if rightTupleslot remains
           leftTuple = leftIterator.next();
@@ -201,6 +201,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
         return outputTuple;
       }
     }
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 4e19114..c3f9d3d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -203,7 +203,7 @@ public class ExternalSortExec extends SortExec {
 
     int chunkId = 0;
     long runStartTime = System.currentTimeMillis();
-    while ((tuple = child.next()) != null) { // partition sort start
+    while (!context.isStopped() && (tuple = child.next()) != null) { // partition
sort start
       Tuple vtuple = new VTuple(tuple);
       inMemoryTable.add(vtuple);
       memoryConsumption += MemoryUtil.calculateMemorySize(vtuple);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index 80bba2b..0d1bf3d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -48,7 +48,7 @@ public class HashAggregateExec extends AggregationExec {
   private void compute() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
-    while((tuple = child.next()) != null && !context.isStopped()) {
+    while(!context.isStopped() && (tuple = child.next()) != null) {
       keyTuple = new VTuple(groupingKeyIds.length);
       // build one key tuple
       for(int i = 0; i < groupingKeyIds.length; i++) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index c28a5cd..e94bc26 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -67,7 +67,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec
{
   public Tuple next() throws IOException {
     Tuple tuple;
     StringBuilder sb = new StringBuilder();
-    while((tuple = child.next()) != null) {
+    while(!context.isStopped() && (tuple = child.next()) != null) {
       // set subpartition directory name
       sb.delete(0, sb.length());
       if (keyIds != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 28d9a3e..9cd13fb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -144,7 +144,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
     Tuple rightTuple;
     boolean found = false;
 
-    while(!finished) {
+    while(!context.isStopped() && !finished) {
       if (shouldGetLeftTuple) { // initially, it is true.
         // getting new outer
         leftTuple = leftChild.next(); // it comes from a disk
@@ -208,7 +208,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
     Tuple tuple;
     Tuple keyTuple;
 
-    while ((tuple = rightChild.next()) != null) {
+    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
       keyTuple = new VTuple(joinKeyPairs.size());
       for (int i = 0; i < rightKeyList.length; i++) {
         keyTuple.put(i, tuple.get(rightKeyList[i]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 701297f..38728b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -111,7 +111,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
     Tuple rightTuple;
     boolean found = false;
 
-    while(!finished) {
+    while(!context.isStopped() && !finished) {
       if (shouldGetLeftTuple) { // initially, it is true.
         // getting new outer
         leftTuple = leftChild.next(); // it comes from a disk
@@ -156,7 +156,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
     Tuple tuple;
     Tuple keyTuple;
 
-    while ((tuple = rightChild.next()) != null) {
+    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
       keyTuple = new VTuple(joinKeyPairs.size());
       for (int i = 0; i < rightKeyList.length; i++) {
         keyTuple.put(i, tuple.get(rightKeyList[i]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 236f5e3..cceed3e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -64,7 +64,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
     Tuple rightTuple;
     boolean notFound;
 
-    while(!finished) {
+    while(!context.isStopped() && !finished) {
 
       // getting new outer
       leftTuple = leftChild.next(); // it comes from a disk
@@ -89,7 +89,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
       // Reach here only when a hash bucket is found. Then, it checks all tuples in the found
bucket.
       // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket.
       notFound = true;
-      while (notFound && iterator.hasNext()) {
+      while (!context.isStopped() && notFound && iterator.hasNext()) {
         rightTuple = iterator.next();
         frameTuple.set(leftTuple, rightTuple);
         if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index c1b6522..233ef92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -138,7 +138,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
     Tuple rightTuple;
     boolean found = false;
 
-    while(!finished) {
+    while(!context.isStopped() && !finished) {
 
       if (shouldGetLeftTuple) { // initially, it is true.
         // getting new outer
@@ -204,7 +204,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
     Tuple tuple;
     Tuple keyTuple;
 
-    while ((tuple = rightChild.next()) != null) {
+    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
       keyTuple = new VTuple(joinKeyPairs.size());
       for (int i = 0; i < rightKeyList.length; i++) {
         keyTuple.put(i, tuple.get(rightKeyList[i]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 5196a63..37c6d0e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -70,7 +70,7 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
     Tuple rightTuple;
     boolean notFound;
 
-    while(!finished) {
+    while(!context.isStopped() && !finished) {
 
       // getting new outer
       leftTuple = leftChild.next(); // it comes from a disk

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 3c4949f..28974f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -103,7 +103,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec
{
       int partId;
       int tupleCount = 0;
       long numRows = 0;
-      while ((tuple = child.next()) != null) {
+      while (!context.isStopped() && (tuple = child.next()) != null) {
         tupleCount++;
         numRows++;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index f9f4351..e9a7c03 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -39,7 +39,7 @@ public class HavingExec extends UnaryPhysicalExec  {
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
-    while ((tuple = child.next()) != null) {
+    while (!context.isStopped() && (tuple = child.next()) != null) {
       if (qual.eval(inSchema, tuple).isTrue()) {
         return tuple;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index 13fec7b..c77313e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -51,7 +51,7 @@ public class MemSortExec extends SortExec {
 
     if (!sorted) {
       Tuple tuple;
-      while ((tuple = child.next()) != null) {
+      while (!context.isStopped() && (tuple = child.next()) != null) {
         tupleSlots.add(new VTuple(tuple));
       }
       

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index cb2552b..3f2e431 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -109,7 +109,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
   public Tuple next() throws IOException {
     Tuple previous;
 
-    for (;;) {
+    while (!context.isStopped()) {
       boolean newRound = false;
       if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
         newRound = true;
@@ -313,6 +313,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
         }
       } // the second if end false
     } // for
+    return null;
   }
 
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index 13104ee..63f48ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -102,7 +102,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
   public Tuple next() throws IOException {
     Tuple previous;
 
-    for (;;) {
+    while (!context.isStopped()) {
       if (!outerIterator.hasNext() && !innerIterator.hasNext()) {
         if(end){
           return null;
@@ -170,6 +170,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
         return outTuple;
       }
     }
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index b5c6244..5e7ab98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -67,7 +67,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
   }
 
   public Tuple next() throws IOException {
-    for (;;) {
+    while (!context.isStopped()) {
       if (needNewOuter) {
         outerTuple = leftChild.next();
         if (outerTuple == null) {
@@ -94,6 +94,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
         return outTuple;
       }
     }
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 8ff7570..7959d47 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -73,7 +73,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
   }
 
   public Tuple next() throws IOException {
-    for (;;) {
+    while (!context.isStopped()) {
       if (needNextRightTuple) {
         leftTuple = leftChild.next();
         if (leftTuple == null) {
@@ -112,6 +112,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
         return outTuple;
       }
     }
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 5297e2c..5692308 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -21,9 +21,8 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.collect.Lists;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -70,7 +69,7 @@ public class PartitionMergeScanExec extends PhysicalExec {
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
-    while (currentScanner != null) {
+    while (!context.isStopped() && currentScanner != null) {
       tuple = currentScanner.next();
 
       if (tuple != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 119f053..8da1a03 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -96,7 +96,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
     long offset;
 
 
-    while((tuple = child.next()) != null) {
+    while(!context.isStopped() && (tuple = child.next()) != null) {
       offset = appender.getOffset();
       appender.addTuple(tuple);
       keyTuple = new VTuple(keySchema.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index a02d00b..5e80b8f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -129,7 +129,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
   public Tuple next() throws IOException {
     Tuple previous;
 
-    for (;;) {
+    while (!context.isStopped()) {
       boolean newRound = false;
       if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) {
         newRound = true;
@@ -339,6 +339,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
         }
       } // the second if end false
     } // for
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 9e84462..b9273fa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -44,7 +44,7 @@ public class SelectionExec extends UnaryPhysicalExec  {
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
-    while ((tuple = child.next()) != null) {
+    while (!context.isStopped() && (tuple = child.next()) != null) {
       if (qual.eval(inSchema, tuple).isTrue()) {
         return tuple;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 94cd4ed..15f17fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -251,7 +251,7 @@ public class SeqScanExec extends PhysicalExec {
     initScanner(projected);
 
     List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>();
-    while (true) {
+    while (!context.isStopped()) {
       Tuple tuple = next();
       if (tuple != null) {
         broadcastTupleCacheList.add(tuple);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index f7c20fc..ca90b0e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -73,7 +73,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec
{
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
-    while((tuple = child.next()) != null) {
+    while(!context.isStopped() && (tuple = child.next()) != null) {
 
       fillKeyTuple(tuple, currentKey);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 3d3da5c..5622699 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -115,7 +115,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
    */
   @Override
   public Tuple next() throws IOException {
-    while((tuple = child.next()) != null) {
+    while(!context.isStopped() && (tuple = child.next()) != null) {
       appender.addTuple(tuple);
 
       if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize())
{

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 352ec46..df461c8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -80,8 +80,12 @@ public class QueryInProgress {
 
   public synchronized void kill() {
     getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-    if(queryMasterRpcClient != null){
-      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+    if (queryMasterRpcClient != null) {
+      try {
+        queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+      } catch (Throwable e) {
+        catchException(e);
+      }
     }
   }
 
@@ -165,7 +169,7 @@ public class QueryInProgress {
     }
   }
 
-  public void catchException(Exception e) {
+  public void catchException(Throwable e) {
     LOG.error(e.getMessage(), e);
     queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
     queryInfo.setLastMessage(StringUtils.stringifyException(e));

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 588b7ee..42ffd87 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -31,6 +31,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.container.TajoContainer;
 import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.TaskFatalErrorEvent;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.querymaster.QueryMasterTask;
@@ -82,8 +83,9 @@ public class TajoContainerProxy extends ContainerProxy {
       tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class,
true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
       tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
+    } catch (Throwable e) {
+      /* Worker RPC failure */
+      context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage()));
     } finally {
       RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
     }
@@ -111,7 +113,7 @@ public class TajoContainerProxy extends ContainerProxy {
               .build();
 
       tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get());
-    } catch (Exception e) {
+    } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
     } finally {
       RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
@@ -198,7 +200,7 @@ public class TajoContainerProxy extends ContainerProxy {
               .addAllContainerIds(containerIdProtos)
               .build(),
           NullCallback.get());
-    } catch (Exception e) {
+    } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(tmClient);

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 7278317..dd408c9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -128,7 +128,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     for(ContainerProxy eachProxy: list) {
       try {
         eachProxy.stopContainer();
-      } catch (Exception e) {
+      } catch (Throwable e) {
         LOG.warn(e.getMessage());
       }
     }
@@ -301,7 +301,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.allocateWorkerResources(null, request, callBack);
-      } catch (Exception e) {
+      } catch (Throwable e) {
         LOG.error(e.getMessage(), e);
       } finally {
         connPool.releaseConnection(tmClient);
@@ -363,7 +363,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               containerIds.add(eachContainer.getId());
             }
             TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId,
containerIds);
-          } catch (Exception e) {
+          } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
           }
           return;

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 5f9c6ac..e9ad838 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -88,8 +88,6 @@ public class Task {
   private final Map<String, TableDesc> descs = Maps.newHashMap();
   private PhysicalExec executor;
   private boolean interQuery;
-  private boolean killed = false;
-  private boolean aborted = false;
   private Path inputTableBaseDir;
 
   private long startTime;
@@ -254,13 +252,11 @@ public class Task {
   }
 
   public void kill() {
-    killed = true;
-    context.stop();
     context.setState(TaskAttemptState.TA_KILLED);
+    context.stop();
   }
 
   public void abort() {
-    aborted = true;
     context.stop();
   }
 
@@ -299,7 +295,7 @@ public class Task {
   }
 
   public void updateProgress() {
-    if(killed || aborted){
+    if(context != null && context.isStopped()){
       return;
     }
 
@@ -403,12 +399,12 @@ public class Task {
           createPlan(context, plan);
       this.executor.init();
 
-      while(!killed && !aborted && executor.next() != null) {
+      while(!context.isStopped() && executor.next() != null) {
       }
     } catch (Throwable e) {
       error = e ;
       LOG.error(e.getMessage(), e);
-      aborted = true;
+      context.stop();
     } finally {
       if (executor != null) {
         try {
@@ -423,10 +419,10 @@ public class Task {
       executionBlockContext.completedTasksNum.incrementAndGet();
       context.getHashShuffleAppenderManager().finalizeTask(taskId);
       QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub();
-      if (killed || aborted) {
+      if (context.isStopped()) {
         context.setExecutorProgress(0.0f);
-        if(killed) {
-          context.setState(TaskAttemptState.TA_KILLED);
+
+        if(context.getState() == TaskAttemptState.TA_KILLED) {
           queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
           executionBlockContext.killedTasksNum.incrementAndGet();
         } else {
@@ -593,7 +589,7 @@ public class Task {
       int retryWaitTime = 1000; //sec
 
       try { // for releasing fetch latch
-        while(!killed && retryNum < maxRetryNum) {
+        while(!context.isStopped() && retryNum < maxRetryNum) {
           if (retryNum > 0) {
             try {
               Thread.sleep(retryWaitTime);
@@ -625,7 +621,7 @@ public class Task {
           if (retryNum == maxRetryNum) {
             LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded
(" + fetcher.getURI() + ")");
           }
-          aborted = true; // retry task
+          context.stop(); // retry task
           ctx.getFetchLatch().countDown();
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 3092c47..1f2c325 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -70,7 +70,7 @@ public class TaskAttemptContext {
   /** a map of shuffled file outputs */
   private Map<Integer, String> shuffleFileOutputs;
   private File fetchIn;
-  private boolean stopped = false;
+  private volatile boolean stopped = false;
   private boolean interQuery = false;
   private Path outputPath;
   private DataChannel dataChannel;


Mime
View raw message