tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [02/18] tajo git commit: TAJO-1766: Improve the performance of cross join.
Date Tue, 25 Aug 2015 09:23:37 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan
index 1c2fd7a..c0de2a1 100644
--- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan
+++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash.plan
@@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER)
      => out schema: {(1) default.t3.id (INT4)}
      => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
    JOIN(7)(RIGHT_OUTER)
-     => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
+     => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
      => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
      => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
      => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
@@ -29,72 +29,43 @@ Execution Block Graph (TERMINAL - eb_0000000000000_0000_000006)
 -------------------------------------------------------------------------------
 |-eb_0000000000000_0000_000006
    |-eb_0000000000000_0000_000005
-      |-eb_0000000000000_0000_000004
-      |-eb_0000000000000_0000_000003
 -------------------------------------------------------------------------------
 Order of Execution
 -------------------------------------------------------------------------------
-1: eb_0000000000000_0000_000003
-2: eb_0000000000000_0000_000004
-3: eb_0000000000000_0000_000005
-4: eb_0000000000000_0000_000006
+1: eb_0000000000000_0000_000005
+2: eb_0000000000000_0000_000006
 -------------------------------------------------------------------------------
 
 =======================================================
-Block Id: eb_0000000000000_0000_000003 [LEAF]
-=======================================================
-
-[Outgoing]
-[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32)
-
-[Enforcers]
- 0: type=Broadcast, tables=default.t1
-
-JOIN(7)(RIGHT_OUTER)
-  => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
-  => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
-  => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
-  => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
-   SCAN(1) on default.jointable12 as t2
-     => target list: default.t2.id (INT4)
-     => out schema: {(1) default.t2.id (INT4)}
-     => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)}
-   SCAN(0) on default.jointable11 as t1
-     => target list: default.t1.id (INT4), default.t1.name (TEXT)
-     => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
-     => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
-
-=======================================================
-Block Id: eb_0000000000000_0000_000004 [LEAF]
-=======================================================
-
-[Outgoing]
-[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32)
-
-SCAN(3) on default.jointable13 as t3
-  => target list: default.t3.id (INT4)
-  => out schema: {(1) default.t3.id (INT4)}
-  => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
-
-=======================================================
 Block Id: eb_0000000000000_0000_000005 [ROOT]
 =======================================================
 
-[Incoming]
-[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32)
-[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32)
+[Enforcers]
+ 0: type=Broadcast, tables=default.t2
+ 1: type=Broadcast, tables=default.t1
 
 JOIN(8)(RIGHT_OUTER)
   => Join Cond: default.t1.id (INT4) = default.t3.id (INT4)
   => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)
   => out schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)}
   => in schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)}
-   SCAN(13) on eb_0000000000000_0000_000004
+   SCAN(3) on default.jointable13 as t3
+     => target list: default.t3.id (INT4)
      => out schema: {(1) default.t3.id (INT4)}
-     => in schema: {(1) default.t3.id (INT4)}
-   SCAN(12) on eb_0000000000000_0000_000003
+     => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
+   JOIN(7)(RIGHT_OUTER)
+     => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
+     => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
      => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
      => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
+      SCAN(1) on default.jointable12 as t2
+        => target list: default.t2.id (INT4)
+        => out schema: {(1) default.t2.id (INT4)}
+        => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)}
+      SCAN(0) on default.jointable11 as t1
+        => target list: default.t1.id (INT4), default.t1.name (TEXT)
+        => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
+        => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
 
 =======================================================
 Block Id: eb_0000000000000_0000_000006 [TERMINAL]

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan
index 5a589ff..fab3809 100644
--- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan
+++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Hash_NoBroadcast.plan
@@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER)
      => out schema: {(1) default.t3.id (INT4)}
      => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
    JOIN(7)(RIGHT_OUTER)
-     => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
+     => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
      => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
      => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
      => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
@@ -80,7 +80,7 @@ Block Id: eb_0000000000000_0000_000003 [INTERMEDIATE]
 [q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32)
 
 JOIN(7)(RIGHT_OUTER)
-  => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
+  => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
   => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
   => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
   => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan
index 1c2fd7a..c0de2a1 100644
--- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan
+++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort.plan
@@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER)
      => out schema: {(1) default.t3.id (INT4)}
      => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
    JOIN(7)(RIGHT_OUTER)
-     => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
+     => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
      => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
      => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
      => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
@@ -29,72 +29,43 @@ Execution Block Graph (TERMINAL - eb_0000000000000_0000_000006)
 -------------------------------------------------------------------------------
 |-eb_0000000000000_0000_000006
    |-eb_0000000000000_0000_000005
-      |-eb_0000000000000_0000_000004
-      |-eb_0000000000000_0000_000003
 -------------------------------------------------------------------------------
 Order of Execution
 -------------------------------------------------------------------------------
-1: eb_0000000000000_0000_000003
-2: eb_0000000000000_0000_000004
-3: eb_0000000000000_0000_000005
-4: eb_0000000000000_0000_000006
+1: eb_0000000000000_0000_000005
+2: eb_0000000000000_0000_000006
 -------------------------------------------------------------------------------
 
 =======================================================
-Block Id: eb_0000000000000_0000_000003 [LEAF]
-=======================================================
-
-[Outgoing]
-[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32)
-
-[Enforcers]
- 0: type=Broadcast, tables=default.t1
-
-JOIN(7)(RIGHT_OUTER)
-  => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
-  => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
-  => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
-  => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
-   SCAN(1) on default.jointable12 as t2
-     => target list: default.t2.id (INT4)
-     => out schema: {(1) default.t2.id (INT4)}
-     => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)}
-   SCAN(0) on default.jointable11 as t1
-     => target list: default.t1.id (INT4), default.t1.name (TEXT)
-     => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
-     => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
-
-=======================================================
-Block Id: eb_0000000000000_0000_000004 [LEAF]
-=======================================================
-
-[Outgoing]
-[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32)
-
-SCAN(3) on default.jointable13 as t3
-  => target list: default.t3.id (INT4)
-  => out schema: {(1) default.t3.id (INT4)}
-  => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
-
-=======================================================
 Block Id: eb_0000000000000_0000_000005 [ROOT]
 =======================================================
 
-[Incoming]
-[q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32)
-[q_0000000000000_0000] 4 => 5 (type=HASH_SHUFFLE, key=default.t3.id (INT4), num=32)
+[Enforcers]
+ 0: type=Broadcast, tables=default.t2
+ 1: type=Broadcast, tables=default.t1
 
 JOIN(8)(RIGHT_OUTER)
   => Join Cond: default.t1.id (INT4) = default.t3.id (INT4)
   => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)
   => out schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)}
   => in schema: {(4) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4), default.t3.id (INT4)}
