tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [12/29] tajo git commit: TAJO-1190: INSERT INTO to partition tables may cause NPE.
Date Fri, 05 Dec 2014 08:21:16 GMT
TAJO-1190: INSERT INTO to partition tables may cause NPE.

Closes #250


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1cdbe467
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1cdbe467
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1cdbe467

Branch: refs/heads/hbase_storage
Commit: 1cdbe467e3dc25d7af59afc116ff9e8e6273a1ac
Parents: b4adc18
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Dec 3 02:25:34 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Dec 3 02:25:34 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  8 ++-
 .../planner/physical/PhysicalPlanUtil.java      | 65 +++++++++++++++-----
 .../engine/planner/physical/SeqScanExec.java    |  7 ++-
 .../tajo/worker/TajoWorkerClientService.java    | 16 ++---
 .../tajo/engine/query/TestTablePartitions.java  | 56 +++++++++++++++++
 ...rtitionedTableWithSmallerExpressions5.result |  7 +++
 ...rtitionedTableWithSmallerExpressions6.result |  4 ++
 .../apache/tajo/rpc/RemoteCallException.java    |  6 +-
 8 files changed, 144 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8d51d44..188e024 100644
--- a/CHANGES
+++ b/CHANGES
@@ -15,7 +15,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
-    TAJO-1165: Needs to show error messages on query_executor.jsp. (Jihun Kang via jaehwa)
+    TAJO-1165: Needs to show error messages on query_executor.jsp. 
+    (Jihun Kang via jaehwa)
 
     TAJO-1204: Remove unused ServerName class. (DaeMyung Kang via jaehwa)
 
@@ -79,6 +80,8 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1190: INSERT INTO to partition tables may cause NPE. (hyunsik)
+
     TAJO-1211: Staging directory for CTAS and INSERT should be in 
     the output dir. (hyunsik)
 
@@ -87,7 +90,8 @@ Release 0.9.1 - unreleased
 
     TAJO-1119: JDBC driver should support TIMESTAMP type. (jaehwa)
 
