tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/9] git commit: TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik)
Date Mon, 21 Jul 2014 01:02:08 GMT
Repository: tajo
Updated Branches:
  refs/heads/index_support e9207f3b3 -> 334ea2ef9


TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik)

Closes #72


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/499d1081
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/499d1081
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/499d1081

Branch: refs/heads/index_support
Commit: 499d1081b66e0a785dce82430e90bce9d39caa7e
Parents: 9cf1071
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Tue Jul 15 17:37:12 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Tue Jul 15 17:38:02 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../engine/planner/PreLogicalPlanVerifier.java  |   3 -
 .../planner/physical/ColPartitionStoreExec.java |  33 ++-
 .../HashBasedColPartitionStoreExec.java         |  24 +-
 .../SortBasedColPartitionStoreExec.java         |  25 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   3 +
 .../apache/tajo/master/querymaster/Query.java   | 145 +++++++++++-
 .../master/querymaster/QueryMasterTask.java     |  14 --
 .../main/java/org/apache/tajo/worker/Task.java  |  14 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |  31 ++-
 .../tajo/engine/query/TestInsertQuery.java      | 227 +++++++++++++++++++
 .../tajo/engine/query/TestTablePartitions.java  | 129 ++++++++++-
 .../queries/TestInsertQuery/testInsertInto.sql  |   1 +
 .../org/apache/tajo/storage/StorageUtil.java    |  65 ++++++
 14 files changed, 641 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4aeb20c..4146fee 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@ Release 0.9.0 - unreleased
 
   NEW FEATURES 
 