-   SCAN(13) on eb_0000000000000_0000_000004
+   SCAN(3) on default.jointable13 as t3
+     => target list: default.t3.id (INT4)
      => out schema: {(1) default.t3.id (INT4)}
-     => in schema: {(1) default.t3.id (INT4)}
-   SCAN(12) on eb_0000000000000_0000_000003
+     => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
+   JOIN(7)(RIGHT_OUTER)
+     => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
+     => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
      => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
      => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
+      SCAN(1) on default.jointable12 as t2
+        => target list: default.t2.id (INT4)
+        => out schema: {(1) default.t2.id (INT4)}
+        => in schema: {(2) default.t2.id (INT4), default.t2.name (TEXT)}
+      SCAN(0) on default.jointable11 as t1
+        => target list: default.t1.id (INT4), default.t1.name (TEXT)
+        => out schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
+        => in schema: {(2) default.t1.id (INT4), default.t1.name (TEXT)}
 
 =======================================================
 Block Id: eb_0000000000000_0000_000006 [TERMINAL]

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan
index 5a589ff..fab3809 100644
--- a/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan
+++ b/tajo-core-tests/src/test/resources/results/TestOuterJoinQuery/testRightOuterJoinPredicationCaseByCase3.1.Sort_NoBroadcast.plan
@@ -10,7 +10,7 @@ JOIN(8)(RIGHT_OUTER)
      => out schema: {(1) default.t3.id (INT4)}
      => in schema: {(2) default.t3.id (INT4), default.t3.name (TEXT)}
    JOIN(7)(RIGHT_OUTER)
-     => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
+     => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
      => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
      => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
      => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
@@ -80,7 +80,7 @@ Block Id: eb_0000000000000_0000_000003 [INTERMEDIATE]
 [q_0000000000000_0000] 3 => 5 (type=HASH_SHUFFLE, key=default.t1.id (INT4), num=32)
 
 JOIN(7)(RIGHT_OUTER)
-  => Join Cond: default.t1.id (INT4) = default.t2.id (INT4) AND concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33
+  => Join Cond: (default.t1.id (INT4) = default.t2.id (INT4) AND (concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-11 OR concat(default.t1.name (TEXT),CAST (default.t2.id (INT4) AS TEXT)) = table11-33))
   => target list: default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)
   => out schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}
   => in schema: {(3) default.t1.id (INT4), default.t1.name (TEXT), default.t2.id (INT4)}

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 1f878f1..33443da 100644
--- a/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core-tests/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -18,7 +18,8 @@ Available Session Variables:
 \set LC_MONETARY [text value] - Formatting of currency amounts
 \set LC_NUMERIC [text value] - Formatting of numbers
 \set LC_TIME [text value] - Formatting of dates and times
-\set BROADCAST_TABLE_SIZE_LIMIT [long value] - limited size (bytes) of broadcast table
+\set BROADCAST_NON_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for non-cross join
+\set BROADCAST_CROSS_JOIN_THRESHOLD [long value] - restriction for the total bytes of broadcasted table for cross join
 \set JOIN_TASK_INPUT_SIZE [int value] - join task input size (mb) 
 \set SORT_TASK_INPUT_SIZE [int value] - sort task input size (mb)
 \set GROUPBY_TASK_INPUT_SIZE [int value] - group by task input size (mb)
@@ -41,4 +42,4 @@ Available Session Variables:
 \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
 \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
 \set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
-\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
+\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 5bbf3a9..b04bdc4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -330,20 +330,18 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
 
       switch (algorithm) {
-        case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
-        case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
         default:
           // fallback algorithm
           LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
+          PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+          return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
       }
 
     } else {
-      return new BNLJoinExec(context, plan, leftExec, rightExec);
+      LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+      // switchJoinSidesIfNecessary returns two PhysicalExecs: index 0 is the smaller one, index 1 the larger one.
+      PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+      return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
     }
   }
 
@@ -356,12 +354,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
 
       switch (algorithm) {
-        case NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
-          return new NLJoinExec(context, plan, leftExec, rightExec);
-        case BLOCK_NESTED_LOOP_JOIN:
-          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
-          return new BNLJoinExec(context, plan, leftExec, rightExec);
         case IN_MEMORY_HASH_JOIN:
           LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
           // returns two PhysicalExec. smaller one is 0, and larger one is 1.
@@ -389,7 +381,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
    */
   @VisibleForTesting
   public PhysicalExec [] switchJoinSidesIfNecessary(TaskAttemptContext context, JoinNode plan,
-                                                     PhysicalExec left, PhysicalExec right) throws IOException {
+                                                    PhysicalExec left, PhysicalExec right) throws IOException {
     String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
     String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long leftSize = estimateSizeRecursive(context, leftLineage);

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index 92ecadd..d67cee8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -302,10 +302,6 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
       sb.append("type=Join,alg=");
       if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.MERGE_JOIN) {
         sb.append("merge_join");
-      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.NESTED_LOOP_JOIN) {
-        sb.append("nested_loop");
-      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN) {
-        sb.append("block_nested_loop");
       } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
         sb.append("in_memory_hash");
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 4f352c1..c71324d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -194,8 +194,10 @@ public class ExecutionBlock {
   }
 
   public void addBroadcastRelation(ScanNode relationNode) {
+    if (!broadcastRelations.containsKey(relationNode.getCanonicalName())) {
+      enforcer.addBroadcast(relationNode.getCanonicalName());
+    }
     broadcastRelations.put(relationNode.getCanonicalName(), relationNode);
-    enforcer.addBroadcast(relationNode.getCanonicalName());
   }
 
   public void removeBroadcastRelation(ScanNode relationNode) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
index 6f7b4c9..dbb92e1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.global.rewriter.rules;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
@@ -28,6 +29,7 @@ import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.util.TUtil;
@@ -42,9 +44,6 @@ import java.util.*;
  * <h3>Broadcastable relation</h3>
  * A relation is broadcastable when its size is smaller than a given threshold.
  *
- * <h3>Assumetion</h3>
- * If every input of an execution block is broadcastable, the output of the execution block is also broadcastable.
- *
  * <h3>Rules to convert repartition join into broadcast join</h3>
  * <ul>
  *   <li>Given an EB containing a join and its child EBs, those EBs can be merged into a single EB if at least one child EB's output is broadcastable.</li>
@@ -65,24 +64,31 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
   private BroadcastJoinPlanBuilder planBuilder;
   private BroadcastJoinPlanFinalizer planFinalizer;
 
+  protected void init(MasterPlan plan, long thresholdForNonCrossJoin, long thresholdForCrossJoin,
+                      boolean broadcastForNonCrossJoinEnabled) {
+    GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder();
+    RelationSizeComparator relSizeComparator = new RelationSizeComparator();
+    planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, thresholdForNonCrossJoin,
+        thresholdForCrossJoin, broadcastForNonCrossJoinEnabled);
+    planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator);
+  }
+
   @Override
   public String getName() {
-    return "BroadcastJoinRule";
+    return "Broadcast join rule";
   }
 
   @Override
   public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
