tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject git commit: TAJO-989: Cleanup of child blocks after parent execution block is complete. (jinho)
Date Tue, 05 Aug 2014 02:59:49 GMT
Repository: tajo
Updated Branches:
  refs/heads/master ae384685f -> 0f3412a74


TAJO-989: Cleanup of child blocks after parent execution block is complete. (jinho)

Closes #103


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0f3412a7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0f3412a7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0f3412a7

Branch: refs/heads/master
Commit: 0f3412a74bb3c565df1259b19630bc17e1bc69e0
Parents: ae38468
Author: jinossy <jinossy@gmail.com>
Authored: Tue Aug 5 11:58:16 2014 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Tue Aug 5 11:58:16 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../tajo/master/querymaster/QueryMaster.java    | 30 +++++++++-
 .../tajo/master/querymaster/SubQuery.java       | 20 ++++++-
 .../tajo/worker/TajoWorkerManagerService.java   | 13 +++++
 .../main/java/org/apache/tajo/worker/Task.java  |  6 +-
 .../java/org/apache/tajo/worker/TaskRunner.java | 21 ++++++-
 .../src/main/proto/TajoWorkerProtocol.proto     |  5 ++
 .../apache/tajo/worker/TestDeletionService.java | 61 ++++++++++++++++++++
 8 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 61593a2..be71bf4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,9 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-989: Cleanup of child blocks after parent execution block is complete
+    (jinho)
+
     TAJO-966: Range partition should support split of multiple characters.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index f173c24..25af82f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -162,6 +164,30 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
   }
 
+  protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) {
+    LOG.info("cleanup executionBlocks : " + executionBlockIds);
+    NettyClientBase rpc = null;
+    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
+    builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
+    TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
+    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+      try {
+        if (worker.getPeerRpcPort() == 0) continue;
+
+        rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+            TajoWorkerProtocol.class, true);
+        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+
+        tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get());
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+      } finally {
+        connPool.releaseConnection(rpc);
+      }
+    }
+  }
+
   private void cleanup(QueryId queryId) {
     LOG.info("cleanup query resources : " + queryId);
     NettyClientBase rpc = null;
@@ -338,7 +364,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
           queryMasterTask.stop();
           //if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")
          //     && !workerContext.isYarnContainerMode()) {
-            cleanup(queryId);       // TODO We will support yarn mode
+          if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+            cleanup(queryId);
+          }
           //}
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index f2e9dd5..17efa21 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
@@ -1004,7 +1005,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
         subQuery.eventHandler.handle(
             new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
-                subQuery.getId(), allocationEvent.getAllocatedContainer()));
+                subQuery.getId(), allocationEvent.getAllocatedContainer())
+        );
 
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
       } catch (Throwable t) {
@@ -1107,6 +1109,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private void cleanup() {
     stopScheduler();
     releaseContainers();
+
+    if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+      List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
+      List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
+      for (ExecutionBlock executionBlock :  childs){
+        ebIds.add(executionBlock.getId().getProto());
+      }
+
+      try {
+        getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
+      } catch (Throwable e) {
+        LOG.error(e);
+      }
+    }
   }
 
   private static class SubQueryCompleteTransition
@@ -1114,7 +1130,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
     @Override
     public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      // TODO - Commit subQuery & do cleanup
+      // TODO - Commit subQuery
       // TODO - records succeeded, failed, killed completed task
       // TODO - records metrics
       try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 13ef15d..e77da70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
@@ -151,4 +152,16 @@ public class TajoWorkerManagerService extends CompositeService
     workerContext.cleanup(new QueryId(request).toString());
     done.run(TajoWorker.TRUE_PROTO);
   }
+
+  @Override
+  public void cleanupExecutionBlocks(RpcController controller, TajoWorkerProtocol.ExecutionBlockListProto request,
+                                     RpcCallback<PrimitiveProtos.BoolProto> done) {
+    for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : request.getExecutionBlockIdList()) {
+      String inputDir = TaskRunner.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
+      workerContext.cleanup(inputDir);
+      String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
+      workerContext.cleanup(outputDir);
+    }
+    done.run(TajoWorker.TRUE_PROTO);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 230c63a..3a4536a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -794,12 +794,10 @@ public class Task {
       }
     }
   }
+
   public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
     Path workDir =
-        StorageUtil.concatPath(
-            quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString(),
-            "in",
-            quid.getQueryUnitId().getExecutionBlockId().toString(),
+        StorageUtil.concatPath(TaskRunner.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()),
             String.valueOf(quid.getQueryUnitId().getId()),
             String.valueOf(quid.getId()));
     return workDir;

http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 3fcee06..9676192 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -45,6 +45,7 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.InetSocketAddress;
@@ -172,6 +173,24 @@ public class TaskRunner extends AbstractService {
     return executionBlockId + "," + containerId;
   }
 
+  public static Path getBaseOutputDir(ExecutionBlockId executionBlockId){
+    Path workDir =
+        StorageUtil.concatPath(
+            executionBlockId.getQueryId().toString(),
+            "output",
+            String.valueOf(executionBlockId.getId()));
+    return workDir;
+  }
+
+  public static Path getBaseInputDir(ExecutionBlockId executionBlockId) {
+    Path workDir =
+        StorageUtil.concatPath(
+            executionBlockId.getQueryId().toString(),
+            "in",
+            executionBlockId.toString());
+    return workDir;
+  }
+
   @Override
   public void init(Configuration conf) {
     this.systemConf = (TajoConf)conf;
@@ -182,7 +201,7 @@ public class TaskRunner extends AbstractService {
       localFS = FileSystem.getLocal(conf);
 
       // the base dir for an output dir
-      baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
+      baseDir = getBaseOutputDir(executionBlockId).toString();
 
       // initialize LocalDirAllocator
       lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);

http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index ce8ce86..dc2b1d7 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -179,6 +179,10 @@ message RunExecutionBlockRequestProto {
     optional string queryOutputPath = 6;
 }
 
+message ExecutionBlockListProto {
+    repeated ExecutionBlockIdProto executionBlockId = 1;
+}
+
 service TajoWorkerProtocolService {
   rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
 
@@ -186,6 +190,7 @@ service TajoWorkerProtocolService {
   rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
   rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
   rpc cleanup(QueryIdProto) returns (BoolProto);
+  rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto);
 }
 
 message EnforceProperty {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0f3412a7/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java
new file mode 100644
index 0000000..98251c1
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestDeletionService {
+  DeletionService deletionService;
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() {
+    if(deletionService != null){
+      deletionService.stop();
+    }
+  }
+
+  @Test
+  public final void testTemporalDirectory() throws IOException, InterruptedException {
+    int delay = 1;
+    deletionService = new DeletionService(1, delay);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path tempPath = CommonTestingUtil.getTestDir();
+    assertTrue(fs.exists(tempPath));
+    deletionService.delete(tempPath);
+    assertTrue(fs.exists(tempPath));
+
+    Thread.sleep(delay * 2 * 1000);
+    assertFalse(fs.exists(tempPath));
+  }
+}


Mime
View raw message