tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/2] git commit: TAJO-867: OUTER JOIN with empty result subquery produces a wrong result. (Hyoungjun Kim via hyunsik)
Date Fri, 27 Jun 2014 05:09:01 GMT
Repository: tajo
Updated Branches:
  refs/heads/master d5ef39949 -> 39645597a


TAJO-867: OUTER JOIN with empty result subquery produces a wrong result. (Hyoungjun Kim via
hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cdd35887
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cdd35887
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cdd35887

Branch: refs/heads/master
Commit: cdd358879051a19a421a9441550afbeac21c5709
Parents: 6cfd448
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Fri Jun 27 13:53:46 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Fri Jun 27 13:53:46 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../tajo/cli/ExecExternalShellCommand.java      |  22 ++--
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +-
 .../planner/physical/JoinTupleComparator.java   |  13 +-
 .../java/org/apache/tajo/master/TajoMaster.java |   8 +-
 .../tajo/master/querymaster/Repartitioner.java  |  37 +++++-
 .../tajo/master/querymaster/SubQuery.java       |   4 +
 .../java/org/apache/tajo/worker/TajoWorker.java |   8 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  77 +++++++----
 .../apache/tajo/engine/query/TestJoinQuery.java | 130 +++++++++++++++++--
 .../TestJoinQuery/testWhereClauseJoin1.sql      |   2 +-
 .../TestJoinQuery/testWhereClauseJoin2.sql      |   2 +-
 .../TestJoinQuery/testWhereClauseJoin3.sql      |   2 +-
 .../TestJoinQuery/testWhereClauseJoin4.sql      |   2 +-
 .../java/org/apache/tajo/storage/RawFile.java   |   1 -
 15 files changed, 255 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6389bf9..7342ae7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -74,6 +74,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-867: OUTER JOIN with empty result subquery produces a wrong result.
+    (Hyoungjun Kim via hyunsik)
+
     TAJO-881: JOIN with union query occurs NPE. (Hyoungjun Kim via hyunsik)
 
     TAJO-884: complex join conditions should be supported in ON clause.

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-client/src/main/java/org/apache/tajo/cli/ExecExternalShellCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/ExecExternalShellCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/ExecExternalShellCommand.java
index 2a9805f..628f8ba 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/ExecExternalShellCommand.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/ExecExternalShellCommand.java
@@ -54,16 +54,22 @@ public class ExecExternalShellCommand extends TajoShellCommand {
 
     CountDownLatch latch = new CountDownLatch(2);
     Process process = Runtime.getRuntime().exec(execCommand);
-    InputStreamConsoleWriter inWriter = new InputStreamConsoleWriter(process.getInputStream(),
sout, "", latch);
-    InputStreamConsoleWriter errWriter = new InputStreamConsoleWriter(process.getErrorStream(),
sout, "ERROR: ", latch);
+    try {
+      InputStreamConsoleWriter inWriter = new InputStreamConsoleWriter(process.getInputStream(),
sout, "", latch);
+      InputStreamConsoleWriter errWriter = new InputStreamConsoleWriter(process.getErrorStream(),
sout, "ERROR: ", latch);
 
-    inWriter.start();
-    errWriter.start();
+      inWriter.start();
+      errWriter.start();
 
-    int processResult = process.waitFor();
-    latch.await();
-    if (processResult != 0) {
-      throw new IOException("ERROR: Failed with exit code = " + processResult);
+      int processResult = process.waitFor();
+      latch.await();
+      if (processResult != 0) {
+        throw new IOException("ERROR: Failed with exit code = " + processResult);
+      }
+    } finally {
+      org.apache.commons.io.IOUtils.closeQuietly(process.getInputStream());
+      org.apache.commons.io.IOUtils.closeQuietly(process.getOutputStream());
+      org.apache.commons.io.IOUtils.closeQuietly(process.getErrorStream());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 96a2691..6298d27 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -336,7 +336,10 @@ public class TajoConf extends Configuration {
     OPTIMIZER_JOIN_ENABLE("tajo.optimizer.join.enable", true),
 
     // DEBUG OPTION
-    TAJO_DEBUG("tajo.debug", false)
+    TAJO_DEBUG("tajo.debug", false),
+
+    // ONLY FOR TESTCASE
+    TESTCASE_MIN_TASK_NUM("tajo.testcase.min.task.num", -1)
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
index 0d4c47b..a59f8d9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/JoinTupleComparator.java
@@ -61,8 +61,17 @@ public class JoinTupleComparator implements Comparator<Tuple> {
   @Override
   public int compare(Tuple outerTuple, Tuple innerTuple) {
     for (int i = 0; i < numSortKey; i++) {
-      outer = outerTuple.get(outerSortKeyIds[i]);
-      inner = innerTuple.get(innerSortKeyIds[i]);
+      if (outerTuple == null) {
+        outer = NullDatum.get();
+      } else {
+        outer = outerTuple.get(outerSortKeyIds[i]);
+      }
+
+      if (innerTuple == null) {
+        inner = NullDatum.get();
+      } else {
+        inner = innerTuple.get(innerSortKeyIds[i]);
+      }
 
       if (outer instanceof NullDatum || inner instanceof NullDatum) {
         if (!outer.equals(inner)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index dfae300..0962ca5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -537,8 +537,9 @@ public class TajoMaster extends CompositeService {
 
   public static List<File> getMountPath() throws Exception {
     BufferedReader mountOutput = null;
+    Process mountProcess = null;
     try {
-      Process mountProcess = Runtime.getRuntime ().exec("mount");
+      mountProcess = Runtime.getRuntime ().exec("mount");
       mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
       List<File> mountPaths = new ArrayList<File>();
       while (true) {
@@ -560,6 +561,11 @@ public class TajoMaster extends CompositeService {
       if(mountOutput != null) {
         mountOutput.close();
       }
+      if (mountProcess != null) {
+        org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getInputStream());
+        org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getOutputStream());
+        org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getErrorStream());
+      }
     }
   }
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 6373bb4..93e40fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -111,11 +111,10 @@ public class Repartitioner {
       }
     }
 
-    // If one of inner join tables has no input data,
-    // it should return zero rows.
+    // If one of inner join tables has no input data, it should return zero rows.
     JoinNode joinNode = PlannerUtil.findMostBottomNode(execBlock.getPlan(), NodeType.JOIN);
     if (joinNode != null) {
-      if ( (joinNode.getJoinType().equals(JoinType.INNER))) {
+      if ( (joinNode.getJoinType() == JoinType.INNER)) {
         for (int i = 0; i < stats.length; i++) {
           if (stats[i] == 0) {
             return;
@@ -124,6 +123,36 @@ public class Repartitioner {
       }
     }
 
+    // If node is outer join and a preserved relation is empty, it should return zero rows.
+    joinNode = PlannerUtil.findTopNode(execBlock.getPlan(), NodeType.JOIN);
+    if (joinNode != null) {
+      // find left top scan node
+      ScanNode leftScanNode = PlannerUtil.findTopNode(joinNode.getLeftChild(), NodeType.SCAN);
+      ScanNode rightScanNode = PlannerUtil.findTopNode(joinNode.getRightChild(), NodeType.SCAN);
+
+      long leftStats = -1;
+      long rightStats = -1;
+      if (stats.length == 2) {
+        for (int i = 0; i < stats.length; i++) {
+          if (scans[i].equals(leftScanNode)) {
+            leftStats = stats[i];
+          } else if (scans[i].equals(rightScanNode)) {
+            rightStats = stats[i];
+          }
+        }
+        if (joinNode.getJoinType() == JoinType.LEFT_OUTER) {
+          if (leftStats == 0) {
+            return;
+          }
+        }
+        if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+          if (rightStats == 0) {
+            return;
+          }
+        }
+      }
+    }
+
     // Assigning either fragments or fetch urls to query units
     boolean isAllBroadcastTable = true;
     int baseScanIdx = -1;
@@ -360,7 +389,7 @@ public class Repartitioner {
       String mergedKey = partition.getEbId().toString() + "," + partition.getPullHost();
 
       if (mergedPartitions.containsKey(mergedKey)) {
-        FetchImpl fetch = mergedPartitions.get(partition.getPullHost());
+        FetchImpl fetch = mergedPartitions.get(mergedKey);
         fetch.addPart(partition.getTaskId(), partition.getAttemptId());
       } else {
         // In some cases like union each IntermediateEntry has different EBID.

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index be0c624..d4c94e8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -749,6 +749,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
         // determine the number of task
         taskNum = Math.min(taskNum, slots);
+        if (conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM) > 0) {
+          taskNum = conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM);
+          LOG.warn("!!!!! TESTCASE MODE !!!!!");
+        }
         LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
 
         // The shuffle output numbers of join may be inconsistent by execution block order.

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 3768edf..ed78e49 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -540,8 +540,9 @@ public class TajoWorker extends CompositeService {
 
   public static List<File> getMountPath() throws IOException {
     BufferedReader mountOutput = null;
+    Process mountProcess = null;
     try {
-      Process mountProcess = Runtime.getRuntime ().exec("mount");
+      mountProcess = Runtime.getRuntime ().exec("mount");
       mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
       List<File> mountPaths = new ArrayList<File>();
       while (true) {
@@ -563,6 +564,11 @@ public class TajoWorker extends CompositeService {
       if(mountOutput != null) {
         mountOutput.close();
       }
+      if (mountProcess != null) {
+        org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getInputStream());
+        org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getOutputStream());
+        org.apache.commons.io.IOUtils.closeQuietly(mountProcess.getErrorStream());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 5cb98ba..38462bb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -573,21 +573,29 @@ public class TajoTestingCluster {
     }
     TajoConf conf = util.getConfiguration();
     TajoClient client = new TajoClient(conf);
-
-    FileSystem fs = util.getDefaultFileSystem();
-    Path rootDir = util.getMaster().
-        getStorageManager().getWarehouseDir();
-    fs.mkdirs(rootDir);
-    for (int i = 0; i < names.length; i++) {
-      createTable(names[i], schemas[i], tableOption, tables[i]);
+    try {
+      FileSystem fs = util.getDefaultFileSystem();
+      Path rootDir = util.getMaster().
+          getStorageManager().getWarehouseDir();
+      fs.mkdirs(rootDir);
+      for (int i = 0; i < names.length; i++) {
+        createTable(names[i], schemas[i], tableOption, tables[i]);
+      }
+      Thread.sleep(1000);
+      ResultSet res = client.executeQueryAndGetResult(query);
+      return res;
+    } finally {
+      client.close();
     }
-    Thread.sleep(1000);
-    ResultSet res = client.executeQueryAndGetResult(query);
-    return res;
   }
 
   public static void createTable(String tableName, Schema schema,
                                  KeyValueSet tableOption, String[] tableDatas) throws Exception
{
+    createTable(tableName, schema, tableOption, tableDatas, 1);
+  }
+
+  public static void createTable(String tableName, Schema schema,
+                                 KeyValueSet tableOption, String[] tableDatas, int numDataFiles)
throws Exception {
     TpchTestBase instance = TpchTestBase.getInstance();
     TajoTestingCluster util = instance.getTestingCluster();
     while(true) {
@@ -598,25 +606,40 @@ public class TajoTestingCluster {
     }
     TajoConf conf = util.getConfiguration();
     TajoClient client = new TajoClient(conf);
-
-    FileSystem fs = util.getDefaultFileSystem();
-    Path rootDir = util.getMaster().
-        getStorageManager().getWarehouseDir();
-    if (!fs.exists(rootDir)) {
-      fs.mkdirs(rootDir);
-    }
-    Path tablePath = new Path(rootDir, tableName);
-    fs.mkdirs(tablePath);
-    if (tableDatas.length > 0) {
-      Path dfsPath = new Path(tablePath, tableName + ".tbl");
-      FSDataOutputStream out = fs.create(dfsPath);
-      for (int j = 0; j < tableDatas.length; j++) {
-        out.write((tableDatas[j] + "\n").getBytes());
+    try {
+      FileSystem fs = util.getDefaultFileSystem();
+      Path rootDir = util.getMaster().
+          getStorageManager().getWarehouseDir();
+      if (!fs.exists(rootDir)) {
+        fs.mkdirs(rootDir);
       }
-      out.close();
+      Path tablePath = new Path(rootDir, tableName);
+      fs.mkdirs(tablePath);
+      if (tableDatas.length > 0) {
+        int recordPerFile = tableDatas.length / numDataFiles;
+        if (recordPerFile == 0) {
+          recordPerFile = 1;
+        }
+        FSDataOutputStream out = null;
+        for (int j = 0; j < tableDatas.length; j++) {
+          if (out == null || j % recordPerFile == 0) {
+            if (out != null) {
+              out.close();
+            }
+            Path dfsPath = new Path(tablePath, tableName + j + ".tbl");
+            out = fs.create(dfsPath);
+          }
+          out.write((tableDatas[j] + "\n").getBytes());
+        }
+        if (out != null) {
+          out.close();
+        }
+      }
+      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, tableOption);
+      client.createExternalTable(tableName, schema, tablePath, meta);
+    } finally {
+      client.close();
     }
-    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, tableOption);
-    client.createExternalTable(tableName, schema, tablePath, meta);
   }
 
     /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 08d4503..e109616 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -47,15 +47,34 @@ public class TestJoinQuery extends QueryTestCaseBase {
   public TestJoinQuery(String joinOption) {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
 
-    if ("Hash".equals(joinOption)) {
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname,
+        ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname,
+        ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD.defaultVal);
+
+    testingCluster.setAllTajoDaemonConfValue(
+        ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
+        ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.defaultVal);
+
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
+        ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
+        ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
+
+    if (joinOption.indexOf("NoBroadcast") >= 0) {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname,
"false");
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname,
"-1");
+    }
+
+    if (joinOption.indexOf("Hash") >= 0) {
       testingCluster.setAllTajoDaemonConfValue(
-          ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
-          String.valueOf(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+          ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname, String.valueOf(256
* 1048576));
       testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
-          String.valueOf(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+          String.valueOf(256 * 1048576));
       testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
-          String.valueOf(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultLongVal));
-    } else {
+          String.valueOf(256 * 1048576));
+    }
+    if (joinOption.indexOf("Sort") >= 0) {
       testingCluster.setAllTajoDaemonConfValue(
           ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname, String.valueOf(1));
       testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
@@ -68,21 +87,28 @@ public class TestJoinQuery extends QueryTestCaseBase {
   @Parameters
   public static Collection<Object[]> generateParameters() {
     return Arrays.asList(new Object[][]{
+        {"Hash_NoBroadcast"},
+        {"Sort_NoBroadcast"},
         {"Hash"},
-        {"Sort"}
+        {"Sort"},
     });
   }
 
   @AfterClass
-  public static void tearDown() {
-    // recover the default configuration.
+  public static void classTearDown() {
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname,
+        ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.defaultVal);
+    testingCluster.setAllTajoDaemonConfValue(ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD.varname,
+        ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD.defaultVal);
+
     testingCluster.setAllTajoDaemonConfValue(
         ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
-        String.valueOf(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+        ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD.defaultVal);
+
     testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.varname,
-        String.valueOf(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+        ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD.defaultVal);
     testingCluster.setAllTajoDaemonConfValue(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.varname,
-        String.valueOf(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultLongVal));
+        ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD.defaultVal);
   }
 
   @Test
@@ -361,6 +387,86 @@ public class TestJoinQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testLeftOuterJoinWithEmptySubquery1() throws Exception {
+    // Empty Null Supplying table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    String[] data = new String[]{ "1|table11-1", "2|table11-2", "3|table11-3", "4|table11-4",
"5|table11-5" };
+    TajoTestingCluster.createTable("table11", schema, tableOptions, data, 2);
+
+    data = new String[]{ "1|table11-1", "2|table11-2" };
+    TajoTestingCluster.createTable("table12", schema, tableOptions, data, 2);
+
+    try {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "2");
+
+      ResultSet res = executeString("select a.id, b.id from table11 a " +
+          "left outer join (" +
+          "select table12.id from table12 inner join lineitem on table12.id = lineitem.l_orderkey
and table12.id > 10) b " +
+          "on a.id = b.id order by a.id");
+
+      String expected = "id,id\n" +
+          "-------------------------------\n" +
+          "1,null\n" +
+          "2,null\n" +
+          "3,null\n" +
+          "4,null\n" +
+          "5,null\n";
+
+      assertEquals(expected, resultSetToString(res));
+      cleanupQuery(res);
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname,
+          ConfVars.TESTCASE_MIN_TASK_NUM.defaultVal);
+      executeString("DROP TABLE table11 PURGE").close();
+      executeString("DROP TABLE table12 PURGE").close();
+    }
+  }
+
+  @Test
+  public final void testLeftOuterJoinWithEmptySubquery2() throws Exception {
+    //Empty Preserved Row table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    String[] data = new String[]{ "1|table11-1", "2|table11-2", "3|table11-3", "4|table11-4",
"5|table11-5" };
+    TajoTestingCluster.createTable("table11", schema, tableOptions, data, 2);
+
+    data = new String[]{ "1|table11-1", "2|table11-2" };
+    TajoTestingCluster.createTable("table12", schema, tableOptions, data, 2);
+
+    try {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "2");
+
+      ResultSet res = executeString("select a.id, b.id from " +
+          "(select table12.id, table12.name, lineitem.l_shipdate " +
+          "from table12 inner join lineitem on table12.id = lineitem.l_orderkey and table12.id
> 10) a " +
+          "left outer join table11 b " +
+          "on a.id = b.id");
+
+      String expected = "id,id\n" +
+          "-------------------------------\n";
+
+      assertEquals(expected, resultSetToString(res));
+      cleanupQuery(res);
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname,
+          ConfVars.TESTCASE_MIN_TASK_NUM.defaultVal);
+      executeString("DROP TABLE table11 PURGE");
+      executeString("DROP TABLE table12 PURGE");
+    }
+  }
+  
+  @Test
   public final void testFullOuterJoinWithEmptyTable1() throws Exception {
     ResultSet res = executeQuery();
     assertResultSet(res);

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
index a7c5335..575456f 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin1.sql
@@ -8,4 +8,4 @@ from
   region
 where
   n_regionkey = r_regionkey
-order by n_name
\ No newline at end of file
+order by n_name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
index 4bb1a82..efc07b3 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin2.sql
@@ -6,4 +6,4 @@ from
   region
 where
   n_regionkey = r_regionkey
-order by n_name
\ No newline at end of file
+order by n_name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
index 631469b..04fd25b 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin3.sql
@@ -7,4 +7,4 @@ from
   nation, region
 where
   n_regionkey = r_regionkey
-order by n_name
\ No newline at end of file
+order by n_name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
index 08e6807..777de8a 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testWhereClauseJoin4.sql
@@ -6,4 +6,4 @@ from
   nation, region
 where
   n_regionkey = r_regionkey
-order by n_name
\ No newline at end of file
+order by n_name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/cdd35887/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 22757b5..9677bca 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -62,7 +62,6 @@ public class RawFile {
     public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws
IOException {
       super(conf, schema, meta, null);
       this.path = path;
-      init();
     }
 
     @SuppressWarnings("unused")


Mime
View raw message