-    if (queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED)) {
+    long thresholdForNonCrossJoin = queryContext.getLong(SessionVars.BROADCAST_NON_CROSS_JOIN_THRESHOLD);
+    long thresholdForCrossJoin = queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD);
+    boolean broadcastJoinEnabled = queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
+    if (broadcastJoinEnabled &&
+        (thresholdForNonCrossJoin > 0 || thresholdForCrossJoin > 0)) {
       for (LogicalPlan.QueryBlock block : plan.getLogicalPlan().getQueryBlocks()) {
         if (block.hasNode(NodeType.JOIN)) {
-          long broadcastSizeThreshold = queryContext.getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT);
-          if (broadcastSizeThreshold > 0) {
-            GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder();
-            RelationSizeComparator relSizeComparator = new RelationSizeComparator();
-            planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, broadcastSizeThreshold);
-            planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator);
-            return true;
-          }
+          init(plan, thresholdForNonCrossJoin, thresholdForCrossJoin, broadcastJoinEnabled);
+          return true;
         }
       }
     }
@@ -116,7 +122,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
    * {@Link BroadcastJoinPlanFinalizer} checks whether every input is the broadcast candidate or not.
    * If so, it removes the broadcast property from the largest relation.
    */
-  private static class BroadcastJoinPlanFinalizer implements DirectedGraphVisitor<ExecutionBlockId> {
+  private class BroadcastJoinPlanFinalizer implements DirectedGraphVisitor<ExecutionBlockId> {
     private final MasterPlan plan;
     private final RelationSizeComparator relSizeComparator;
 
@@ -141,18 +147,25 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
     }
   }
 
-  private static class BroadcastJoinPlanBuilder implements DirectedGraphVisitor<ExecutionBlockId> {
+  private class BroadcastJoinPlanBuilder implements DirectedGraphVisitor<ExecutionBlockId> {
     private final MasterPlan plan;
     private final RelationSizeComparator relSizeComparator;
-    private final long broadcastSizeThreshold;
+    private final long thresholdForNonCrossJoin;
+    private final long thresholdForCrossJoin;
+    private final boolean broadcastForNonCrossJoinEnabled;
     private final GlobalPlanRewriteUtil.ParentFinder parentFinder;
+    private final Map<ExecutionBlockId, Long> estimatedEbOutputSize = TUtil.newHashMap();
 
     public BroadcastJoinPlanBuilder(MasterPlan plan, RelationSizeComparator relationSizeComparator,
-                                    GlobalPlanRewriteUtil.ParentFinder parentFinder, long broadcastSizeThreshold) {
+                                    GlobalPlanRewriteUtil.ParentFinder parentFinder,
+                                    long thresholdForNonCrossJoin, long thresholdForCrossJoin,
+                                    boolean broadcastForNonCrossJoinEnabled) {
       this.plan = plan;
       this.relSizeComparator = relationSizeComparator;
-      this.broadcastSizeThreshold = broadcastSizeThreshold;
+      this.thresholdForNonCrossJoin = thresholdForNonCrossJoin;
+      this.thresholdForCrossJoin = thresholdForCrossJoin;
       this.parentFinder = parentFinder;
+      this.broadcastForNonCrossJoinEnabled = broadcastForNonCrossJoinEnabled;
     }
 
     @Override
@@ -166,28 +179,30 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
       }
     }
 
+    /**
+     * Estimate the result size of leaf blocks.
+     *
+     * @param current
+     */
     private void visitLeafNode(ExecutionBlock current) {
-      // At leaf execution blocks, find input relations who's size is smaller than the predefined threshold.
+      // Preserved-row relations must not be broadcasted to avoid data duplication.
       if (!current.isPreservedRow()) {
-        // Preserved-row relations must not be broadcasted to avoid data duplication.
-        boolean fullyBroadcastable = true;
+        long totalVolume = 0;
         for (ScanNode scanNode : current.getScanNodes()) {
-          if (GlobalPlanRewriteUtil.getTableVolume(scanNode) <= broadcastSizeThreshold) {
-            current.addBroadcastRelation(scanNode);
-          } else {
-            fullyBroadcastable = false;
-          }
-        }
-        if (fullyBroadcastable && current.getScanNodes().length == 1) {
-          try {
-            updateScanOfParentAsBroadcastable(plan, current);
-          } catch (NoScanNodeForChildEbException e) {
-            // This case is when the current has two or more inputs via union, and simply ignored.
-          }
+          totalVolume += GlobalPlanRewriteUtil.getTableVolume(scanNode);
         }
+        estimatedEbOutputSize.put(current.getId(), totalVolume);
       }
     }
 
+    /**
+     * 1. Based on the join type, find broadcastable relations of the child execution blocks.
+     * 2. Update the current block's inputs based on the broadcastability of the child blocks.
+     * 3. Merge child blocks and the current block if the scan to the corresponding child block is broadcastable.
+     * 4. Estimate the result size of the current block.
+     *
+     * @param current
+     */
     private void visitNonLeafNode(ExecutionBlock current) {
       // At non-leaf execution blocks, merge broadcastable children's plan with the current plan.
 
@@ -195,21 +210,31 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
         if (current.hasJoin()) {
           List<ExecutionBlock> childs = plan.getChilds(current);
           Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = current.getUnionScanMap();
+          LogicalNode found = PlannerUtil.findTopNode(current.getPlan(), NodeType.JOIN);
+          if (found == null) {
+            throw new TajoInternalError("ExecutionBlock " + current.getId() + " doesn't have any join operator, " +
+                "but the master plan indicates that it has.");
+          }
+          JoinType joinType = ((JoinNode)found).getJoinType();
+
+          for (ExecutionBlock child : childs) {
+            if (!child.isPreservedRow()) {
+              updateBroadcastableRelForChildEb(child, joinType);
+              updateInputBasedOnChildEb(child, current);
+            }
+          }
 
           if (current.hasBroadcastRelation()) {
             // The current execution block and its every child are able to be merged.
             for (ExecutionBlock child : childs) {
               addUnionNodeIfNecessary(unionScanMap, plan, child, current);
-              mergeTwoPhaseJoin(plan, child, current);
+              mergeTwoPhaseJoinIfPossible(plan, child, current);
             }
 
             checkTotalSizeOfBroadcastableRelations(current);
 
-            // We assume that if every input of an execution block is broadcastable,
-            // the output of the execution block is also broadcastable.
-            if (!current.isPreservedRow() && isFullyBroadcastable(current)) {
-              updateScanOfParentAsBroadcastable(plan, current);
-            }
+            long outputVolume = estimateOutputVolume(current);
+            estimatedEbOutputSize.put(current.getId(), outputVolume);
           }
         } else {
           List<ScanNode> relations = TUtil.newList(current.getBroadcastRelations());
@@ -220,6 +245,134 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
       }
     }
 
