tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [11/29] tajo git commit: TAJO-1211: Staging directory for CTAS and INSERT should be in the output dir.
Date Fri, 05 Dec 2014 08:21:15 GMT
TAJO-1211: Staging directory for CTAS and INSERT should be in the output dir.

Closes #274


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b4adc18c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b4adc18c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b4adc18c

Branch: refs/heads/hbase_storage
Commit: b4adc18cd25de550fe04a43ef69d715c146976db
Parents: bf68b77
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Dec 1 17:23:35 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Dec 1 17:23:35 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/conf/TajoConf.java     | 12 +++-
 .../org/apache/tajo/master/GlobalEngine.java    |  7 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  2 +-
 .../apache/tajo/master/querymaster/Query.java   | 74 +++++++++++++++-----
 .../master/querymaster/QueryMasterTask.java     | 23 ++++--
 .../src/main/resources/webapps/admin/index.jsp  |  2 +-
 .../apache/tajo/engine/query/TestCTASQuery.java |  1 +
 .../tajo/engine/query/TestInsertQuery.java      | 14 +++-
 9 files changed, 105 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 33983bc..8d51d44 100644
--- a/CHANGES
+++ b/CHANGES
@@ -79,6 +79,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1211: Staging directory for CTAS and INSERT should be in 
+    the output dir. (hyunsik)
+
     TAJO-1210: ByteBufLineReader does not handle the end of file, 
     if newline is not appeared. (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index f3ae453..312abfb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -671,7 +671,15 @@ public class TajoConf extends Configuration {
     return path.indexOf("file:/") == 0 || path.indexOf("hdfs:/") == 0;
   }
 
-  public static Path getStagingDir(TajoConf conf) throws IOException {
+  /**
+   * It returns the default root staging directory used by queries without a target table or
+   * a specified output directory. An example query is <pre>SELECT a,b,c FROM XXX;</pre>.
+   *
+   * @param conf TajoConf
+   * @return Path which points the default staging directory
+   * @throws IOException
+   */
+  public static Path getDefaultRootStagingDir(TajoConf conf) throws IOException {
     String stagingDirString = conf.getVar(ConfVars.STAGING_ROOT_DIR);
     if (!hasScheme(stagingDirString)) {
       Path warehousePath = getWarehouseDir(conf);
@@ -686,7 +694,7 @@ public class TajoConf extends Configuration {
   public static Path getQueryHistoryDir(TajoConf conf) throws IOException {
     String historyDirString = conf.getVar(ConfVars.HISTORY_QUERY_DIR);
     if (!hasScheme(historyDirString)) {
-      Path stagingPath = getStagingDir(conf);
+      Path stagingPath = getDefaultRootStagingDir(conf);
       FileSystem fs = stagingPath.getFileSystem(conf);
       Path path = new Path(fs.getUri().toString(), historyDirString);
       conf.setVar(ConfVars.HISTORY_QUERY_DIR, path.toString());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 47a8750..9bf9a75 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -335,10 +335,8 @@ public class GlobalEngine extends AbstractService {
     String queryId = nodeUniqName + "_" + System.currentTimeMillis();
 
     FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
-    Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), fs, queryId.toString());
-
+    Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(),
queryContext);
     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-    fs.mkdirs(stagingResultDir);
 
     TableDesc tableDesc = null;
     Path finalOutputDir = null;
@@ -349,8 +347,7 @@ public class GlobalEngine extends AbstractService {
       finalOutputDir = insertNode.getPath();
     }
 
-    TaskAttemptContext taskAttemptContext =
-        new TaskAttemptContext(queryContext, null, null, (CatalogProtos.FragmentProto[])
null, stagingDir);
+    TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null,
null, stagingDir);
     taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
 
     EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 6e585af..795983d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -271,7 +271,7 @@ public class TajoMaster extends CompositeService {
       LOG.info("Warehouse dir '" + wareHousePath + "' is created");
     }
 
-    Path stagingPath = TajoConf.getStagingDir(systemConf);
+    Path stagingPath = TajoConf.getDefaultRootStagingDir(systemConf);
     LOG.info("Staging dir: " + wareHousePath);
     if (!defaultFS.exists(stagingPath)) {
       defaultFS.mkdirs(stagingPath, new FsPermission(STAGING_ROOTDIR_PERMISSION));

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 7db6d8b..07b47c1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -411,9 +411,9 @@ public class Query implements EventHandler<QueryEvent> {
     public QueryState transition(Query query, QueryEvent queryEvent) {
       QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
       QueryState finalState;
+
       if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
-        finalizeQuery(query, subQueryEvent);
-        finalState = QueryState.QUERY_SUCCEEDED;
+        finalState = finalizeQuery(query, subQueryEvent);
       } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
         finalState = QueryState.QUERY_FAILED;
       } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
@@ -427,26 +427,28 @@ public class Query implements EventHandler<QueryEvent> {
       return finalState;
     }
 
-    private void finalizeQuery(Query query, QueryCompletedEvent event) {
+    private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
       MasterPlan masterPlan = query.getPlan();
 
       ExecutionBlock terminal = query.getPlan().getTerminalBlock();
       DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
-      Path finalOutputDir = commitOutputData(query);
 
       QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
       try {
-        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
-            finalOutputDir);
-      } catch (Exception e) {
-        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+        Path finalOutputDir = commitOutputData(query);
+        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
finalOutputDir);
+      } catch (Throwable t) {
+        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(t)));
+        return QueryState.QUERY_ERROR;
       }
+
+      return QueryState.QUERY_SUCCEEDED;
     }
 
     /**
      * It moves a result data stored in a staging output dir into a final output dir.
      */
-    public Path commitOutputData(Query query) {
+    public Path commitOutputData(Query query) throws IOException {
       QueryContext queryContext = query.context.getQueryContext();
       Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
       Path finalOutputDir;
@@ -508,24 +510,49 @@ public class Query implements EventHandler<QueryEvent> {
                   fs.delete(entry.getValue(), true);
                   fs.rename(entry.getValue(), entry.getKey());
                 }
+
                 throw new IOException(ioe.getMessage());
               }
-            } else {
+            } else { // no partition
               try {
+
+                // if the final output dir exists, move all contents to the temporary table dir.
+                // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
                 if (fs.exists(finalOutputDir)) {
-                  fs.rename(finalOutputDir, oldTableDir);
+                  fs.mkdirs(oldTableDir);
+
+                  for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter))
{
+                    fs.rename(status.getPath(), oldTableDir);
+                  }
+
                   movedToOldTable = fs.exists(oldTableDir);
                 } else { // if the parent does not exist, make its parent directory.
-                  fs.mkdirs(finalOutputDir.getParent());
+                  fs.mkdirs(finalOutputDir);
                 }
 
-                fs.rename(stagingResultDir, finalOutputDir);
+                // Move the results to the final output dir.
+                for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                  fs.rename(status.getPath(), finalOutputDir);
+                }
+
+                // Check the final output dir
                 committed = fs.exists(finalOutputDir);