+    TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik)
+
     TAJO-774: Implement logical plan part and physical executor for window 
     function. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index f744cf6..2d6c095 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -225,9 +225,6 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe
 
   public Expr visitInsert(Context context, Stack<Expr> stack, Insert expr) throws PlanningException
{
     Expr child = super.visitInsert(context, stack, expr);
-    if (!expr.isOverwrite()) {
-      context.state.addVerification("INSERT INTO statement is not supported yet.");
-    }
 
     if (expr.hasTableName()) {
       assertRelationExistence(context, expr.getTableName());

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 12eb0e0..d292437 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -19,6 +19,9 @@
 package org.apache.tajo.engine.planner.physical;
 
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
@@ -29,12 +32,17 @@ import org.apache.tajo.engine.planner.logical.CreateTableNode;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
+  private static Log LOG = LogFactory.getLog(ColPartitionStoreExec.class);
+
   protected final TableMeta meta;
   protected final StoreTableNode plan;
   protected Path storeTablePath;
@@ -100,8 +108,31 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec
{
     }
   }
 
-
   protected Path getDataFile(String partition) {
     return StorageUtil.concatPath(storeTablePath.getParent(), partition, storeTablePath.getName());
   }
+
+  protected Appender makeAppender(String partition) throws IOException {
+    Path dataFile = getDataFile(partition);
+    FileSystem fs = dataFile.getFileSystem(context.getConf());
+
+    if (fs.exists(dataFile.getParent())) {
+      LOG.info("Path " + dataFile.getParent() + " already exists!");
+    } else {
+      fs.mkdirs(dataFile.getParent());
+      LOG.info("Add subpartition path directory :" + dataFile.getParent());
+    }
+
+    if (fs.exists(dataFile)) {
+      LOG.info("File " + dataFile + " already exists!");
+      FileStatus status = fs.getFileStatus(dataFile);
+      LOG.info("File size: " + status.getLen());
+    }
+
+    Appender appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
outSchema, dataFile);
+    appender.enableStats();
+    appender.init();
+
+    return appender;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index df32d0b..6cef22e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -20,15 +20,11 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -59,25 +55,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec
{
     Appender appender = appenderMap.get(partition);
 
     if (appender == null) {
-      Path dataFile = getDataFile(partition);
-      FileSystem fs = dataFile.getFileSystem(context.getConf());
-
-      if (fs.exists(dataFile.getParent())) {
-        LOG.info("Path " + dataFile.getParent() + " already exists!");
-      } else {
-        fs.mkdirs(dataFile.getParent());
-        LOG.info("Add subpartition path directory :" + dataFile.getParent());
-      }
-
-      if (fs.exists(dataFile)) {
-        LOG.info("File " + dataFile + " already exists!");
-        FileStatus status = fs.getFileStatus(dataFile);
-        LOG.info("File size: " + status.getLen());
-      }
-
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
outSchema, dataFile);
-      appender.enableStats();
-      appender.init();
+      appender = makeAppender(partition);
       appenderMap.put(partition, appender);
     } else {
       appender = appenderMap.get(partition);

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 8c55d7f..d09e296 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -23,15 +23,11 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -64,26 +60,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec
{
   }
 
   private Appender getAppender(String partition) throws IOException {
-    Path dataFile = getDataFile(partition);
-    FileSystem fs = dataFile.getFileSystem(context.getConf());
-
-    if (fs.exists(dataFile.getParent())) {
-      LOG.info("Path " + dataFile.getParent() + " already exists!");
-    } else {
-      fs.mkdirs(dataFile.getParent());
-      LOG.info("Add subpartition path directory :" + dataFile.getParent());
-    }
-
-    if (fs.exists(dataFile)) {
-      LOG.info("File " + dataFile + " already exists!");
-      FileStatus status = fs.getFileStatus(dataFile);
-      LOG.info("File size: " + status.getLen());
-    }
-
-    appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
outSchema, dataFile);
-    appender.enableStats();
-    appender.init();
-
+    this.appender = makeAppender(partition);
     return appender;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 94c4f25..73f3cf5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -904,6 +904,9 @@ public class GlobalEngine extends AbstractService {
       if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
         queryContext.setOutputTable(insertNode.getTableName());
         queryContext.setOutputPath(insertNode.getPath());
+        if (insertNode.hasPartition()) {
+          queryContext.setPartitionMethod(insertNode.getPartitionMethod());
+        }
       } else { // INSERT INTO LOCATION ...
         // When INSERT INTO LOCATION, must not set output table.
         outputPath = insertNode.getPath();

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 31199ba..8bb3dde 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -49,9 +50,11 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -437,10 +440,43 @@ public class Query implements EventHandler<QueryEvent> {
               }
             }
           } else {
-            fs.rename(stagingResultDir, finalOutputDir);
-            LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+            NodeType queryType = queryContext.getCommandType();
+
+            if (queryType == NodeType.INSERT) { // INSERT INTO an existing table
+
+              NumberFormat fmt = NumberFormat.getInstance();
+              fmt.setGroupingUsed(false);
+              fmt.setMinimumIntegerDigits(3);
+
+              if (queryContext.hasPartition()) {
+                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                  if (eachFile.isFile()) {
+                    LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
+                    continue;
+                  }
+                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir,
fmt, -1);
+                }
+              } else {
+                int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) +
1;
+                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir,
fmt, maxSeq++);
+                }
+              }
+              // checking all file moved and remove empty dir
+              verifyAllFileMoved(fs, stagingResultDir);
+              FileStatus[] files = fs.listStatus(stagingResultDir);
+              if (files != null && files.length != 0) {
+                for (FileStatus eachFile: files) {
+                  LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+                }
+              }
+            } else { // CREATE TABLE AS SELECT (CTAS)
+              fs.rename(stagingResultDir, finalOutputDir);
+              LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+            }
           }
         } catch (IOException e) {
+          // TODO report to client
           e.printStackTrace();
         }
       } else {
@@ -450,6 +486,111 @@ public class Query implements EventHandler<QueryEvent> {
       return finalOutputDir;
     }
 
+    private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException
{
+      FileStatus[] files = fs.listStatus(stagingPath);
+      if (files != null && files.length != 0) {
+        for (FileStatus eachFile: files) {
+          if (eachFile.isFile()) {
+            LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+            return false;
+          } else {
+            if (verifyAllFileMoved(fs, eachFile.getPath())) {
+              fs.delete(eachFile.getPath(), false);
+            } else {
+              return false;
+            }
+          }
+        }
+      }
+
+      return true;
+    }
+
+    /**
+     * Attach the sequence number to a path.
+     *
+     * @param path Path
+     * @param seq sequence number
+     * @param nf Number format
+     * @return New path attached with sequence number
+     * @throws IOException
+     */
+    private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException
{
+      String[] tokens = path.getName().split("-");
+      if (tokens.length != 4) {
+        throw new IOException("Wrong result file name:" + path);
+      }
+      return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
+    }
+
+    /**
+     * Attach the sequence number to the output file name and than move the file into the
final result path.
+     *
+     * @param fs FileSystem
+     * @param stagingResultDir The staging result dir
+     * @param fileStatus The file status
+     * @param finalOutputPath Final output path
+     * @param nf Number format
+     * @param fileSeq The sequence number
+     * @throws IOException
+     */
+    private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+                                            FileStatus fileStatus, Path finalOutputPath,
+                                            NumberFormat nf,
+                                            int fileSeq) throws IOException {
+      if (fileStatus.isDirectory()) {
+        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+        if (subPath != null) {
+          Path finalSubPath = new Path(finalOutputPath, subPath);
+          if (!fs.exists(finalSubPath)) {
+            fs.mkdirs(finalSubPath);
+          }
+          int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
+          for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
+            moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf,
++maxSeq);
+          }
+        } else {
+          throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
+        }
+      } else {
+        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+        if (subPath != null) {
+          Path finalSubPath = new Path(finalOutputPath, subPath);
+          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath,
fileSeq, nf));
+          if (!fs.exists(finalSubPath.getParent())) {
+            fs.mkdirs(finalSubPath.getParent());
+          }
+          if (fs.exists(finalSubPath)) {
+            throw new IOException("Already exists data file:" + finalSubPath);
+          }
+          boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
+          if (success) {
+            LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
+                "to final output[" + finalSubPath + "]");
+          } else {
+            LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
+                "to final output[" + finalSubPath + "]");
+          }
+        }
+      }
+    }
+
+    private String extractSubPath(Path parentPath, Path childPath) {
+      String parentPathStr = parentPath.toUri().getPath();
+      String childPathStr = childPath.toUri().getPath();
+
+      if (parentPathStr.length() > childPathStr.length()) {
+        return null;
+      }
+
+      int index = childPathStr.indexOf(parentPathStr);
+      if (index != 0) {
+        return null;
+      }
+
+      return childPathStr.substring(parentPathStr.length() + 1);
+    }
+
     private static interface QueryHook {
       boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
Path finalOutputDir);
       void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query