+    private void updateInputBasedOnChildEb(ExecutionBlock child, ExecutionBlock parent) {
+      if (isFullyBroadcastable(child)) {
+        if (plan.isLeaf(child) && child.getScanNodes().length == 1) {
+          try {
+            updateScanOfParentAsBroadcastable(plan, child, parent);
+          } catch (NoScanNodeForChildEbException e) {
+            // This case is when the current has two or more inputs via union, and is simply ignored.
+          }
+        } else {
+          updateScanOfParentAsBroadcastable(plan, child, parent);
+        }
+      }
+    }
+
+    private void updateBroadcastableRelForChildEb(ExecutionBlock child, JoinType joinType) {
+      long threshold = joinType == JoinType.CROSS ? thresholdForCrossJoin : thresholdForNonCrossJoin;
+      for (ScanNode scanNode : child.getScanNodes()) {
+        long volume = GlobalPlanRewriteUtil.getTableVolume(scanNode);
+        if (volume >= 0 && volume <= threshold) {
+          // If the child eb is already visited, the below line may update its broadcast relations.
+          // Furthermore, this operation might mark the preserved-row relation as the broadcast relation with outer join.
+          // However, the rewriting result is still valid. Please consider the following query:
+          //
+          // EX) SELECT ... FROM a LEFT OUTER JOIN b on ... LEFT OUTER JOIN c on ...
+          //
+          // and assume that three relations of a, b, and c are all broadcastable.
+          // The initial global plan will be as follows:
+          //
+          // EB 2)
+          //     LEFT OUTER JOIN
+          //        /       \
+          //       c        EB_1
+          // EB 1)
+          //     LEFT OUTER JOIN
+          //        /       \
+          //       a         b
+          //
+          // When visiting EB_1, the below line marks only b as the broadcast relation because a is the preserved-row
+          // relation. However, when visiting EB_2, it marks both a and b as the broadcast relations because EB_1 is
+          // the null-supplying relation which has a and b as its inputs.
+          // Thus, the rewriting result will be like
+          //
+          // EB 2) broadcast: a, b
+          //     LEFT OUTER JOIN
+          //        /       \
+          //       c     LEFT OUTER JOIN
+          //                /       \
+          //               a         b
+          //
+          // This plan returns the same result as a plan that broadcasts the result of the first join.
+          // Obviously, the result must be valid.
+          child.addBroadcastRelation(scanNode);
+        }
+      }
+    }
+
+    private long estimateOutputVolume(ExecutionBlock block) {
+      return estimateOutputVolumeInternal(PlannerUtil.<JoinNode>findTopNode(block.getPlan(), NodeType.JOIN));
+    }
+
+    private long estimateOutputVolumeInternal(LogicalNode node) throws TajoInternalError {
+
+      if (node instanceof RelationNode) {
+        switch (node.getType()) {
+          case INDEX_SCAN:
+          case SCAN:
+            ScanNode scanNode = (ScanNode) node;
+            if (scanNode.getTableDesc().getStats() == null) {
+              // TODO - this case means that data is not located in HDFS. So, we need additional
+              // broadcast method.
+              return Long.MAX_VALUE;
+            } else {
+              return scanNode.getTableDesc().getStats().getNumBytes();
+            }
+          case PARTITIONS_SCAN:
+            PartitionedTableScanNode pScanNode = (PartitionedTableScanNode) node;
+            if (pScanNode.getTableDesc().getStats() == null) {
+              // TODO - this case means that data is not located in HDFS. So, we need additional
+              // broadcast method.
+              return Long.MAX_VALUE;
+            } else {
+              // if there is no selected partition
+              if (pScanNode.getInputPaths() == null || pScanNode.getInputPaths().length == 0) {
+                return 0;
+              } else {
+                return pScanNode.getTableDesc().getStats().getNumBytes();
+              }
+            }
+          case TABLE_SUBQUERY:
+            return estimateOutputVolumeInternal(((TableSubQueryNode) node).getSubQuery());
+        }
+      } else if (node instanceof UnaryNode) {
+        return estimateOutputVolumeInternal(((UnaryNode) node).getChild());
+      } else if (node instanceof UnionNode) {
+        UnionNode binaryNode = (UnionNode) node;
+        return estimateOutputVolumeInternal(binaryNode.getLeftChild()) +
+            estimateOutputVolumeInternal(binaryNode.getRightChild());
+      } else if (node instanceof JoinNode) {
+        JoinNode joinNode = (JoinNode) node;
+        JoinSpec joinSpec = joinNode.getJoinSpec();
+        long leftChildVolume = estimateOutputVolumeInternal(joinNode.getLeftChild());
+        long rightChildVolume = estimateOutputVolumeInternal(joinNode.getRightChild());
+        switch (joinNode.getJoinType()) {
+          case CROSS:
+            return leftChildVolume * rightChildVolume;
+          case INNER:
+            return (long) (leftChildVolume * rightChildVolume *
+                Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, joinSpec.getPredicates().size()));
+          case LEFT_OUTER:
+            return leftChildVolume;
+          case RIGHT_OUTER:
+            return rightChildVolume;
+          case FULL_OUTER:
+            return leftChildVolume < rightChildVolume ? leftChildVolume : rightChildVolume;
+          case LEFT_ANTI:
+          case LEFT_SEMI:
+            return (long) (leftChildVolume *
+                Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, joinSpec.getPredicates().size()));
+          case RIGHT_ANTI:
+          case RIGHT_SEMI:
+            return (long) (rightChildVolume *
+                Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, joinSpec.getPredicates().size()));
+        }
+      }
+
+      throw new TajoInternalError("Invalid State at node " + node.getPID());
+    }
+
     /**
      * When the total size of broadcastable relations exceeds the threshold, enforce repartition join for large ones
      * in order to broadcast as many relations as possible.
@@ -227,21 +380,17 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
      * @param block
      */
     private void checkTotalSizeOfBroadcastableRelations(ExecutionBlock block) {
-      List<ScanNode> broadcastCandidates = TUtil.newList();
-      for (ScanNode scanNode : block.getScanNodes()) {
-        long estimatedRelationSize = GlobalPlanRewriteUtil.getTableVolume(scanNode);
-        if (estimatedRelationSize > 0 && estimatedRelationSize <= broadcastSizeThreshold) {
-          broadcastCandidates.add(scanNode);
-        }
-      }
+      List<ScanNode> broadcastCandidates = TUtil.newList(block.getBroadcastRelations());
       Collections.sort(broadcastCandidates, relSizeComparator);
 
       // Enforce broadcast for candidates in ascending order of relation size
       long totalBroadcastVolume = 0;
+      long largeThreshold = thresholdForCrossJoin > thresholdForNonCrossJoin ?
+          thresholdForCrossJoin : thresholdForNonCrossJoin;
       int i;
       for (i = 0; i < broadcastCandidates.size(); i++) {
         long volumeOfCandidate = GlobalPlanRewriteUtil.getTableVolume(broadcastCandidates.get(i));
-        if (totalBroadcastVolume + volumeOfCandidate > broadcastSizeThreshold) {
+        if (totalBroadcastVolume + volumeOfCandidate > largeThreshold) {
           break;
         }
         totalBroadcastVolume += volumeOfCandidate;
@@ -253,8 +402,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
       }
     }
 
-    private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current) {
-      ExecutionBlock parent = plan.getParent(current);
+    private void updateScanOfParentAsBroadcastable(MasterPlan plan, ExecutionBlock current, ExecutionBlock parent) {
       if (parent != null && !plan.isTerminal(parent)) {
         ScanNode scanForCurrent = findScanForChildEb(current, parent);
         parent.addBroadcastRelation(scanForCurrent);
@@ -269,7 +417,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
      * @param parent parent block who has join nodes
      * @return
      */
-    private ExecutionBlock mergeTwoPhaseJoin(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) {
+    private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) {
       ScanNode scanForChild = findScanForChildEb(child, parent);
 
       parentFinder.set(scanForChild);

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
index b14687d..df1f33b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanRewriteUtil.java
@@ -152,7 +152,7 @@ public class GlobalPlanRewriteUtil {
       return computeDescendentVolume(binaryNode.getLeftChild()) + computeDescendentVolume(binaryNode.getRightChild());
     }
 
-    throw new TajoInternalError("invalid state");
+    throw new TajoInternalError("Invalid State at node " + node.getPID());
   }
 
   public static class ParentFinder implements LogicalNodeVisitor {

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
deleted file mode 100644
index d28b7f6..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-public class BNLJoinExec extends CommonJoinExec {
-
-  private TupleList leftTupleSlots;
-  private TupleList rightTupleSlots;
-  private Iterator<Tuple> leftIterator;
-  private Iterator<Tuple> rightIterator;
-
-  private boolean leftEnd;
-  private boolean rightEnd;
-
-  // temporal tuples and states for nested loop join
-  private Tuple leftTuple = null;
-  private Tuple rightNext = null;
-
-  private final static int TUPLE_SLOT_SIZE = 10000;
-
-  public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
-                     final PhysicalExec leftExec, PhysicalExec rightExec) {
-    super(context, plan, leftExec, rightExec);
-    this.leftTupleSlots = new TupleList(TUPLE_SLOT_SIZE);
-    this.rightTupleSlots = new TupleList(TUPLE_SLOT_SIZE);
-    this.leftIterator = leftTupleSlots.iterator();
-    this.rightIterator = rightTupleSlots.iterator();
-    this.rightEnd = false;
-    this.leftEnd = false;
-
-    // for projection
-    if (!plan.hasTargets()) {
-      plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
-    }
-  }
-
-  public Tuple next() throws IOException {
-
-    if (leftTupleSlots.isEmpty()) {
-      for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
-        Tuple t = leftChild.next();
-        if (t == null) {
-          leftEnd = true;
-          break;
-        }
-        leftTupleSlots.add(t);
-      }
-      leftIterator = leftTupleSlots.iterator();
-      leftTuple = leftIterator.next();
-    }
-
-    if (rightTupleSlots.isEmpty()) {
-      for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
-        Tuple t = rightChild.next();
-        if (t == null) {
-          rightEnd = true;
-          break;
-        }
-        rightTupleSlots.add(t);
-      }
-      rightIterator = rightTupleSlots.iterator();
-    }
-
-    if((rightNext = rightChild.next()) == null){
-      rightEnd = true;
-    }
-
-    while (!context.isStopped()) {
-      if (!rightIterator.hasNext()) { // if leftIterator ended
-        if (leftIterator.hasNext()) { // if rightTupleslot remains
-          leftTuple = leftIterator.next();
-          rightIterator = rightTupleSlots.iterator();
-        } else {
-          if (rightEnd) {
-            rightChild.rescan();
-            rightEnd = false;
-
-            if (leftEnd) {
-              return null;
-            }
-            leftTupleSlots.clear();
-            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
-              Tuple t = leftChild.next();
-              if (t == null) {
-                leftEnd = true;
-                break;
-              }
-              leftTupleSlots.add(t);
-            }
-            if (leftTupleSlots.isEmpty()) {
-              return null;
-            }
-            leftIterator = leftTupleSlots.iterator();
-            leftTuple = leftIterator.next();
-
-          } else {
-            leftIterator = leftTupleSlots.iterator();
-            leftTuple = leftIterator.next();
-          }
-
-          rightTupleSlots.clear();
-          if (rightNext != null) {
-            rightTupleSlots.add(rightNext);
-            for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill right
-              Tuple t = rightChild.next();
-              if (t == null) {
-                rightEnd = true;
-                break;
-              }
-              rightTupleSlots.add(t);
-            }
-          } else {
-            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill right
-              Tuple t = rightChild.next();
-              if (t == null) {
-                rightEnd = true;
-                break;
-              }
-              rightTupleSlots.add(t);
-            }
-          }
-
-          if ((rightNext = rightChild.next()) == null) {
-            rightEnd = true;
-          }
-          rightIterator = rightTupleSlots.iterator();
-        }
-      }
-
-      frameTuple.set(leftTuple, rightIterator.next());
-      if (!hasJoinQual || joinQual.eval(frameTuple).isTrue()) {
-        return projector.eval(frameTuple);
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-    rightEnd = false;
-    rightTupleSlots.clear();
-    leftTupleSlots.clear();
-    rightIterator = rightTupleSlots.iterator();
-    leftIterator = leftTupleSlots.iterator();
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-
-    rightTupleSlots.clear();
-    leftTupleSlots.clear();
-    rightTupleSlots = null;
-    leftTupleSlots = null;
-    rightIterator = null;
-    leftIterator = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index a59960f..e171338 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +32,7 @@ import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.expr.EvalTreeUtil;
 import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -42,7 +44,7 @@ import java.net.URI;
 import java.util.HashSet;
 import java.util.Set;
 
-public class BSTIndexScanExec extends PhysicalExec {
+public class BSTIndexScanExec extends ScanExec {
   private IndexScanNode plan;
   private SeekableScanner fileScanner;
   
@@ -85,7 +87,7 @@ public class BSTIndexScanExec extends PhysicalExec {
 
     this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
-    Path indexPath = new Path(indexPrefix.toString(), context.getUniqueKeyFromFragments());
+    Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment));
     this.reader = new BSTIndex(context.getConf()).
         getIndexReader(indexPath, keySchema, comparator);
     this.reader.open();
@@ -109,6 +111,21 @@ public class BSTIndexScanExec extends PhysicalExec {
   }
 
   @Override
+  public String getTableName() {
+    return plan.getTableName();
+  }
+
+  @Override
+  public String getCanonicalName() {
+    return plan.getCanonicalName();
+  }
+
+  @Override
+  public FragmentProto[] getFragments() {
+    return new FragmentProto[]{fragment};
+  }
+
+  @Override
   public void init() throws IOException {
     Schema projected;
 
@@ -151,6 +168,11 @@ public class BSTIndexScanExec extends PhysicalExec {
     }
   }
 
+  @Override
+  public ScanNode getScanNode() {
+    return plan;
+  }
+
   private void initScanner(Schema projected) throws IOException {
 
     // Why we should check nullity? See https://issues.apache.org/jira/browse/TAJO-1422

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index c2d93bb..62af4e1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -29,9 +29,7 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
 
     // Please keep all physical executors except for abstract class.
     // They should be ordered in an lexicography order of their names for easy code maintenance.
-    if (exec instanceof BNLJoinExec) {
-      return visitBNLJoin(context, (BNLJoinExec) exec, stack);
-    } else if (exec instanceof BSTIndexScanExec) {
+    if (exec instanceof BSTIndexScanExec) {
       return visitBSTIndexScan(context, (BSTIndexScanExec) exec, stack);
     } else if (exec instanceof EvalExprExec) {
       return visitEvalExpr(context, (EvalExprExec) exec, stack);
@@ -63,8 +61,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
       return visitMergeFullOuterJoin(context, (MergeFullOuterJoinExec) exec, stack);
     } else if (exec instanceof MergeJoinExec) {
       return visitMergeJoin(context, (MergeJoinExec) exec, stack);
-    } else if (exec instanceof NLJoinExec) {
-      return visitNLJoin(context, (NLJoinExec) exec, stack);
     } else if (exec instanceof ProjectionExec) {
       return visitProjection(context, (ProjectionExec) exec, stack);
     } else if (exec instanceof RangeShuffleFileWriteExec) {
@@ -81,6 +77,8 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
       return visitSortBasedColPartitionStore(context, (SortBasedColPartitionStoreExec) exec, stack);
     } else if (exec instanceof StoreTableExec) {
       return visitStoreTable(context, (StoreTableExec) exec, stack);
+    } else if (exec instanceof StoreIndexExec) {
+      return visitStoreIndex(context, (StoreIndexExec) exec, stack);
     }
 
     throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
@@ -104,12 +102,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
   }
 
   @Override
-  public RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
-      throws PhysicalPlanningException {
-    return visitBinaryExecutor(context, exec, stack);
-  }
-
-  @Override
   public RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
     return null;
@@ -206,12 +198,6 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
   }
 
   @Override
-  public RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) throws
-      PhysicalPlanningException {
-    return visitBinaryExecutor(context, exec, stack);
-  }
-
-  @Override
   public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException {
     return visitUnaryExecutor(context, exec, stack);
@@ -253,7 +239,14 @@ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalEx
   }
 
   @Override
-  public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+  public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitStoreIndex(CONTEXT context, StoreIndexExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
     return visitUnaryExecutor(context, exec, stack);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
index 0d64e65..a248d52 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
@@ -23,6 +23,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.KeyProjector;
 import org.apache.tajo.engine.utils.CacheHolder;
 import org.apache.tajo.engine.utils.TableCacheKey;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
@@ -40,50 +41,76 @@ import java.util.List;
  */
 public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
 
-  protected final List<Column[]> joinKeyPairs;
-
   // temporal tuples and states for nested loop join
   protected boolean first = true;
   protected TupleMap<T> tupleSlots;
 
   protected Iterator<Tuple> iterator;
 
+  protected final boolean isCrossJoin;
+  protected final List<Column[]> joinKeyPairs;
+
   protected final int rightNumCols;
   protected final int leftNumCols;
 
   protected final Column[] leftKeyList;
   protected final Column[] rightKeyList;
 
-  protected boolean finished;
   protected final KeyProjector leftKeyExtractor;
 
-  public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
-    super(context, plan, outer, inner);
+  protected boolean finished;
 
-    // HashJoin only can manage equi join key pairs.
-    this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(),
-        inner.getSchema(), false);
+  protected TableStats tableStatsOfCachedRightChild = null;
 
-    leftKeyList = new Column[joinKeyPairs.size()];
-    rightKeyList = new Column[joinKeyPairs.size()];
+  public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
+    super(context, plan, outer, inner);
 
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = outer.getSchema().getColumn(joinKeyPairs.get(i)[0].getQualifiedName());
-      rightKeyList[i] = inner.getSchema().getColumn(joinKeyPairs.get(i)[1].getQualifiedName());
+    switch (plan.getJoinType()) {
+
+      case CROSS:
+        if (hasJoinQual) {
+          throw new TajoInternalError("Cross join cannot evaluate join conditions.");
+        } else {
+          isCrossJoin = true;
+          joinKeyPairs = null;
+          rightNumCols = leftNumCols = -1;
+          leftKeyList = rightKeyList = null;
+          leftKeyExtractor = null;
+        }
+        break;
+
+      case INNER:
+        // Other join types except INNER join can have empty join condition.
+        if (!hasJoinQual) {
+          throw new TajoInternalError("Inner join must have any join conditions.");
+        }
+      default:
+        isCrossJoin = false;
+        // HashJoin only can manage equi join key pairs.
+        this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(),
+            inner.getSchema(), false);
+
+        leftKeyList = new Column[joinKeyPairs.size()];
+        rightKeyList = new Column[joinKeyPairs.size()];
+
+        for (int i = 0; i < joinKeyPairs.size(); i++) {
+          leftKeyList[i] = outer.getSchema().getColumn(joinKeyPairs.get(i)[0].getQualifiedName());
+          rightKeyList[i] = inner.getSchema().getColumn(joinKeyPairs.get(i)[1].getQualifiedName());
+        }
+
+        leftNumCols = outer.getSchema().size();
+        rightNumCols = inner.getSchema().size();
+
+        leftKeyExtractor = new KeyProjector(leftSchema, leftKeyList);
+        break;
     }
-
-    leftNumCols = outer.getSchema().size();
-    rightNumCols = inner.getSchema().size();
-
-    leftKeyExtractor = new KeyProjector(leftSchema, leftKeyList);
   }
 
   protected void loadRightToHashTable() throws IOException {
     ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
     if (scanExec.canBroadcast()) {
       /* If this table can broadcast, all tasks in a node will share the same cache */
-      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(
-          context, scanExec.getCanonicalName(), scanExec.getFragments());
+      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(context, scanExec);
       loadRightFromCache(key);
     } else {
       this.tupleSlots = convert(buildRightToHashTable(), false);
@@ -105,10 +132,31 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
         sharedResource.addBroadcastCache(key, holder);
       }
     }