+
               } catch (IOException ioe) {
                 // recover the old table
                 if (movedToOldTable && !committed) {
-                  fs.rename(oldTableDir, finalOutputDir);
+
+                  // if commit is failed, recover the old data
+                  for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter))
{
+                    fs.delete(status.getPath(), true);
+                  }
+
+                  for (FileStatus status : fs.listStatus(oldTableDir)) {
+                    fs.rename(status.getPath(), finalOutputDir);
+                  }
                 }
+
+                throw new IOException(ioe.getMessage());
               }
             }
           } else {
@@ -560,13 +587,24 @@ public class Query implements EventHandler<QueryEvent> {
                 }
               }
             } else { // CREATE TABLE AS SELECT (CTAS)
-              fs.rename(stagingResultDir, finalOutputDir);
+              if (fs.exists(finalOutputDir)) {
+                for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                  fs.rename(status.getPath(), finalOutputDir);
+                }
+              } else {
+                fs.rename(stagingResultDir, finalOutputDir);
+              }
               LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
             }
           }
-        } catch (IOException e) {
-          // TODO report to client
-          e.printStackTrace();
+
+          // remove the staging directory if the final output dir is given.
+          Path stagingDirRoot = queryContext.getStagingDir().getParent();
+          fs.delete(stagingDirRoot, true);
+
+        } catch (Throwable t) {
+          LOG.error(t);
+          throw new IOException(t);
         }
       } else {
         finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 5cf3df5..75d8ab6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -58,6 +58,7 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
@@ -79,6 +80,8 @@ public class QueryMasterTask extends CompositeService {
   final public static FsPermission STAGING_DIR_PERMISSION =
       FsPermission.createImmutable((short) 0700); // rwx--------
 
+  public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+
   private QueryId queryId;
 
   private Session session;
@@ -399,8 +402,7 @@ public class QueryMasterTask extends CompositeService {
 
     try {
 
-      stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString());
-      defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
+      stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext);
 
       // Create a subdirectories
       LOG.info("The staging dir '" + stagingDir + "' is created.");
@@ -423,7 +425,7 @@ public class QueryMasterTask extends CompositeService {
    * It initializes the final output and staging directory and sets
    * them to variables.
    */
-  public static Path initStagingDir(TajoConf conf, FileSystem fs, String queryId) throws
IOException {
+  public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context)
throws IOException {
 
     String realUser;
     String currentUser;
@@ -432,13 +434,21 @@ public class QueryMasterTask extends CompositeService {
     realUser = ugi.getShortUserName();
     currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
 
-    Path stagingDir = null;
+    FileSystem fs;
+    Path stagingDir;
 
     ////////////////////////////////////////////
     // Create Output Directory
     ////////////////////////////////////////////
 
-    stagingDir = new Path(TajoConf.getStagingDir(conf), queryId);
+    if (context.isCreateTable() || context.isInsert()) {
+      stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX,
queryId);
+    } else {
+      stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+    }
+
+    // initializ
+    fs = stagingDir.getFileSystem(conf);
 
     if (fs.exists(stagingDir)) {
       throw new IOException("The staging directory '" + stagingDir + "' already exists");
@@ -462,6 +472,9 @@ public class QueryMasterTask extends CompositeService {
       fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
     }
 
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    fs.mkdirs(stagingResultDir);
+
     return stagingDir;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 30cbf88..6778725 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -131,7 +131,7 @@
     <tr><td width='150'>Root dir:</td><td><%=TajoConf.getTajoRootDir(master.getContext().getConf())%></td></tr>
     <tr><td width='150'>System dir:</td><td><%=TajoConf.getSystemDir(master.getContext().getConf())%></td></tr>
     <tr><td width='150'>Warehouse dir:</td><td><%=TajoConf.getWarehouseDir(master.getContext().getConf())%></td></tr>
-    <tr><td width='150'>Staging dir:</td><td><%=TajoConf.getStagingDir(master.getContext().getConf())%></td></tr>
+    <tr><td width='150'>Staging dir:</td><td><%=TajoConf.getDefaultRootStagingDir(master.getContext().getConf())%></td></tr>
     <tr><td width='150'>Client Service:</td><td><%=NetUtils.normalizeInetSocketAddress(master.getTajoMasterClientService().getBindAddress())%></td></tr>
     <tr><td width='150'>Catalog Service:</td><td><%=master.getCatalogServer().getCatalogServerName()%></td></tr>
     <tr><td width='150'>Heap(Free/Total/Max): </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%>
MB / <%=Runtime.getRuntime().totalMemory()/1024/1024%> MB / <%=Runtime.getRuntime().maxMemory()/1024/1024%>
MB</td>

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
index 0e02079..0e89803 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -28,6 +28,7 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b4adc18c/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 9c97a55..117f186 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -29,6 +29,7 @@ import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -114,8 +115,19 @@ public class TestInsertQuery extends QueryTestCaseBase {
 
   @Test
   public final void testInsertIntoLocation() throws Exception {
+    Path dfsPath = new Path("/tajo-data/testInsertIntoLocation");
+    assertTestInsertIntoLocation(dfsPath);
+  }
+
+  @Test
+  public final void testInsertIntoLocationDifferentFSs() throws Exception {
+    Path localPath = CommonTestingUtil.getTestDir();
+    assertTestInsertIntoLocation(localPath);
+  }
+
+  public final void assertTestInsertIntoLocation(Path path) throws Exception {
     FileSystem fs = null;
-    Path path = new Path("/tajo-data/testInsertIntoLocation");
+
     try {
       executeString("insert into location '" + path + "' select l_orderkey, l_partkey, l_linenumber
from default.lineitem").close();
 


Mime
View raw message