query,

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 0061717..071e5d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -368,7 +368,6 @@ public class QueryMasterTask extends CompositeService {
 
   private void initStagingDir() throws IOException {
     Path stagingDir = null;
-    Path outputDir = null;
     FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
 
     try {
@@ -378,20 +377,7 @@ public class QueryMasterTask extends CompositeService {
 
       // Create a subdirectories
       LOG.info("The staging dir '" + stagingDir + "' is created.");
-
       queryContext.setStagingDir(stagingDir);
-
-      /////////////////////////////////////////////////
-      // Check and Create Output Directory If Necessary
-      /////////////////////////////////////////////////
-      if (queryContext.hasOutputPath()) {
-        outputDir = queryContext.getOutputPath();
-        if (!queryContext.isOutputOverwrite()) {
-          if (defaultFS.exists(outputDir)) {
-            throw new IOException("The output directory '" + outputDir + " already exists.");
-          }
-        }
-      }
     } catch (IOException ioe) {
       if (stagingDir != null && defaultFS.exists(stagingDir)) {
         try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index c3f3827..230c63a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -121,6 +121,17 @@ public class Task {
         }
       };
 
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(3);
+          return fmt;
+        }
+      };
+
   public Task(QueryUnitAttemptId taskId,
               final TaskRunner.TaskRunnerContext worker,
               final QueryMasterProtocolService.Interface masterProxy,
@@ -179,7 +190,8 @@ public class Task {
       Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
           OUTPUT_FILE_PREFIX +
           OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId())
+ "-" +
-          OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
+          OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()) + "-" +
+          OUTPUT_FILE_FORMAT_SEQ.get().format(0));
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 29613cb..9d15732 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -574,14 +574,24 @@ public class QueryTestCaseBase {
   public String getTableFileContents(Path path) throws Exception {
     FileSystem fs = path.getFileSystem(conf);
 
+    FileStatus[] files = fs.listStatus(path);
+
+    if (files == null || files.length == 0) {
+      return "";
+    }
+
     StringBuilder sb = new StringBuilder();
+    byte[] buf = new byte[1024];
 
-    List<Path> paths = listFiles(fs, path);
-    for (Path eachPath: paths) {
-      InputStream in = fs.open(eachPath);
+    for (FileStatus file: files) {
+      if (file.isDirectory()) {
+        sb.append(getTableFileContents(file.getPath()));
+        continue;
+      }
+
+      InputStream in = fs.open(file.getPath());
       try {
         while (true) {
-          byte[] buf = new byte[1024];
           int readBytes = in.read(buf);
           if (readBytes <= 0) {
             break;
@@ -593,6 +603,7 @@ public class QueryTestCaseBase {
         in.close();
       }
     }
+
     return sb.toString();
   }
 
@@ -612,6 +623,18 @@ public class QueryTestCaseBase {
     return getTableFileContents(path);
   }
 
+  public List<Path> listTableFiles(String tableName) throws Exception {
+    TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(),
tableName);
+    if (tableDesc == null) {
+      return null;
+    }
+
+    Path path = tableDesc.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+
+    return listFiles(fs, path);
+  }
+
   private List<Path> listFiles(FileSystem fs, Path path) throws Exception {
     List<Path> result = new ArrayList<Path>();
     FileStatus[] files = fs.listStatus(path);

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 822bf51..4b48182 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -35,8 +35,10 @@ import org.junit.experimental.categories.Category;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.sql.ResultSet;
+import java.util.List;
 
 import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 @Category(IntegrationTest.class)
 public class TestInsertQuery extends QueryTestCaseBase {
@@ -61,6 +63,231 @@ public class TestInsertQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testInsertInto() throws Exception {
+    // create table and upload test data
+    ResultSet res = executeFile("table1_ddl.sql");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "table1"));
+
+    res = executeFile("testInsertOverwrite.sql");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1");
+    if (!testingCluster.isHCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeFile("testInsertInto.sql");
+    res.close();
+
+    List<Path> dataFiles = listTableFiles("table1");
+    assertEquals(2, dataFiles.size());
+
+    for (int i = 0; i < dataFiles.size(); i++) {
+      String name = dataFiles.get(i).getName();
+      assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+      String[] tokens = name.split("-");
+      assertEquals(4, tokens.length);
+      assertEquals(i, Integer.parseInt(tokens[3]));
+    }
+
+    String tableDatas = getTableFileContents("table1");
+
+    String expected = "1|1|17.0\n" +
+        "1|1|36.0\n" +
+        "2|2|38.0\n" +
+        "3|2|45.0\n" +
+        "3|3|49.0\n" +
+        "1|1|17.0\n" +
+        "1|1|36.0\n" +
+        "2|2|38.0\n" +
+        "3|2|45.0\n" +
+        "3|3|49.0\n";
+
+    assertNotNull(tableDatas);
+    assertEquals(expected, tableDatas);
+
+    executeString("DROP TABLE table1 PURGE");
+  }
+
+  @Test
+  public final void testInsertIntoLocation() throws Exception {
+    FileSystem fs = null;
+    Path path = new Path("/tajo-data/testInsertIntoLocation");
+    try {
+      executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber
from default.lineitem").close();
+
+      String resultFileData = getTableFileContents(path);
+      String expected = "1|1|1\n" +
+          "1|1|2\n" +
+          "2|2|1\n" +
+          "3|2|1\n" +
+          "3|3|2\n";
+
+      assertEquals(expected, resultFileData);
+
+      fs = path.getFileSystem(testingCluster.getConfiguration());
+
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(1, files.length);
+
+      for (FileStatus eachFileStatus : files) {
+        String name = eachFileStatus.getPath().getName();
+        assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+      }
+
+      executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber
from default.lineitem").close();
+      resultFileData = getTableFileContents(path);
+      expected = "1|1|1\n" +
+          "1|1|2\n" +
+          "2|2|1\n" +
+          "3|2|1\n" +
+          "3|3|2\n";
+
+      assertEquals(expected + expected, resultFileData);
+
+      files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(2, files.length);
+
+      for (FileStatus eachFileStatus : files) {
+        String name = eachFileStatus.getPath().getName();
+        assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+      }
+    } finally {
+      if (fs != null) {
+        fs.delete(path, true);
+      }
+    }
+  }
+
+  @Test
+  public final void testInsertIntoPartitionedTable() throws Exception {
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoPartitionedTable");
+    executeString("create table " + tableName + " (n_name TEXT, n_regionkey INT4)" +
+        "USING csv PARTITION by column(n_nationkey INT4)" ).close();
+
+    try {
+      executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey
from default.nation").close();
+
+      ResultSet res = executeString("select * from " + tableName);
+
+      String expected = "n_name,n_regionkey,n_nationkey\n" +
+          "-------------------------------\n" +
+          "ALGERIA,0,0\n" +
+          "ARGENTINA,1,1\n" +
+          "IRAN,4,10\n" +
+          "IRAQ,4,11\n" +
+          "JAPAN,2,12\n" +
+          "JORDAN,4,13\n" +
+          "KENYA,0,14\n" +
+          "MOROCCO,0,15\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "PERU,1,17\n" +
+          "CHINA,2,18\n" +
+          "ROMANIA,3,19\n" +
+          "BRAZIL,1,2\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "VIETNAM,2,21\n" +
+          "RUSSIA,3,22\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED STATES,1,24\n" +
+          "CANADA,1,3\n" +
+          "EGYPT,4,4\n" +
+          "ETHIOPIA,0,5\n" +
+          "FRANCE,3,6\n" +
+          "GERMANY,3,7\n" +
+          "INDIA,2,8\n" +
+          "INDONESIA,2,9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+      executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey
from default.nation").close();
+      res = executeString("select * from " + tableName);
+      expected = "n_name,n_regionkey,n_nationkey\n" +
+          "-------------------------------\n" +
+          "ALGERIA,0,0\n" +
+          "ALGERIA,0,0\n" +
+          "ARGENTINA,1,1\n" +
+          "ARGENTINA,1,1\n" +
+          "IRAN,4,10\n" +
+          "IRAN,4,10\n" +
+          "IRAQ,4,11\n" +
+          "IRAQ,4,11\n" +
+          "JAPAN,2,12\n" +
+          "JAPAN,2,12\n" +
+          "JORDAN,4,13\n" +
+          "JORDAN,4,13\n" +
+          "KENYA,0,14\n" +
+          "KENYA,0,14\n" +
+          "MOROCCO,0,15\n" +
+          "MOROCCO,0,15\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "MOZAMBIQUE,0,16\n" +
+          "PERU,1,17\n" +
+          "PERU,1,17\n" +
+          "CHINA,2,18\n" +
+          "CHINA,2,18\n" +
+          "ROMANIA,3,19\n" +
+          "ROMANIA,3,19\n" +
+          "BRAZIL,1,2\n" +
+          "BRAZIL,1,2\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "SAUDI ARABIA,4,20\n" +
+          "VIETNAM,2,21\n" +
+          "VIETNAM,2,21\n" +
+          "RUSSIA,3,22\n" +
+          "RUSSIA,3,22\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED KINGDOM,3,23\n" +
+          "UNITED STATES,1,24\n" +
+          "UNITED STATES,1,24\n" +
+          "CANADA,1,3\n" +
+          "CANADA,1,3\n" +
+          "EGYPT,4,4\n" +
+          "EGYPT,4,4\n" +
+          "ETHIOPIA,0,5\n" +
+          "ETHIOPIA,0,5\n" +
+          "FRANCE,3,6\n" +
+          "FRANCE,3,6\n" +
+          "GERMANY,3,7\n" +
+          "GERMANY,3,7\n" +
+          "INDIA,2,8\n" +
+          "INDIA,2,8\n" +
+          "INDONESIA,2,9\n" +
+          "INDONESIA,2,9\n";
+
+      assertEquals(expected, resultSetToString(res));
+
+      TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(),
tableName);
+      assertNotNull(tableDesc);
+
+      Path path = tableDesc.getPath();
+      FileSystem fs = path.getFileSystem(testingCluster.getConfiguration());
+
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(25, files.length);
+
+      for (FileStatus eachFileStatus: files) {
+        assertTrue(eachFileStatus.getPath().getName().indexOf("n_nationkey=") == 0);
+        FileStatus[] dataFiles = fs.listStatus(eachFileStatus.getPath());
+        assertEquals(2, dataFiles.length);
+        for (FileStatus eachDataFileStatus: dataFiles) {
+          String name = eachDataFileStatus.getPath().getName();
+          assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*"));
+        }
+      }
+    } finally {
+      executeString("DROP TABLE " + tableName + " PURGE");
+    }
+  }
+
+  @Test
   public final void testInsertOverwriteSmallerColumns() throws Exception {
     ResultSet res = executeFile("table1_ddl.sql");
     res.close();

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 9db8e41..d9aea53 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -47,11 +47,9 @@ import java.sql.ResultSet;
 import java.util.List;
 import java.util.Map;
 
-import static junit.framework.TestCase.*;
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.*;
 
 public class TestTablePartitions extends QueryTestCaseBase {
 
@@ -354,6 +352,131 @@ public class TestTablePartitions extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception
{
+    String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns");
+    ResultSet res = testBase.execute(
+        "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4,
col3 float8) ");
+    res.close();
+    TajoTestingCluster cluster = testBase.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+
+    res = executeString("insert into " + tableName
+        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    Path path = desc.getPath();
+
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+    if (!testingCluster.isHCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeString("select * from " + tableName + " where col2 = 2");
+
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    for (int i = 0; i < 2; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    res.close();
+
+
+    Map<Double, int []> resultRows2 = Maps.newHashMap();
+    resultRows2.put(49.0d, new int[]{3, 3});
+    resultRows2.put(45.0d, new int[]{3, 2});
+    resultRows2.put(38.0d, new int[]{2, 2});
+
+    res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and
col2 >= 2");
+
+    for (int i = 0; i < 3; i++) {
+      assertTrue(res.next());
+      assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    res.close();
+
+    // insert into already exists partitioned table
+    res = executeString("insert into " + tableName
+        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+
+    desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    path = desc.getPath();
+
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+
+    if (!testingCluster.isHCatalogStoreRunning()) {
+      assertEquals(5, desc.getStats().getNumRows().intValue());
+    }
+    String expected = "N\n" +
+        "N\n" +
+        "N\n" +
+        "N\n" +
+        "N\n" +
+        "N\n" +
+        "R\n" +
+        "R\n" +
+        "R\n" +
+        "R\n";
+
+    String tableData = getTableFileContents(desc.getPath());
+    assertEquals(expected, tableData);
+
+    res = executeString("select * from " + tableName + " where col2 = 2");
+    String resultSetData = resultSetToString(res);
+    res.close();
+    expected = "col4,col1,col2,col3\n" +
+        "-------------------------------\n" +
+        "N,2,2,38.0\n" +
+        "N,2,2,38.0\n" +
+        "R,3,2,45.0\n" +
+        "R,3,2,45.0\n";
+    assertEquals(expected, resultSetData);
+
+    res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and
col2 >= 2");
+    resultSetData = resultSetToString(res);
+    res.close();
+    expected = "col4,col1,col2,col3\n" +
+        "-------------------------------\n" +
+        "N,2,2,38.0\n" +
+        "N,2,2,38.0\n" +
+        "R,3,2,45.0\n" +
+        "R,3,2,45.0\n" +
+        "R,3,3,49.0\n" +
+        "R,3,3,49.0\n";
+    assertEquals(expected, resultSetData);
+  }
+
+  @Test
   public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception
{
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumnsWithCompression");
     ResultSet res = executeString(

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql
new file mode 100644
index 0000000..9656694
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql
@@ -0,0 +1 @@
+insert into table1 select l_orderkey, l_partkey, l_quantity from default.lineitem;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index c4a9744..d11dc09 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.*;
@@ -29,6 +30,8 @@ import org.apache.tajo.util.KeyValueSet;
 import parquet.hadoop.ParquetOutputFormat;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 public class StorageUtil extends StorageConstants{
   public static int getRowByteSize(Schema schema) {
@@ -120,4 +123,66 @@ public class StorageUtil extends StorageConstants{
 
     return options;
   }
+
+  static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
+  static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";
+
+  /**
+   * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*".
+   *
+   * This method finds the maximum sequence number from existing data files through the above
patterns.
+   * If it cannot find any matched file or the maximum number, it will return -1.
+   *
+   * @param fs
+   * @param path
+   * @param recursive
+   * @return The maximum sequence number
+   * @throws IOException
+   */
+  public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws
IOException {
+    if (!fs.isDirectory(path)) {
+      return -1;
+    }
+
+    FileStatus[] files = fs.listStatus(path);
+
+    if (files == null || files.length == 0) {
+      return -1;
+    }
+
+    int maxValue = -1;
+    List<Path> fileNamePatternMatchedList = new ArrayList<Path>();
+
+    for (FileStatus eachFile: files) {
+      // In the case of partition table, return largest value within all partition dirs.
+      if (eachFile.isDirectory() && recursive) {
+        int value = getMaxFileSequence(fs, eachFile.getPath(), recursive);
+        if (value > maxValue) {
+          maxValue = value;
+        }
+      } else {
+        if (eachFile.getPath().getName().matches(fileNamePatternV08) ||
+            eachFile.getPath().getName().matches(fileNamePatternV09)) {
+          fileNamePatternMatchedList.add(eachFile.getPath());
+        }
+      }
+    }
+
+    if (fileNamePatternMatchedList.isEmpty()) {
+      return maxValue;
+    }
+    Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1);
+    String pathName = lastFile.getName();
+
+    // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>
+    // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence>
+    String[] pathTokens = pathName.split("-");
+    if (pathTokens.length == 3) {
+      return -1;
+    } else if(pathTokens.length == 4) {
+      return Integer.parseInt(pathTokens[3]);
+    } else {
+      return -1;
+    }
+  }
 }


Mime
View raw message