Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EA82C11EEA for ; Mon, 21 Jul 2014 01:02:08 +0000 (UTC) Received: (qmail 61654 invoked by uid 500); 21 Jul 2014 01:02:08 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 61573 invoked by uid 500); 21 Jul 2014 01:02:08 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 61522 invoked by uid 99); 21 Jul 2014 01:02:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Jul 2014 01:02:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 27DED9A6370; Mon, 21 Jul 2014 01:02:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Mon, 21 Jul 2014 01:02:08 -0000 Message-Id: <2d9cd9dcc5094a34890b8b74b4da0815@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/9] git commit: TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik) Repository: tajo Updated Branches: refs/heads/index_support e9207f3b3 -> 334ea2ef9 TAJO-20: INSERT INTO ... SELECT. 
(Hyoungjun Kim via hyunsik) Closes #72 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/499d1081 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/499d1081 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/499d1081 Branch: refs/heads/index_support Commit: 499d1081b66e0a785dce82430e90bce9d39caa7e Parents: 9cf1071 Author: Hyunsik Choi Authored: Tue Jul 15 17:37:12 2014 +0900 Committer: Hyunsik Choi Committed: Tue Jul 15 17:38:02 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../engine/planner/PreLogicalPlanVerifier.java | 3 - .../planner/physical/ColPartitionStoreExec.java | 33 ++- .../HashBasedColPartitionStoreExec.java | 24 +- .../SortBasedColPartitionStoreExec.java | 25 +- .../org/apache/tajo/master/GlobalEngine.java | 3 + .../apache/tajo/master/querymaster/Query.java | 145 +++++++++++- .../master/querymaster/QueryMasterTask.java | 14 -- .../main/java/org/apache/tajo/worker/Task.java | 14 +- .../java/org/apache/tajo/QueryTestCaseBase.java | 31 ++- .../tajo/engine/query/TestInsertQuery.java | 227 +++++++++++++++++++ .../tajo/engine/query/TestTablePartitions.java | 129 ++++++++++- .../queries/TestInsertQuery/testInsertInto.sql | 1 + .../org/apache/tajo/storage/StorageUtil.java | 65 ++++++ 14 files changed, 641 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4aeb20c..4146fee 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,8 @@ Release 0.9.0 - unreleased NEW FEATURES + TAJO-20: INSERT INTO ... SELECT. (Hyoungjun Kim via hyunsik) + TAJO-774: Implement logical plan part and physical executor for window function. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java index f744cf6..2d6c095 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java @@ -225,9 +225,6 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor stack, Insert expr) throws PlanningException { Expr child = super.visitInsert(context, stack, expr); - if (!expr.isOverwrite()) { - context.state.addVerification("INSERT INTO statement is not supported yet."); - } if (expr.hasTableName()) { assertRelationExistence(context, expr.getTableName()); http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 12eb0e0..d292437 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -19,6 +19,9 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.CatalogUtil; @@ -29,12 +32,17 @@ import 
org.apache.tajo.engine.planner.logical.CreateTableNode; import org.apache.tajo.engine.planner.logical.InsertNode; import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.engine.planner.logical.StoreTableNode; +import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.text.NumberFormat; public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { + private static Log LOG = LogFactory.getLog(ColPartitionStoreExec.class); + protected final TableMeta meta; protected final StoreTableNode plan; protected Path storeTablePath; @@ -100,8 +108,31 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { } } - protected Path getDataFile(String partition) { return StorageUtil.concatPath(storeTablePath.getParent(), partition, storeTablePath.getName()); } + + protected Appender makeAppender(String partition) throws IOException { + Path dataFile = getDataFile(partition); + FileSystem fs = dataFile.getFileSystem(context.getConf()); + + if (fs.exists(dataFile.getParent())) { + LOG.info("Path " + dataFile.getParent() + " already exists!"); + } else { + fs.mkdirs(dataFile.getParent()); + LOG.info("Add subpartition path directory :" + dataFile.getParent()); + } + + if (fs.exists(dataFile)) { + LOG.info("File " + dataFile + " already exists!"); + FileStatus status = fs.getFileStatus(dataFile); + LOG.info("File size: " + status.getLen()); + } + + Appender appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile); + appender.enableStats(); + appender.init(); + + return appender; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff 
--git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index df32d0b..6cef22e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -20,15 +20,11 @@ package org.apache.tajo.engine.planner.physical; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.logical.StoreTableNode; import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -59,25 +55,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { Appender appender = appenderMap.get(partition); if (appender == null) { - Path dataFile = getDataFile(partition); - FileSystem fs = dataFile.getFileSystem(context.getConf()); - - if (fs.exists(dataFile.getParent())) { - LOG.info("Path " + dataFile.getParent() + " already exists!"); - } else { - fs.mkdirs(dataFile.getParent()); - LOG.info("Add subpartition path directory :" + dataFile.getParent()); - } - - if (fs.exists(dataFile)) { - LOG.info("File " + dataFile + " already exists!"); - FileStatus status = fs.getFileStatus(dataFile); - LOG.info("File size: " + status.getLen()); - } - - appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile); - appender.enableStats(); - appender.init(); + appender = makeAppender(partition); 
appenderMap.put(partition, appender); } else { appender = appenderMap.get(partition); http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 8c55d7f..d09e296 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -23,15 +23,11 @@ package org.apache.tajo.engine.planner.physical; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.planner.logical.StoreTableNode; import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -64,26 +60,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { } private Appender getAppender(String partition) throws IOException { - Path dataFile = getDataFile(partition); - FileSystem fs = dataFile.getFileSystem(context.getConf()); - - if (fs.exists(dataFile.getParent())) { - LOG.info("Path " + dataFile.getParent() + " already exists!"); - } else { - fs.mkdirs(dataFile.getParent()); - LOG.info("Add subpartition path directory :" + dataFile.getParent()); - } - - if 
(fs.exists(dataFile)) { - LOG.info("File " + dataFile + " already exists!"); - FileStatus status = fs.getFileStatus(dataFile); - LOG.info("File size: " + status.getLen()); - } - - appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile); - appender.enableStats(); - appender.init(); - + this.appender = makeAppender(partition); return appender; } http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 94c4f25..73f3cf5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -904,6 +904,9 @@ public class GlobalEngine extends AbstractService { if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME] queryContext.setOutputTable(insertNode.getTableName()); queryContext.setOutputPath(insertNode.getPath()); + if (insertNode.hasPartition()) { + queryContext.setPartitionMethod(insertNode.getPartitionMethod()); + } } else { // INSERT INTO LOCATION ... // When INSERT INTO LOCATION, must not set output table. 
outputPath = insertNode.getPath(); http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 31199ba..8bb3dde 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -23,6 +23,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.event.EventHandler; @@ -49,9 +50,11 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.*; import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.TUtil; import java.io.IOException; +import java.text.NumberFormat; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -437,10 +440,43 @@ public class Query implements EventHandler { } } } else { - fs.rename(stagingResultDir, finalOutputDir); - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + NodeType queryType = queryContext.getCommandType(); + + if (queryType == NodeType.INSERT) { // INSERT INTO an existing table + + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + if (queryContext.hasPartition()) { + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.isFile()) { + 
LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1); + } + } else { + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++); + } + } + // checking all file moved and remove empty dir + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } + } + } else { // CREATE TABLE AS SELECT (CTAS) + fs.rename(stagingResultDir, finalOutputDir); + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); + } } } catch (IOException e) { + // TODO report to client e.printStackTrace(); } } else { @@ -450,6 +486,111 @@ public class Query implements EventHandler { return finalOutputDir; } + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } + + return true; + } + + /** + * Attach the sequence number to a path. 
+ * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); + } + + /** + * Attach the sequence number to the output file name and than move the file into the final result path. + * + * @param fs FileSystem + * @param stagingResultDir The staging result dir + * @param fileStatus The file status + * @param finalOutputPath Final output path + * @param nf Number format + * @param fileSeq The sequence number + * @throws IOException + */ + private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, + FileStatus fileStatus, Path finalOutputPath, + NumberFormat nf, + int fileSeq) throws IOException { + if (fileStatus.isDirectory()) { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (!fs.exists(finalSubPath)) { + fs.mkdirs(finalSubPath); + } + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); + for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq); + } + } else { + throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); + } + } else { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); + if (!fs.exists(finalSubPath.getParent())) { + fs.mkdirs(finalSubPath.getParent()); + } + 
if (fs.exists(finalSubPath)) { + throw new IOException("Already exists data file:" + finalSubPath); + } + boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + if (success) { + LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } else { + LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } + } + } + } + + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); + } + private static interface QueryHook { boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 0061717..071e5d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -368,7 +368,6 @@ public class QueryMasterTask extends CompositeService { private void initStagingDir() throws IOException { Path stagingDir = null; - Path outputDir = null; FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); try { @@ -378,20 +377,7 @@ public class 
QueryMasterTask extends CompositeService { // Create a subdirectories LOG.info("The staging dir '" + stagingDir + "' is created."); - queryContext.setStagingDir(stagingDir); - - ///////////////////////////////////////////////// - // Check and Create Output Directory If Necessary - ///////////////////////////////////////////////// - if (queryContext.hasOutputPath()) { - outputDir = queryContext.getOutputPath(); - if (!queryContext.isOutputOverwrite()) { - if (defaultFS.exists(outputDir)) { - throw new IOException("The output directory '" + outputDir + " already exists."); - } - } - } } catch (IOException ioe) { if (stagingDir != null && defaultFS.exists(stagingDir)) { try { http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index c3f3827..230c63a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -121,6 +121,17 @@ public class Task { } }; + static final ThreadLocal OUTPUT_FILE_FORMAT_SEQ = + new ThreadLocal() { + @Override + public NumberFormat initialValue() { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + return fmt; + } + }; + public Task(QueryUnitAttemptId taskId, final TaskRunner.TaskRunnerContext worker, final QueryMasterProtocolService.Interface masterProxy, @@ -179,7 +190,8 @@ public class Task { Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME, OUTPUT_FILE_PREFIX + OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" + - OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId())); + 
OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()) + "-" + + OUTPUT_FILE_FORMAT_SEQ.get().format(0)); LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); } http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 29613cb..9d15732 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -574,14 +574,24 @@ public class QueryTestCaseBase { public String getTableFileContents(Path path) throws Exception { FileSystem fs = path.getFileSystem(conf); + FileStatus[] files = fs.listStatus(path); + + if (files == null || files.length == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + byte[] buf = new byte[1024]; - List paths = listFiles(fs, path); - for (Path eachPath: paths) { - InputStream in = fs.open(eachPath); + for (FileStatus file: files) { + if (file.isDirectory()) { + sb.append(getTableFileContents(file.getPath())); + continue; + } + + InputStream in = fs.open(file.getPath()); try { while (true) { - byte[] buf = new byte[1024]; int readBytes = in.read(buf); if (readBytes <= 0) { break; @@ -593,6 +603,7 @@ public class QueryTestCaseBase { in.close(); } } + return sb.toString(); } @@ -612,6 +623,18 @@ public class QueryTestCaseBase { return getTableFileContents(path); } + public List listTableFiles(String tableName) throws Exception { + TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName); + if (tableDesc == null) { + return null; + } + + Path path = tableDesc.getPath(); + FileSystem fs = path.getFileSystem(conf); + + return listFiles(fs, path); + } + private List listFiles(FileSystem 
fs, Path path) throws Exception { List result = new ArrayList(); FileStatus[] files = fs.listStatus(path); http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java index 822bf51..4b48182 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java @@ -35,8 +35,10 @@ import org.junit.experimental.categories.Category; import java.io.BufferedReader; import java.io.InputStreamReader; import java.sql.ResultSet; +import java.util.List; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; @Category(IntegrationTest.class) public class TestInsertQuery extends QueryTestCaseBase { @@ -61,6 +63,231 @@ public class TestInsertQuery extends QueryTestCaseBase { } @Test + public final void testInsertInto() throws Exception { + // create table and upload test data + ResultSet res = executeFile("table1_ddl.sql"); + res.close(); + + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), "table1")); + + res = executeFile("testInsertOverwrite.sql"); + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), "table1"); + if (!testingCluster.isHCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + + res = executeFile("testInsertInto.sql"); + res.close(); + + List dataFiles = listTableFiles("table1"); + assertEquals(2, dataFiles.size()); + + for (int i = 0; i < dataFiles.size(); i++) { + String name = dataFiles.get(i).getName(); + assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + String[] tokens = name.split("-"); + 
assertEquals(4, tokens.length); + assertEquals(i, Integer.parseInt(tokens[3])); + } + + String tableDatas = getTableFileContents("table1"); + + String expected = "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n" + + "1|1|17.0\n" + + "1|1|36.0\n" + + "2|2|38.0\n" + + "3|2|45.0\n" + + "3|3|49.0\n"; + + assertNotNull(tableDatas); + assertEquals(expected, tableDatas); + + executeString("DROP TABLE table1 PURGE"); + } + + @Test + public final void testInsertIntoLocation() throws Exception { + FileSystem fs = null; + Path path = new Path("/tajo-data/testInsertIntoLocation"); + try { + executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close(); + + String resultFileData = getTableFileContents(path); + String expected = "1|1|1\n" + + "1|1|2\n" + + "2|2|1\n" + + "3|2|1\n" + + "3|3|2\n"; + + assertEquals(expected, resultFileData); + + fs = path.getFileSystem(testingCluster.getConfiguration()); + + FileStatus[] files = fs.listStatus(path); + assertNotNull(files); + assertEquals(1, files.length); + + for (FileStatus eachFileStatus : files) { + String name = eachFileStatus.getPath().getName(); + assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + } + + executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber from default.lineitem").close(); + resultFileData = getTableFileContents(path); + expected = "1|1|1\n" + + "1|1|2\n" + + "2|2|1\n" + + "3|2|1\n" + + "3|3|2\n"; + + assertEquals(expected + expected, resultFileData); + + files = fs.listStatus(path); + assertNotNull(files); + assertEquals(2, files.length); + + for (FileStatus eachFileStatus : files) { + String name = eachFileStatus.getPath().getName(); + assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + } + } finally { + if (fs != null) { + fs.delete(path, true); + } + } + } + + @Test + public final void testInsertIntoPartitionedTable() throws Exception { + String tableName = 
CatalogUtil.normalizeIdentifier("testInsertIntoPartitionedTable"); + executeString("create table " + tableName + " (n_name TEXT, n_regionkey INT4)" + + "USING csv PARTITION by column(n_nationkey INT4)" ).close(); + + try { + executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close(); + + ResultSet res = executeString("select * from " + tableName); + + String expected = "n_name,n_regionkey,n_nationkey\n" + + "-------------------------------\n" + + "ALGERIA,0,0\n" + + "ARGENTINA,1,1\n" + + "IRAN,4,10\n" + + "IRAQ,4,11\n" + + "JAPAN,2,12\n" + + "JORDAN,4,13\n" + + "KENYA,0,14\n" + + "MOROCCO,0,15\n" + + "MOZAMBIQUE,0,16\n" + + "PERU,1,17\n" + + "CHINA,2,18\n" + + "ROMANIA,3,19\n" + + "BRAZIL,1,2\n" + + "SAUDI ARABIA,4,20\n" + + "VIETNAM,2,21\n" + + "RUSSIA,3,22\n" + + "UNITED KINGDOM,3,23\n" + + "UNITED STATES,1,24\n" + + "CANADA,1,3\n" + + "EGYPT,4,4\n" + + "ETHIOPIA,0,5\n" + + "FRANCE,3,6\n" + + "GERMANY,3,7\n" + + "INDIA,2,8\n" + + "INDONESIA,2,9\n"; + + assertEquals(expected, resultSetToString(res)); + res.close(); + + executeString("insert into " + tableName + " select n_name, n_regionkey, n_nationkey from default.nation").close(); + res = executeString("select * from " + tableName); + expected = "n_name,n_regionkey,n_nationkey\n" + + "-------------------------------\n" + + "ALGERIA,0,0\n" + + "ALGERIA,0,0\n" + + "ARGENTINA,1,1\n" + + "ARGENTINA,1,1\n" + + "IRAN,4,10\n" + + "IRAN,4,10\n" + + "IRAQ,4,11\n" + + "IRAQ,4,11\n" + + "JAPAN,2,12\n" + + "JAPAN,2,12\n" + + "JORDAN,4,13\n" + + "JORDAN,4,13\n" + + "KENYA,0,14\n" + + "KENYA,0,14\n" + + "MOROCCO,0,15\n" + + "MOROCCO,0,15\n" + + "MOZAMBIQUE,0,16\n" + + "MOZAMBIQUE,0,16\n" + + "PERU,1,17\n" + + "PERU,1,17\n" + + "CHINA,2,18\n" + + "CHINA,2,18\n" + + "ROMANIA,3,19\n" + + "ROMANIA,3,19\n" + + "BRAZIL,1,2\n" + + "BRAZIL,1,2\n" + + "SAUDI ARABIA,4,20\n" + + "SAUDI ARABIA,4,20\n" + + "VIETNAM,2,21\n" + + "VIETNAM,2,21\n" + + "RUSSIA,3,22\n" + + "RUSSIA,3,22\n" 
+ + "UNITED KINGDOM,3,23\n" + + "UNITED KINGDOM,3,23\n" + + "UNITED STATES,1,24\n" + + "UNITED STATES,1,24\n" + + "CANADA,1,3\n" + + "CANADA,1,3\n" + + "EGYPT,4,4\n" + + "EGYPT,4,4\n" + + "ETHIOPIA,0,5\n" + + "ETHIOPIA,0,5\n" + + "FRANCE,3,6\n" + + "FRANCE,3,6\n" + + "GERMANY,3,7\n" + + "GERMANY,3,7\n" + + "INDIA,2,8\n" + + "INDIA,2,8\n" + + "INDONESIA,2,9\n" + + "INDONESIA,2,9\n"; + + assertEquals(expected, resultSetToString(res)); + + TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName); + assertNotNull(tableDesc); + + Path path = tableDesc.getPath(); + FileSystem fs = path.getFileSystem(testingCluster.getConfiguration()); + + FileStatus[] files = fs.listStatus(path); + assertNotNull(files); + assertEquals(25, files.length); + + for (FileStatus eachFileStatus: files) { + assertTrue(eachFileStatus.getPath().getName().indexOf("n_nationkey=") == 0); + FileStatus[] dataFiles = fs.listStatus(eachFileStatus.getPath()); + assertEquals(2, dataFiles.length); + for (FileStatus eachDataFileStatus: dataFiles) { + String name = eachDataFileStatus.getPath().getName(); + assertTrue(name.matches("part-[0-9]*-[0-9]*-[0-9]*")); + } + } + } finally { + executeString("DROP TABLE " + tableName + " PURGE"); + } + } + + @Test public final void testInsertOverwriteSmallerColumns() throws Exception { ResultSet res = executeFile("table1_ddl.sql"); res.close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 9db8e41..d9aea53 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -47,11 
+47,9 @@ import java.sql.ResultSet; import java.util.List; import java.util.Map; -import static junit.framework.TestCase.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; public class TestTablePartitions extends QueryTestCaseBase { @@ -354,6 +352,131 @@ public class TestTablePartitions extends QueryTestCaseBase { } @Test + public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns"); + ResultSet res = testBase.execute( + "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4, col3 float8) "); + res.close(); + TajoTestingCluster cluster = testBase.getTestingCluster(); + CatalogService catalog = cluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + res = executeString("insert into " + tableName + + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + res.close(); + + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + Path path = desc.getPath(); + + FileSystem fs = FileSystem.get(conf); + assertTrue(fs.isDirectory(path)); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + 
"/col1=3/col2=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0"))); + if (!testingCluster.isHCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + " where col2 = 2"); + + Map resultRows1 = Maps.newHashMap(); + resultRows1.put(45.0d, new int[]{3, 2}); + resultRows1.put(38.0d, new int[]{2, 2}); + + for (int i = 0; i < 2; i++) { + assertTrue(res.next()); + assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2)); + assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3)); + } + res.close(); + + + Map resultRows2 = Maps.newHashMap(); + resultRows2.put(49.0d, new int[]{3, 3}); + resultRows2.put(45.0d, new int[]{3, 2}); + resultRows2.put(38.0d, new int[]{2, 2}); + + res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2"); + + for (int i = 0; i < 3; i++) { + assertTrue(res.next()); + assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2)); + assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3)); + } + res.close(); + + // insert into already exists partitioned table + res = executeString("insert into " + tableName + + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem"); + res.close(); + + desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + path = desc.getPath(); + + assertTrue(fs.isDirectory(path)); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2"))); + 
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0"))); + + if (!testingCluster.isHCatalogStoreRunning()) { + assertEquals(5, desc.getStats().getNumRows().intValue()); + } + String expected = "N\n" + + "N\n" + + "N\n" + + "N\n" + + "N\n" + + "N\n" + + "R\n" + + "R\n" + + "R\n" + + "R\n"; + + String tableData = getTableFileContents(desc.getPath()); + assertEquals(expected, tableData); + + res = executeString("select * from " + tableName + " where col2 = 2"); + String resultSetData = resultSetToString(res); + res.close(); + expected = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,2,2,38.0\n" + + "N,2,2,38.0\n" + + "R,3,2,45.0\n" + + "R,3,2,45.0\n"; + assertEquals(expected, resultSetData); + + res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2"); + resultSetData = resultSetToString(res); + res.close(); + expected = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,2,2,38.0\n" + + "N,2,2,38.0\n" + + "R,3,2,45.0\n" + + "R,3,2,45.0\n" + + "R,3,3,49.0\n" + + "R,3,3,49.0\n"; + assertEquals(expected, resultSetData); + } + + @Test public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception { String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumnsWithCompression"); ResultSet res = executeString( http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql new file mode 100644 index 0000000..9656694 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertInto.sql @@ -0,0 +1 @@ +insert into table1 select l_orderkey, l_partkey, l_quantity from default.lineitem; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/499d1081/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java index c4a9744..d11dc09 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -20,6 +20,7 @@ package org.apache.tajo.storage; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.*; @@ -29,6 +30,8 @@ import org.apache.tajo.util.KeyValueSet; import parquet.hadoop.ParquetOutputFormat; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; public class StorageUtil extends StorageConstants{ public static int getRowByteSize(Schema schema) { @@ -120,4 +123,66 @@ public class StorageUtil extends StorageConstants{ return options; } + + static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*"; + static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*"; + + /** + * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*". + * + * This method finds the maximum sequence number from existing data files through the above patterns. 
+ * If it cannot find any matched file or the maximum number, it will return -1. + * + * @param fs + * @param path + * @param recursive + * @return The maximum sequence number + * @throws IOException + */ + public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException { + if (!fs.isDirectory(path)) { + return -1; + } + + FileStatus[] files = fs.listStatus(path); + + if (files == null || files.length == 0) { + return -1; + } + + int maxValue = -1; + List<Path> fileNamePatternMatchedList = new ArrayList<Path>(); + + for (FileStatus eachFile: files) { + // In the case of partition table, return largest value within all partition dirs. + if (eachFile.isDirectory() && recursive) { + int value = getMaxFileSequence(fs, eachFile.getPath(), recursive); + if (value > maxValue) { + maxValue = value; + } + } else { + if (eachFile.getPath().getName().matches(fileNamePatternV08) || + eachFile.getPath().getName().matches(fileNamePatternV09)) { + fileNamePatternMatchedList.add(eachFile.getPath()); + } + } + } + + if (fileNamePatternMatchedList.isEmpty()) { + return maxValue; + } + Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1); + String pathName = lastFile.getName(); + + // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq> + // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence> + String[] pathTokens = pathName.split("-"); + if (pathTokens.length == 3) { + return -1; + } else if(pathTokens.length == 4) { + return Integer.parseInt(pathTokens[3]); + } else { + return -1; + } + } }