+    this.tableStatsOfCachedRightChild = holder.getTableStats();
     this.tupleSlots = convert(holder.getData(), true);
   }
 
   protected TupleMap<TupleList> buildRightToHashTable() throws IOException {
+    // A cross join stores every right tuple under one null key, so it has its own build path.
+    if (!isCrossJoin) {
+      return buildRightToHashTableForNonCrossJoin();
+    }
+    return buildRightToHashTableForCrossJoin();
+  }
+
+  protected TupleMap<TupleList> buildRightToHashTableForCrossJoin() throws IOException {
+    Tuple tuple;
+    TupleMap<TupleList> map = new TupleMap<>(1);
+    TupleList tuples = new TupleList();
+
+    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
+      tuples.add(tuple);
+    }
+    map.put(null, tuples);
+    return map;
+  }
+
+  protected TupleMap<TupleList> buildRightToHashTableForNonCrossJoin() throws IOException {
     Tuple tuple;
     TupleMap<TupleList> map = new TupleMap<TupleList>(100000);
     KeyProjector keyProjector = new KeyProjector(rightSchema, rightKeyList);
@@ -162,7 +210,8 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
       inputStats.setNumRows(leftInputStats.getNumRows());
     }
 
-    TableStats rightInputStats = rightChild.getInputStats();
+    TableStats rightInputStats = tableStatsOfCachedRightChild == null ?
+        rightChild.getInputStats() : tableStatsOfCachedRightChild;
     if (rightInputStats != null) {
       inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
       inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index c0a8622..c463028 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -130,7 +130,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, Tupl
     TupleMap<Pair<Boolean, TupleList>> tuples = new TupleMap<Pair<Boolean, TupleList>>(hashed.size());
     for (Map.Entry<KeyTuple, TupleList> entry : hashed.entrySet()) {
       // flag: initially false (whether this join key had at least one match on the counter part)
-      tuples.putWihtoutKeyCopy(entry.getKey(), new Pair<Boolean, TupleList>(false, entry.getValue()));
+      tuples.putWihtoutKeyCopy(entry.getKey(), new Pair<>(false, entry.getValue()));
     }
     return tuples;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index bd817bb..cca3548 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -59,7 +59,12 @@ public class HashJoinExec extends CommonHashJoinExec<TupleList> {
       frameTuple.setLeft(leftTuple);
 
       // getting corresponding right
-      Iterable<Tuple> hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple));
+      Iterable<Tuple> hashed;
+      if (!isCrossJoin) {
+        hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple));
+      } else {
+        hashed = tupleSlots.get(null);
+      }
       Iterator<Tuple> rightTuples = rightFiltered(hashed);
       if (rightTuples.hasNext()) {
         iterator = rightTuples;

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
new file mode 100644
index 0000000..3b8317f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+
+public class IndexExecutorUtil {
+
+  // Index file name = data file name + fragment start key + fragment length (no separators).
+  public static String getIndexFileName(FragmentProto fragmentProto) {
+    final FileFragment fragment = FragmentConvertor.convert(FileFragment.class, fragmentProto);
+    final String dataFileName = fragment.getPath().getName();
+    return dataFileName + fragment.getStartKey() + fragment.getLength();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
deleted file mode 100644
index d3214c3..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import java.io.IOException;
-
-public class NLJoinExec extends CommonJoinExec {
-
-  // temporal tuples and states for nested loop join
-  private boolean needNewOuter;
-  private Tuple outerTuple = null;
-  private Tuple innerTuple = null;
-
-  public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
-      PhysicalExec inner) {
-    super(context, plan, outer, inner);
-    // for join
-    needNewOuter = true;
-  }
-
-  public Tuple next() throws IOException {
-    while (!context.isStopped()) {
-      if (needNewOuter) {
-        outerTuple = leftChild.next();
-        if (outerTuple == null) {
-          return null;
-        }
-        needNewOuter = false;
-      }
-
-      innerTuple = rightChild.next();
-      if (innerTuple == null) {
-        needNewOuter = true;
-        rightChild.rescan();
-        continue;
-      }
-
-      frameTuple.set(outerTuple, innerTuple);
-      if (hasJoinQual) {
-        if (joinQual.eval(frameTuple).isTrue()) {
-          return projector.eval(frameTuple);
-        }
-      } else {
-        return projector.eval(frameTuple);
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void rescan() throws IOException {
-    super.rescan();
-    needNewOuter = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index c1a451a..2b6191e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -68,6 +68,11 @@ public class PartitionMergeScanExec extends ScanExec {
     super.init();
   }
 
+  @Override
+  public ScanNode getScanNode() {
+    return plan; // implements ScanExec.getScanNode() by returning this executor's plan field
+  }
+
   private void initScanExecutors() throws IOException {
     if (scanners.size() > 0) {
       iterator = scanners.iterator();

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index c4d90a5..554c31e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -24,9 +24,6 @@ import java.util.Stack;
 
 public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
 
-  RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
-      throws PhysicalPlanningException;
-
   RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException;
 
@@ -76,9 +73,6 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
   RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException;
 
-  RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack)
-      throws PhysicalPlanningException;
-
   RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException;
 
@@ -103,4 +97,7 @@ public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
 
   RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack)
       throws PhysicalPlanningException;
+
+  RESULT visitStoreIndex(CONTEXT context, StoreIndexExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
index 5cca4c5..45379bb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -49,6 +50,8 @@ public abstract class ScanExec extends PhysicalExec {
     super.init();
   }
 
+  public abstract ScanNode getScanNode();
+
   public boolean canBroadcast() {
     return canBroadcast;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index b49fa40..1ecabf1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -211,6 +211,11 @@ public class SeqScanExec extends ScanExec {
   }
 
   @Override
+  public ScanNode getScanNode() {
+    return plan; // implements ScanExec.getScanNode() by returning this executor's plan field
+  }
+
+  @Override
   protected void compile() throws CompilationError {
     if (plan.hasQual()) {
       qual = context.getPrecompiledEval(inSchema, qual);

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
index f9db842..fed1d5c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -27,6 +27,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.plan.logical.CreateIndexNode;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
@@ -65,8 +66,15 @@ public class StoreIndexExec extends UnaryPhysicalExec {
       indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
     }
 
+    // TODO: this line should be improved to allow multiple scan executors.
+    ScanExec scanExec = PhysicalPlanUtil.findExecutor(this, ScanExec.class);
+    if (scanExec == null) {
+      throw new TajoInternalError("Cannot find scan executors.");
+    }
+
     TajoConf conf = context.getConf();
-    Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments());
+    Path indexPath = new Path(logicalPlan.getIndexPath().toString(),
+        IndexExecutorUtil.getIndexFileName(scanExec.getFragments()[0]));
     // TODO: Create factory using reflection
     BSTIndex bst = new BSTIndex(conf);
     this.comparator = new BaseTupleComparator(keySchema, sortSpecs);

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
index 05936be..6eda7e8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java
@@ -18,8 +18,10 @@
 
 package org.apache.tajo.engine.utils;
 
+import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.physical.ScanExec;
 import org.apache.tajo.engine.planner.physical.TupleList;
 import org.apache.tajo.engine.planner.physical.TupleMap;
 import org.apache.tajo.storage.fragment.Fragment;
@@ -77,19 +79,17 @@ public interface CacheHolder<T> {
       if(rowBlock != null) rowBlock.release();
     }
 
-    public static TableCacheKey getCacheKey(TaskAttemptContext ctx, String canonicalName,
-                                                 CatalogProtos.FragmentProto[] fragments) throws IOException {
-      String pathNameKey = "";
-      if (fragments != null) {
-        StringBuilder stringBuilder = new StringBuilder();
-        for (CatalogProtos.FragmentProto f : fragments) {
-          Fragment fragement = FragmentConvertor.convert(ctx.getConf(), f);
-          stringBuilder.append(fragement.getKey());
-        }
-        pathNameKey = stringBuilder.toString();
-      }
-
-      return new TableCacheKey(ctx.getTaskId().getTaskId().getExecutionBlockId().toString(), canonicalName, pathNameKey);
+    public static TableCacheKey getCacheKey(TaskAttemptContext ctx, ScanExec scanExec) throws IOException {
+      // Key parts: execution block id, the scan's canonical name, and the value of getUniqueKey().
+      final String ebId = ctx.getTaskId().getTaskId().getExecutionBlockId().toString();
+      return new TableCacheKey(ebId, scanExec.getCanonicalName(), getUniqueKey(ctx, scanExec));
+    }
+
+    public static String getUniqueKey(TaskAttemptContext context, ScanExec scanExec) {
+      final QueryId ownerQuery = context.getTaskId().getTaskId().getExecutionBlockId().getQueryId();
+      final int scanNodePid = scanExec.getScanNode().getPID();
+      // Key format: "<query id>_<scan node PID>".
+      return ownerQuery.toString() + "_" + scanNodePid;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index aee7972..d651154 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -49,10 +49,7 @@ import org.apache.tajo.plan.logical.InsertNode;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
-import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
-import org.apache.tajo.plan.verifier.SyntaxErrorUtil;
-import org.apache.tajo.plan.verifier.VerificationState;
+import org.apache.tajo.plan.verifier.*;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -75,6 +72,7 @@ public class GlobalEngine extends AbstractService {
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
   private LogicalPlanVerifier annotatedPlanVerifier;
+  private PostLogicalPlanVerifier postLogicalPlanVerifier;
 
   private QueryExecutor queryExecutor;
   private DDLExecutor ddlExecutor;
@@ -96,6 +94,7 @@ public class GlobalEngine extends AbstractService {
       // Access path rewriter is enabled only in QueryMasterTask
       optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog());
       annotatedPlanVerifier = new LogicalPlanVerifier();
+      postLogicalPlanVerifier = new PostLogicalPlanVerifier();
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
       throw new RuntimeException(t);
@@ -267,7 +266,6 @@ public class GlobalEngine extends AbstractService {
     VerificationState state = new VerificationState();
     preVerifier.verify(queryContext, state, expression);
     if (!state.verified()) {
-
       for (Throwable error : state.getErrors()) {
         throw error;
       }
@@ -294,6 +292,13 @@ public class GlobalEngine extends AbstractService {
       }
     }
 
+    postLogicalPlanVerifier.verify(queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD), state, plan);
+    if (!state.verified()) {
+      for (Throwable error : state.getErrors()) {
+        throw error;
+      }
+    }
+
     return plan;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
index fc7e0e3..e209538 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -26,15 +26,13 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.rm.NodeStatus;
 import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.ProtoUtil;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
@@ -128,7 +126,7 @@ public class QueryCoordinatorService extends AbstractService {
      */
     @Override
     public void getAllWorkers(RpcController controller, PrimitiveProtos.NullProto request,
-                                     RpcCallback<WorkerConnectionsResponse> done) {
+                              RpcCallback<WorkerConnectionsResponse> done) {
 
       WorkerConnectionsResponse.Builder builder = WorkerConnectionsResponse.newBuilder();
       Collection<NodeStatus> nodeStatuses = context.getResourceManager().getRMContext().getNodes().values();

http://git-wip-us.apache.org/repos/asf/tajo/blob/13f42cf4/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index e22663a..b848876 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -25,6 +25,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.ResourceProtos.AllocationResourceProto;
 import org.apache.tajo.ResourceProtos.QueryExecutionRequest;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
@@ -259,12 +260,12 @@ public class QueryInProgress {
       // Update diagnosis message
       if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
         this.queryInfo.setLastMessage(queryInfo.getLastMessage());
-        LOG.info(queryId + queryInfo.getLastMessage());
       }
 
       // if any error occurs, print outs the error message
-      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
-        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+      if (this.queryInfo.getQueryState() == QueryState.QUERY_FAILED ||
+          this.queryInfo.getQueryState() == QueryState.QUERY_ERROR) {
+        LOG.warn(queryId + " is stopped because " + queryInfo.getLastMessage());
       }
 
       // terminal state will let client to retrieve a query result


Mime
View raw message