-    TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)
+    TAJO-1166: S3 related storage causes compilation error in Hadoop 
+    2.6.0-SNAPSHOT. (jaehwa)
 
     TAJO-1208: Failure of create table using textfile on hivemeta.
     (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index fe1f795..a63b838 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -77,31 +77,33 @@ public class PhysicalPlanUtil {
     Path path = new Path(tableDesc.getPath());
     FileSystem fs = path.getFileSystem(tajoConf);
 
+    //In the case of partitioned table, we should return same partition key data files.
+    int partitionDepth = 0;
+    if (tableDesc.hasPartition()) {
+      partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
+    }
+
     List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
     if (fs.exists(path)) {
       getNonZeroLengthDataFiles(fs, path, nonZeroLengthFiles, fileIndex, numResultFiles,
-          new AtomicInteger(0));
+          new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth);
     }
 
     List<FileFragment> fragments = new ArrayList<FileFragment>();
 
-    //In the case of partitioned table, return same partition key data files.
-    int numPartitionColumns = 0;
-    if (tableDesc.hasPartition()) {
-      numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
-    }
+
     String[] previousPartitionPathNames = null;
     for (FileStatus eachFile: nonZeroLengthFiles) {
      FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null);
 
-      if (numPartitionColumns > 0) {
+      if (partitionDepth > 0) {
         // finding partition key;
         Path filePath = fileFragment.getPath();
         Path parentPath = filePath;
-        String[] parentPathNames = new String[numPartitionColumns];
-        for (int i = 0; i < numPartitionColumns; i++) {
+        String[] parentPathNames = new String[partitionDepth];
+        for (int i = 0; i < partitionDepth; i++) {
           parentPath = parentPath.getParent();
-          parentPathNames[numPartitionColumns - i - 1] = parentPath.getName();
+          parentPathNames[partitionDepth - i - 1] = parentPath.getName();
         }
 
         // If current partitionKey == previousPartitionKey, add to result.
@@ -120,20 +122,53 @@ public class PhysicalPlanUtil {
     return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[]{}));
   }
 
+  /**
+   *
+   * @param fs
+   * @param path The table path
+   * @param result The final result files to be used
+   * @param startFileIndex
+   * @param numResultFiles
+   * @param currentFileIndex
+   * @param partitioned A flag to indicate if this table is partitioned
+   * @param currentDepth Current visiting depth of partition directories
+   * @param maxDepth The partition depth of this table
+   * @throws IOException
+   */
   private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
                                          int startFileIndex, int numResultFiles,
-                                         AtomicInteger currentFileIndex) throws IOException {
+                                         AtomicInteger currentFileIndex, boolean partitioned,
+                                         int currentDepth, int maxDepth) throws IOException {
+    // Intermediate directory
     if (fs.isDirectory(path)) {
+
       FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter);
+
       if (files != null && files.length > 0) {
+
         for (FileStatus eachFile : files) {
+
+          // checking if the enough number of files are found
           if (result.size() >= numResultFiles) {
             return;
           }
+
           if (eachFile.isDirectory()) {
-            getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles,
-                currentFileIndex);
-          } else if (eachFile.isFile() && eachFile.getLen() > 0) {
+            getNonZeroLengthDataFiles(
+                fs,
+                eachFile.getPath(),
+                result,
+                startFileIndex,
+                numResultFiles,
+                currentFileIndex,
+                partitioned,
+                currentDepth + 1, // increment a visiting depth
+                maxDepth);
+
+
+            // if partitioned table, we should ignore files located in the intermediate directory.
+            // we can ensure that this file is in leaf directory if currentDepth == maxDepth.
+          } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) {
             if (currentFileIndex.get() >= startFileIndex) {
               result.add(eachFile);
             }
@@ -141,6 +176,8 @@ public class PhysicalPlanUtil {
           }
         }
       }
+
+      // Files located in leaf directory
     } else {
       FileStatus fileStatus = fs.getFileStatus(path);
       if (fileStatus != null && fileStatus.getLen() > 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 3cbb7c9..759b19c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -332,7 +332,12 @@ public class SeqScanExec extends PhysicalExec {
     if (scanner != null) {
       return scanner.getInputStats();
     } else {
-      return inputStats;
+      if (inputStats != null) {
+        return inputStats;
+      } else {
+        // If no fragment, there is no scanner. So, we need to create a dummy table stat.
+        return new TableStats();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index a41ffce..0f4a60c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
 import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
@@ -39,8 +40,6 @@ import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -132,6 +131,10 @@ public class TajoWorkerClientService extends AbstractService {
       return null;
     }
 
+    private boolean hasResultTableDesc(QueryContext queryContext) {
+      return !(queryContext.isCreateTable() || queryContext.isInsert());
+    }
+
     @Override
     public ClientProtos.GetQueryResultResponse getQueryResult(
             RpcController controller,
@@ -151,7 +154,9 @@ public class TajoWorkerClientService extends AbstractService {
       } else {
         switch (queryMasterTask.getState()) {
           case QUERY_SUCCEEDED:
-            builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto());
+//            if (hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())) {
+              builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto());
+            //}
             break;
           case QUERY_FAILED:
           case QUERY_ERROR:
@@ -191,10 +196,7 @@ public class TajoWorkerClientService extends AbstractService {
           return builder.build();
         }
 
-        builder.setHasResult(
-            !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
-                queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
-        );
+        builder.setHasResult(hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext()));
 
         queryMasterTask.touchSessionTime();
         Query query = queryMasterTask.getQuery();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 0e9ec7d..cff5bfb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.query;
 
 import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
@@ -820,6 +822,39 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
   }
 
+  @Test
+  public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5");
+    ResultSet res = executeString(
+        "create table " + tableName + " (col1 text) partition by column(col2 text) ");
+    res.close();
+
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem");
+    res.close();
+    res = executeString("select * from " + tableName);
+    assertResultSet(res);
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6");
+    ResultSet res = executeString(
+        "create table " + tableName + " (col1 text) partition by column(col2 text) ");
+    res.close();
+
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    res = executeString(
+        "insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem where l_orderkey = 1");
+    res.close();
+    res = executeString("select * from " + tableName);
+    assertResultSet(res);
+    res.close();
+  }
+
   private MasterPlan getQueryPlan(ResultSet res) {
     QueryId queryId = ((TajoResultSet)res).getQueryId();
     for (TajoWorker eachWorker: testingCluster.getTajoWorkers()) {
@@ -936,4 +971,25 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testIgnoreFilesInIntermediateDir() throws Exception {
+    // See - TAJO-1219: Files located in intermediate directories of partitioned table should be ignored
+    // It verifies that Tajo ignores files located in intermediate directories of partitioned table.
+
+    Path testDir = CommonTestingUtil.getTestDir();
+
+    executeString(
+        "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " +
+        "LOCATION '" + testDir + "'");
+
+    FileSystem fs = testDir.getFileSystem(conf);
+    FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data"));
+    fos.write("a|b|c".getBytes());
+    fos.close();
+
+    ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;");
+    assertFalse(res.next());
+    res.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result
new file mode 100644
index 0000000..f972753
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions5.result
@@ -0,0 +1,7 @@
+col1,col2
+-------------------------------
+N,__TAJO_DEFAULT_PARTITION__
+N,__TAJO_DEFAULT_PARTITION__
+N,__TAJO_DEFAULT_PARTITION__
+R,__TAJO_DEFAULT_PARTITION__
+R,__TAJO_DEFAULT_PARTITION__
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result
new file mode 100644
index 0000000..6b8e2f1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestTablePartitions/testColumnPartitionedTableWithSmallerExpressions6.result
@@ -0,0 +1,4 @@
+col1,col2
+-------------------------------
+N,__TAJO_DEFAULT_PARTITION__
+N,__TAJO_DEFAULT_PARTITION__
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/1cdbe467/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
index 90ee58a..52ef31a 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java
@@ -49,7 +49,11 @@ public class RemoteCallException extends RemoteException {
   public RpcResponse getResponse() {
     RpcResponse.Builder builder = RpcResponse.newBuilder();
     builder.setId(seqId);
-    builder.setErrorMessage(getCause().getMessage());
+    if (getCause().getMessage() == null) {
+      builder.setErrorMessage(getCause().getClass().getName());
+    } else {
+      builder.setErrorMessage(getCause().getMessage());
+    }
     builder.setErrorTrace(getStackTraceString(getCause()));
     builder.setErrorClass(originExceptionClass);
 


Mime
View raw message