From: xgong@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 05 Jun 2017 21:04:28 -0000
Message-Id: <8fbeab0adad944638b796bcbfddcbab0@git.apache.org>
Subject: [19/50] [abbrv] hadoop git commit: YARN-6366. Refactor the NodeManager DeletionService to support additional DeletionTask types. Contributed by Shane Kumpf.

YARN-6366. Refactor the NodeManager DeletionService to support additional DeletionTask types. Contributed by Shane Kumpf.
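For reviewers tracking the API change: the patch below removes the old DeletionService entry points delete(String user, Path subDir, Path... baseDirs) and scheduleFileDeletionTask(FileDeletionTask), and replaces them with a single delete(DeletionTask) method plus a public FileDeletionTask class. The following sketch of the call-site migration is assembled from the LocalResourcesTrackerImpl and ResourceLocalizationService hunks in this commit; the wrapper class and method names are illustrative only and are not part of the patch.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;

    // Illustrative helper showing the before/after call pattern.
    public class DeletionCallSiteSketch {
      void scheduleDelete(DeletionService delService, String user,
          Path subDir, Path... baseDirs) {
        // Before YARN-6366: delService.delete(user, subDir, baseDirs);
        // After: wrap the paths in a FileDeletionTask and submit it.
        List<Path> baseDirList = (baseDirs == null || baseDirs.length == 0)
            ? null : Arrays.asList(baseDirs);
        FileDeletionTask task =
            new FileDeletionTask(delService, user, subDir, baseDirList);
        delService.delete(task);
      }
    }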
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/547f18cb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/547f18cb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/547f18cb

Branch: refs/heads/YARN-5734
Commit: 547f18cb96aeda55cc19b38be2be4d631b3a5f4f
Parents: 4b4a652
Author: Varun Vasudev
Authored: Wed May 31 16:15:35 2017 +0530
Committer: Varun Vasudev
Committed: Wed May 31 16:15:35 2017 +0530

----------------------------------------------------------------------
 .../server/nodemanager/DeletionService.java | 468 ++++---------------
 .../nodemanager/api/impl/pb/NMProtoUtils.java | 110 +++++
 .../nodemanager/api/impl/pb/package-info.java | 25 +
 .../recovery/DeletionTaskRecoveryInfo.java | 73 +++
 .../deletion/recovery/package-info.java | 25 +
 .../deletion/task/DeletionTask.java | 258 ++++
 .../deletion/task/DeletionTaskType.java | 24 +
 .../deletion/task/FileDeletionTask.java | 202 ++++
 .../deletion/task/package-info.java | 25 +
 .../localizer/LocalResourcesTrackerImpl.java | 13 +-
 .../localizer/ResourceLocalizationService.java | 40 +-
 .../logaggregation/AppLogAggregatorImpl.java | 60 ++-
 .../loghandler/NonAggregatingLogHandler.java | 7 +-
 .../yarn_server_nodemanager_recovery.proto | 1 +
 .../server/nodemanager/TestDeletionService.java | 57 ++-
 .../nodemanager/TestNodeManagerReboot.java | 99 +---
 .../api/impl/pb/TestNMProtoUtils.java | 91 ++++
 .../BaseContainerManagerTest.java | 7 +-
 .../deletion/task/FileDeletionMatcher.java | 84 ++++
 .../deletion/task/TestFileDeletionTask.java | 85 ++++
 .../TestLocalResourcesTrackerImpl.java | 5 +-
 .../TestResourceLocalizationService.java | 33 +-
 .../TestAppLogAggregatorImpl.java | 15 +-
 .../TestLogAggregationService.java | 17 +-
 .../TestNonAggregatingLogHandler.java | 8 +-
 25 files changed, 1274 insertions(+), 558 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index aac0af9..38d69a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,461 +35,176 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; -import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.NMProtoUtils; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; -import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class DeletionService extends AbstractService { - static final Log LOG = LogFactory.getLog(DeletionService.class); + + private static final Log LOG = LogFactory.getLog(DeletionService.class); + private int debugDelay; - private final ContainerExecutor exec; - private ScheduledThreadPoolExecutor sched; - private static final FileContext lfs = getLfs(); + private final ContainerExecutor containerExecutor; private final NMStateStoreService stateStore; + private ScheduledThreadPoolExecutor sched; private AtomicInteger nextTaskId = new AtomicInteger(0); - static final FileContext getLfs() { - try { - return FileContext.getLocalFSFileContext(); - } catch (UnsupportedFileSystemException e) { - throw new RuntimeException(e); - } - } - public DeletionService(ContainerExecutor exec) { this(exec, new NMNullStateStoreService()); } - public DeletionService(ContainerExecutor exec, + public DeletionService(ContainerExecutor containerExecutor, NMStateStoreService stateStore) { super(DeletionService.class.getName()); - this.exec = exec; + this.containerExecutor = containerExecutor; this.debugDelay = 0; this.stateStore = stateStore; } - - /** - * Delete the path(s) as this user. - * @param user The user to delete as, or the JVM user if null - * @param subDir the sub directory name - * @param baseDirs the base directories which contains the subDir's - */ - public void delete(String user, Path subDir, Path... 
baseDirs) { - // TODO if parent owned by NM, rename within parent inline - if (debugDelay != -1) { - List baseDirList = null; - if (baseDirs != null && baseDirs.length != 0) { - baseDirList = Arrays.asList(baseDirs); - } - FileDeletionTask task = - new FileDeletionTask(this, user, subDir, baseDirList); - recordDeletionTaskInStateStore(task); - sched.schedule(task, debugDelay, TimeUnit.SECONDS); - } - } - - public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { - if (debugDelay != -1) { - recordDeletionTaskInStateStore(fileDeletionTask); - sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); - } - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - ThreadFactory tf = new ThreadFactoryBuilder() - .setNameFormat("DeletionService #%d") - .build(); - if (conf != null) { - sched = new HadoopScheduledThreadPoolExecutor( - conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, - YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); - debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); - } else { - sched = new HadoopScheduledThreadPoolExecutor( - YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); - } - sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - sched.setKeepAliveTime(60L, SECONDS); - if (stateStore.canRecover()) { - recover(stateStore.loadDeletionServiceState()); - } - super.serviceInit(conf); - } - @Override - protected void serviceStop() throws Exception { - if (sched != null) { - sched.shutdown(); - boolean terminated = false; - try { - terminated = sched.awaitTermination(10, SECONDS); - } catch (InterruptedException e) { - } - if (terminated != true) { - sched.shutdownNow(); - } - } - super.serviceStop(); + public int getDebugDelay() { + return debugDelay; } - /** - * Determine if the service has completely stopped. - * Used only by unit tests - * @return true if service has completely stopped - */ - @Private - public boolean isTerminated() { - return getServiceState() == STATE.STOPPED && sched.isTerminated(); + public ContainerExecutor getContainerExecutor() { + return containerExecutor; } - public static class FileDeletionTask implements Runnable { - public static final int INVALID_TASK_ID = -1; - private int taskId; - private final String user; - private final Path subDir; - private final List baseDirs; - private final AtomicInteger numberOfPendingPredecessorTasks; - private final Set successorTaskSet; - private final DeletionService delService; - // By default all tasks will start as success=true; however if any of - // the dependent task fails then it will be marked as false in - // fileDeletionTaskFinished(). 
- private boolean success; - - private FileDeletionTask(DeletionService delService, String user, - Path subDir, List baseDirs) { - this(INVALID_TASK_ID, delService, user, subDir, baseDirs); - } - - private FileDeletionTask(int taskId, DeletionService delService, - String user, Path subDir, List baseDirs) { - this.taskId = taskId; - this.delService = delService; - this.user = user; - this.subDir = subDir; - this.baseDirs = baseDirs; - this.successorTaskSet = new HashSet(); - this.numberOfPendingPredecessorTasks = new AtomicInteger(0); - success = true; - } - - /** - * increments and returns pending predecessor task count - */ - public int incrementAndGetPendingPredecessorTasks() { - return numberOfPendingPredecessorTasks.incrementAndGet(); - } - - /** - * decrements and returns pending predecessor task count - */ - public int decrementAndGetPendingPredecessorTasks() { - return numberOfPendingPredecessorTasks.decrementAndGet(); - } - - @VisibleForTesting - public String getUser() { - return this.user; - } - - @VisibleForTesting - public Path getSubDir() { - return this.subDir; - } - - @VisibleForTesting - public List getBaseDirs() { - return this.baseDirs; - } - - public synchronized void setSuccess(boolean success) { - this.success = success; - } - - public synchronized boolean getSucess() { - return this.success; - } - - public synchronized FileDeletionTask[] getSuccessorTasks() { - FileDeletionTask[] successors = - new FileDeletionTask[successorTaskSet.size()]; - return successorTaskSet.toArray(successors); - } + public NMStateStoreService getStateStore() { + return stateStore; + } - @Override - public void run() { + public void delete(DeletionTask deletionTask) { + if (debugDelay != -1) { if (LOG.isDebugEnabled()) { - LOG.debug(this); - } - boolean error = false; - if (null == user) { - if (baseDirs == null || baseDirs.size() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("NM deleting absolute path : " + subDir); - } - try { - lfs.delete(subDir, true); - } catch (IOException e) { - error = true; - LOG.warn("Failed to delete " + subDir); - } - } else { - for (Path baseDir : baseDirs) { - Path del = subDir == null? baseDir : new Path(baseDir, subDir); - if (LOG.isDebugEnabled()) { - LOG.debug("NM deleting path : " + del); - } - try { - lfs.delete(del, true); - } catch (IOException e) { - error = true; - LOG.warn("Failed to delete " + subDir); - } - } - } - } else { - try { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Deleting path: [" + subDir + "] as user: [" + user + "]"); - } - if (baseDirs == null || baseDirs.size() == 0) { - delService.exec.deleteAsUser(new DeletionAsUserContext.Builder() - .setUser(user) - .setSubDir(subDir) - .build()); - } else { - delService.exec.deleteAsUser(new DeletionAsUserContext.Builder() - .setUser(user) - .setSubDir(subDir) - .setBasedirs(baseDirs.toArray(new Path[0])) - .build()); - } - } catch (IOException e) { - error = true; - LOG.warn("Failed to delete as user " + user, e); - } catch (InterruptedException e) { - error = true; - LOG.warn("Failed to delete as user " + user, e); - } - } - if (error) { - setSuccess(!error); - } - fileDeletionTaskFinished(); - } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer("\nFileDeletionTask : "); - sb.append(" user : ").append(this.user); - sb.append(" subDir : ").append( - subDir == null ? 
"null" : subDir.toString()); - sb.append(" baseDir : "); - if (baseDirs == null || baseDirs.size() == 0) { - sb.append("null"); - } else { - for (Path baseDir : baseDirs) { - sb.append(baseDir.toString()).append(','); - } - } - return sb.toString(); - } - - /** - * If there is a task dependency between say tasks 1,2,3 such that - * task2 and task3 can be started only after task1 then we should define - * task2 and task3 as successor tasks for task1. - * Note:- Task dependency should be defined prior to - * @param successorTask - */ - public synchronized void addFileDeletionTaskDependency( - FileDeletionTask successorTask) { - if (successorTaskSet.add(successorTask)) { - successorTask.incrementAndGetPendingPredecessorTasks(); + String msg = String.format("Scheduling DeletionTask (delay %d) : %s", + debugDelay, deletionTask.toString()); + LOG.debug(msg); } + recordDeletionTaskInStateStore(deletionTask); + sched.schedule(deletionTask, debugDelay, TimeUnit.SECONDS); } - - /* - * This is called when - * 1) Current file deletion task ran and finished. - * 2) This can be even directly called by predecessor task if one of the - * dependent tasks of it has failed marking its success = false. - */ - private synchronized void fileDeletionTaskFinished() { - try { - delService.stateStore.removeDeletionTask(taskId); - } catch (IOException e) { - LOG.error("Unable to remove deletion task " + taskId - + " from state store", e); - } - Iterator successorTaskI = - this.successorTaskSet.iterator(); - while (successorTaskI.hasNext()) { - FileDeletionTask successorTask = successorTaskI.next(); - if (!success) { - successorTask.setSuccess(success); - } - int count = successorTask.decrementAndGetPendingPredecessorTasks(); - if (count == 0) { - if (successorTask.getSucess()) { - successorTask.delService.scheduleFileDeletionTask(successorTask); - } else { - successorTask.fileDeletionTaskFinished(); - } - } - } - } - } - - /** - * Helper method to create file deletion task. To be used only if we need - * a way to define dependencies between deletion tasks. 
- * @param user user on whose behalf this task is suppose to run - * @param subDir sub directory as required in - * {@link DeletionService#delete(String, Path, Path...)} - * @param baseDirs base directories as required in - * {@link DeletionService#delete(String, Path, Path...)} - */ - public FileDeletionTask createFileDeletionTask(String user, Path subDir, - Path[] baseDirs) { - return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)); } - private void recover(RecoveredDeletionServiceState state) + private void recover(NMStateStoreService.RecoveredDeletionServiceState state) throws IOException { List taskProtos = state.getTasks(); Map idToInfoMap = - new HashMap(taskProtos.size()); - Set successorTasks = new HashSet(); + new HashMap<>(taskProtos.size()); + Set successorTasks = new HashSet<>(); for (DeletionServiceDeleteTaskProto proto : taskProtos) { - DeletionTaskRecoveryInfo info = parseTaskProto(proto); - idToInfoMap.put(info.task.taskId, info); - nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId)); - successorTasks.addAll(info.successorTaskIds); + DeletionTaskRecoveryInfo info = + NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); + idToInfoMap.put(info.getTask().getTaskId(), info); + nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); + successorTasks.addAll(info.getSuccessorTaskIds()); } // restore the task dependencies and schedule the deletion tasks that // have no predecessors final long now = System.currentTimeMillis(); for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) { - for (Integer successorId : info.successorTaskIds){ + for (Integer successorId : info.getSuccessorTaskIds()){ DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId); if (successor != null) { - info.task.addFileDeletionTaskDependency(successor.task); + info.getTask().addDeletionTaskDependency(successor.getTask()); } else { LOG.error("Unable to locate dependency task for deletion task " - + info.task.taskId + " at " + info.task.getSubDir()); + + info.getTask().getTaskId()); } } - if (!successorTasks.contains(info.task.taskId)) { - long msecTilDeletion = info.deletionTimestamp - now; - sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS); + if (!successorTasks.contains(info.getTask().getTaskId())) { + long msecTilDeletion = info.getDeletionTimestamp() - now; + sched.schedule(info.getTask(), msecTilDeletion, TimeUnit.MILLISECONDS); } } } - private DeletionTaskRecoveryInfo parseTaskProto( - DeletionServiceDeleteTaskProto proto) throws IOException { - int taskId = proto.getId(); - String user = proto.hasUser() ? 
proto.getUser() : null; - Path subdir = null; - List basePaths = null; - if (proto.hasSubdir()) { - subdir = new Path(proto.getSubdir()); - } - List basedirs = proto.getBasedirsList(); - if (basedirs != null && basedirs.size() > 0) { - basePaths = new ArrayList(basedirs.size()); - for (String basedir : basedirs) { - basePaths.add(new Path(basedir)); - } - } - - FileDeletionTask task = new FileDeletionTask(taskId, this, user, - subdir, basePaths); - return new DeletionTaskRecoveryInfo(task, - proto.getSuccessorIdsList(), - proto.getDeletionTime()); - } - private int generateTaskId() { // get the next ID but avoid an invalid ID int taskId = nextTaskId.incrementAndGet(); - while (taskId == FileDeletionTask.INVALID_TASK_ID) { + while (taskId == DeletionTask.INVALID_TASK_ID) { taskId = nextTaskId.incrementAndGet(); } return taskId; } - private void recordDeletionTaskInStateStore(FileDeletionTask task) { + private void recordDeletionTaskInStateStore(DeletionTask task) { if (!stateStore.canRecover()) { // optimize the case where we aren't really recording return; } - if (task.taskId != FileDeletionTask.INVALID_TASK_ID) { + if (task.getTaskId() != DeletionTask.INVALID_TASK_ID) { return; // task already recorded } - task.taskId = generateTaskId(); - - FileDeletionTask[] successors = task.getSuccessorTasks(); + task.setTaskId(generateTaskId()); // store successors first to ensure task IDs have been generated for them - for (FileDeletionTask successor : successors) { + DeletionTask[] successors = task.getSuccessorTasks(); + for (DeletionTask successor : successors) { recordDeletionTaskInStateStore(successor); } - DeletionServiceDeleteTaskProto.Builder builder = - DeletionServiceDeleteTaskProto.newBuilder(); - builder.setId(task.taskId); - if (task.getUser() != null) { - builder.setUser(task.getUser()); - } - if (task.getSubDir() != null) { - builder.setSubdir(task.getSubDir().toString()); - } - builder.setDeletionTime(System.currentTimeMillis() + - TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS)); - if (task.getBaseDirs() != null) { - for (Path dir : task.getBaseDirs()) { - builder.addBasedirs(dir.toString()); - } - } - for (FileDeletionTask successor : successors) { - builder.addSuccessorIds(successor.taskId); - } - try { - stateStore.storeDeletionTask(task.taskId, builder.build()); + stateStore.storeDeletionTask(task.getTaskId(), + task.convertDeletionTaskToProto()); } catch (IOException e) { - LOG.error("Unable to store deletion task " + task.taskId + " for " - + task.getSubDir(), e); + LOG.error("Unable to store deletion task " + task.getTaskId(), e); } } - private static class DeletionTaskRecoveryInfo { - FileDeletionTask task; - List successorTaskIds; - long deletionTimestamp; + @Override + protected void serviceInit(Configuration conf) throws Exception { + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat("DeletionService #%d") + .build(); + if (conf != null) { + sched = new HadoopScheduledThreadPoolExecutor( + conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); + debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); + } else { + sched = new HadoopScheduledThreadPoolExecutor( + YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); + } + sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + sched.setKeepAliveTime(60L, SECONDS); + if (stateStore.canRecover()) { + recover(stateStore.loadDeletionServiceState()); + } + super.serviceInit(conf); + } - public 
DeletionTaskRecoveryInfo(FileDeletionTask task, - List successorTaskIds, long deletionTimestamp) { - this.task = task; - this.successorTaskIds = successorTaskIds; - this.deletionTimestamp = deletionTimestamp; + @Override + public void serviceStop() throws Exception { + if (sched != null) { + sched.shutdown(); + boolean terminated = false; + try { + terminated = sched.awaitTermination(10, SECONDS); + } catch (InterruptedException e) { } + if (!terminated) { + sched.shutdownNow(); + } } + super.serviceStop(); + } + + /** + * Determine if the service has completely stopped. + * Used only by unit tests + * @return true if service has completely stopped + */ + @Private + public boolean isTerminated() { + return getServiceState() == STATE.STOPPED && sched.isTerminated(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java new file mode 100644 index 0000000..e47b3ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/NMProtoUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utilities for converting from PB representations. 
+ */ +public final class NMProtoUtils { + + private static final Log LOG = LogFactory.getLog(NMProtoUtils.class); + + private NMProtoUtils() { } + + /** + * Convert the Protobuf representation into a {@link DeletionTask}. + * + * @param proto the Protobuf representation for the DeletionTask + * @param deletionService the {@link DeletionService} + * @return the converted {@link DeletionTask} + */ + public static DeletionTask convertProtoToDeletionTask( + DeletionServiceDeleteTaskProto proto, DeletionService deletionService) { + int taskId = proto.getId(); + if (proto.hasTaskType() && proto.getTaskType() != null) { + if (proto.getTaskType().equals(DeletionTaskType.FILE.name())) { + LOG.debug("Converting recovered FileDeletionTask"); + return convertProtoToFileDeletionTask(proto, deletionService, taskId); + } + } + LOG.debug("Unable to get task type, trying FileDeletionTask"); + return convertProtoToFileDeletionTask(proto, deletionService, taskId); + } + + /** + * Convert the Protobuf representation into the {@link FileDeletionTask}. + * + * @param proto the Protobuf representation of the {@link FileDeletionTask} + * @param deletionService the {@link DeletionService}. + * @param taskId the ID of the {@link DeletionTask}. + * @return the populated {@link FileDeletionTask}. + */ + public static FileDeletionTask convertProtoToFileDeletionTask( + DeletionServiceDeleteTaskProto proto, DeletionService deletionService, + int taskId) { + String user = proto.hasUser() ? proto.getUser() : null; + Path subdir = null; + if (proto.hasSubdir()) { + subdir = new Path(proto.getSubdir()); + } + List basePaths = null; + List basedirs = proto.getBasedirsList(); + if (basedirs != null && basedirs.size() > 0) { + basePaths = new ArrayList<>(basedirs.size()); + for (String basedir : basedirs) { + basePaths.add(new Path(basedir)); + } + } + return new FileDeletionTask(taskId, deletionService, user, subdir, + basePaths); + } + + /** + * Convert the Protobuf representation to the {@link DeletionTaskRecoveryInfo} + * representation. 
+ * + * @param proto the Protobuf representation of the {@link DeletionTask} + * @param deletionService the {@link DeletionService} + * @return the populated {@link DeletionTaskRecoveryInfo} + */ + public static DeletionTaskRecoveryInfo convertProtoToDeletionTaskRecoveryInfo( + DeletionServiceDeleteTaskProto proto, DeletionService deletionService) { + DeletionTask deletionTask = + NMProtoUtils.convertProtoToDeletionTask(proto, deletionService); + List successorTaskIds = new ArrayList<>(); + if (proto.getSuccessorIdsList() != null && + !proto.getSuccessorIdsList().isEmpty()) { + successorTaskIds = proto.getSuccessorIdsList(); + } + long deletionTimestamp = proto.getDeletionTime(); + return new DeletionTaskRecoveryInfo(deletionTask, successorTaskIds, + deletionTimestamp); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java new file mode 100644 index 0000000..006f49f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package containing classes for working with Protobuf. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java new file mode 100644 index 0000000..c62ea02 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/DeletionTaskRecoveryInfo.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; + +import java.util.List; + +/** + * Encapsulates the recovery info needed to recover a DeletionTask from the NM + * state store. + */ +public class DeletionTaskRecoveryInfo { + + private DeletionTask task; + private List successorTaskIds; + private long deletionTimestamp; + + /** + * Information needed for recovering the DeletionTask. + * + * @param task the DeletionTask + * @param successorTaskIds the dependent DeletionTasks. + * @param deletionTimestamp the scheduled times of deletion. + */ + public DeletionTaskRecoveryInfo(DeletionTask task, + List successorTaskIds, long deletionTimestamp) { + this.task = task; + this.successorTaskIds = successorTaskIds; + this.deletionTimestamp = deletionTimestamp; + } + + /** + * Return the recovered DeletionTask. + * + * @return the recovered DeletionTask. + */ + public DeletionTask getTask() { + return task; + } + + /** + * Return all of the dependent DeletionTasks. + * + * @return the dependent DeletionTasks. + */ + public List getSuccessorTaskIds() { + return successorTaskIds; + } + + /** + * Return the deletion timestamp. + * + * @return the deletion timestamp. 
+ */ + public long getDeletionTimestamp() { + return deletionTimestamp; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java new file mode 100644 index 0000000..28d7f62 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/recovery/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package containing classes for recovering DeletionTasks. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java new file mode 100644 index 0000000..635d7a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTask.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * DeletionTasks are supplied to the {@link DeletionService} for deletion. + */ +public abstract class DeletionTask implements Runnable { + + static final Log LOG = LogFactory.getLog(DeletionTask.class); + + public static final int INVALID_TASK_ID = -1; + + private int taskId; + private String user; + private DeletionTaskType deletionTaskType; + private DeletionService deletionService; + private final AtomicInteger numberOfPendingPredecessorTasks; + private final Set successorTaskSet; + // By default all tasks will start as success=true; however if any of + // the dependent task fails then it will be marked as false in + // deletionTaskFinished(). + private boolean success; + + /** + * Deletion task with taskId and default values. + * + * @param taskId the ID of the task, if previously set. + * @param deletionService the {@link DeletionService}. + * @param user the user associated with the delete. + * @param deletionTaskType the {@link DeletionTaskType}. + */ + public DeletionTask(int taskId, DeletionService deletionService, String user, + DeletionTaskType deletionTaskType) { + this(taskId, deletionService, user, new AtomicInteger(0), + new HashSet(), deletionTaskType); + } + + /** + * Deletion task with taskId and user supplied values. + * + * @param taskId the ID of the task, if previously set. + * @param deletionService the {@link DeletionService}. + * @param user the user associated with the delete. + * @param numberOfPendingPredecessorTasks Number of pending tasks. + * @param successorTaskSet the list of successor DeletionTasks + * @param deletionTaskType the {@link DeletionTaskType}. + */ + public DeletionTask(int taskId, DeletionService deletionService, String user, + AtomicInteger numberOfPendingPredecessorTasks, + Set successorTaskSet, DeletionTaskType deletionTaskType) { + this.taskId = taskId; + this.deletionService = deletionService; + this.user = user; + this.numberOfPendingPredecessorTasks = numberOfPendingPredecessorTasks; + this.successorTaskSet = successorTaskSet; + this.deletionTaskType = deletionTaskType; + success = true; + } + + /** + * Get the taskId for the DeletionTask. + * + * @return the taskId. + */ + public int getTaskId() { + return taskId; + } + + /** + * Set the taskId for the DeletionTask. + * + * @param taskId the taskId. 
+ */ + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + /** + * The the user assoicated with the DeletionTask. + * + * @return the user name. + */ + public String getUser() { + return user; + } + + /** + * Get the {@link DeletionService} for this DeletionTask. + * + * @return the {@link DeletionService}. + */ + public DeletionService getDeletionService() { + return deletionService; + } + + /** + * Get the {@link DeletionTaskType} for this DeletionTask. + * + * @return the {@link DeletionTaskType}. + */ + public DeletionTaskType getDeletionTaskType() { + return deletionTaskType; + } + + /** + * Set the DeletionTask run status. + * + * @param success the status of the running DeletionTask. + */ + public synchronized void setSuccess(boolean success) { + this.success = success; + } + + /** + * Return the DeletionTask run status. + * + * @return the status of the running DeletionTask. + */ + public synchronized boolean getSucess() { + return this.success; + } + + /** + * Return the list of successor tasks for the DeletionTask. + * + * @return the list of successor tasks. + */ + public synchronized DeletionTask[] getSuccessorTasks() { + DeletionTask[] successors = new DeletionTask[successorTaskSet.size()]; + return successorTaskSet.toArray(successors); + } + + /** + * Convert the DeletionTask to the Protobuf representation for storing in the + * state store and recovery. + * + * @return the protobuf representation of the DeletionTask. + */ + public abstract DeletionServiceDeleteTaskProto convertDeletionTaskToProto(); + + /** + * Add a dependent DeletionTask. + * + * If there is a task dependency between say tasks 1,2,3 such that + * task2 and task3 can be started only after task1 then we should define + * task2 and task3 as successor tasks for task1. + * Note:- Task dependency should be defined prior to calling delete. + * + * @param successorTask the DeletionTask the depends on this DeletionTask. + */ + public synchronized void addDeletionTaskDependency( + DeletionTask successorTask) { + if (successorTaskSet.add(successorTask)) { + successorTask.incrementAndGetPendingPredecessorTasks(); + } + } + + /** + * Increments and returns pending predecessor task count. + * + * @return the number of pending predecessor DeletionTasks. + */ + public int incrementAndGetPendingPredecessorTasks() { + return numberOfPendingPredecessorTasks.incrementAndGet(); + } + + /** + * Decrements and returns pending predecessor task count. + * + * @return the number of pending predecessor DeletionTasks. + */ + public int decrementAndGetPendingPredecessorTasks() { + return numberOfPendingPredecessorTasks.decrementAndGet(); + } + + /** + * Removes the DeletionTask from the state store and validates that successor + * tasks have been scheduled and completed. + * + * This is called when: + * 1) Current deletion task ran and finished. + * 2) When directly called by predecessor task if one of the + * dependent tasks of it has failed marking its success = false. 
+ */ + synchronized void deletionTaskFinished() { + try { + NMStateStoreService stateStore = deletionService.getStateStore(); + stateStore.removeDeletionTask(taskId); + } catch (IOException e) { + LOG.error("Unable to remove deletion task " + taskId + + " from state store", e); + } + Iterator successorTaskI = this.successorTaskSet.iterator(); + while (successorTaskI.hasNext()) { + DeletionTask successorTask = successorTaskI.next(); + if (!success) { + successorTask.setSuccess(success); + } + int count = successorTask.decrementAndGetPendingPredecessorTasks(); + if (count == 0) { + if (successorTask.getSucess()) { + successorTask.deletionService.delete(successorTask); + } else { + successorTask.deletionTaskFinished(); + } + } + } + } + + /** + * Return the Protobuf builder with the base DeletionTask attributes. + * + * @return pre-populated Buidler with the base attributes. + */ + DeletionServiceDeleteTaskProto.Builder getBaseDeletionTaskProtoBuilder() { + DeletionServiceDeleteTaskProto.Builder builder = + DeletionServiceDeleteTaskProto.newBuilder(); + builder.setId(getTaskId()); + if (getUser() != null) { + builder.setUser(getUser()); + } + builder.setDeletionTime(System.currentTimeMillis() + + TimeUnit.MILLISECONDS.convert(getDeletionService().getDebugDelay(), + TimeUnit.SECONDS)); + for (DeletionTask successor : getSuccessorTasks()) { + builder.addSuccessorIds(successor.getTaskId()); + } + return builder; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java new file mode 100644 index 0000000..676c71b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/DeletionTaskType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +/** + * Available types of {@link DeletionTask}s. 
+ */ +public enum DeletionTaskType { + FILE +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java new file mode 100644 index 0000000..fd07f16 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/FileDeletionTask.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; + +import java.io.IOException; +import java.util.List; + +/** + * {@link DeletionTask} handling the removal of files (and directories). + */ +public class FileDeletionTask extends DeletionTask implements Runnable { + + private final Path subDir; + private final List baseDirs; + private static final FileContext lfs = getLfs(); + + private static FileContext getLfs() { + try { + return FileContext.getLocalFSFileContext(); + } catch (UnsupportedFileSystemException e) { + throw new RuntimeException(e); + } + } + + /** + * Construct a FileDeletionTask with the default INVALID_TASK_ID. + * + * @param deletionService the {@link DeletionService}. + * @param user the user deleting the file. + * @param subDir the subdirectory to delete. + * @param baseDirs the base directories containing the subdir. + */ + public FileDeletionTask(DeletionService deletionService, String user, + Path subDir, List baseDirs) { + this(INVALID_TASK_ID, deletionService, user, subDir, baseDirs); + } + + /** + * Construct a FileDeletionTask with the default INVALID_TASK_ID. + * + * @param taskId the ID of the task, if previously set. + * @param deletionService the {@link DeletionService}. + * @param user the user deleting the file. 
+ * @param subDir the subdirectory to delete. + * @param baseDirs the base directories containing the subdir. + */ + public FileDeletionTask(int taskId, DeletionService deletionService, + String user, Path subDir, List baseDirs) { + super(taskId, deletionService, user, DeletionTaskType.FILE); + this.subDir = subDir; + this.baseDirs = baseDirs; + } + + /** + * Get the subdirectory to delete. + * + * @return the subDir for the FileDeletionTask. + */ + public Path getSubDir() { + return this.subDir; + } + + /** + * Get the base directories containing the subdirectory. + * + * @return the base directories for the FileDeletionTask. + */ + public List getBaseDirs() { + return this.baseDirs; + } + + /** + * Delete the specified file/directory as the specified user. + */ + @Override + public void run() { + if (LOG.isDebugEnabled()) { + String msg = String.format("Running DeletionTask : %s", toString()); + LOG.debug(msg); + } + boolean error = false; + if (null == getUser()) { + if (baseDirs == null || baseDirs.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("NM deleting absolute path : " + subDir); + } + try { + lfs.delete(subDir, true); + } catch (IOException e) { + error = true; + LOG.warn("Failed to delete " + subDir); + } + } else { + for (Path baseDir : baseDirs) { + Path del = subDir == null? baseDir : new Path(baseDir, subDir); + if (LOG.isDebugEnabled()) { + LOG.debug("NM deleting path : " + del); + } + try { + lfs.delete(del, true); + } catch (IOException e) { + error = true; + LOG.warn("Failed to delete " + subDir); + } + } + } + } else { + try { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Deleting path: [" + subDir + "] as user: [" + getUser() + "]"); + } + if (baseDirs == null || baseDirs.size() == 0) { + getDeletionService().getContainerExecutor().deleteAsUser( + new DeletionAsUserContext.Builder() + .setUser(getUser()) + .setSubDir(subDir) + .build()); + } else { + getDeletionService().getContainerExecutor().deleteAsUser( + new DeletionAsUserContext.Builder() + .setUser(getUser()) + .setSubDir(subDir) + .setBasedirs(baseDirs.toArray(new Path[0])) + .build()); + } + } catch (IOException|InterruptedException e) { + error = true; + LOG.warn("Failed to delete as user " + getUser(), e); + } + } + if (error) { + setSuccess(!error); + } + deletionTaskFinished(); + } + + /** + * Convert the FileDeletionTask to a String representation. + * + * @return String representation of the FileDeletionTask. + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder("FileDeletionTask :"); + sb.append(" id : ").append(getTaskId()); + sb.append(" user : ").append(getUser()); + sb.append(" subDir : ").append( + subDir == null ? "null" : subDir.toString()); + sb.append(" baseDir : "); + if (baseDirs == null || baseDirs.size() == 0) { + sb.append("null"); + } else { + for (Path baseDir : baseDirs) { + sb.append(baseDir.toString()).append(','); + } + } + return sb.toString().trim(); + } + + /** + * Convert the FileDeletionTask to the Protobuf representation for storing + * in the state store and recovery. + * + * @return the protobuf representation of the FileDeletionTask. 
+ */ + public DeletionServiceDeleteTaskProto convertDeletionTaskToProto() { + DeletionServiceDeleteTaskProto.Builder builder = + getBaseDeletionTaskProtoBuilder(); + builder.setTaskType(DeletionTaskType.FILE.name()); + if (getSubDir() != null) { + builder.setSubdir(getSubDir().toString()); + } + if (getBaseDirs() != null) { + for (Path dir : getBaseDirs()) { + builder.addBasedirs(dir.toString()); + } + } + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java new file mode 100644 index 0000000..f1a3985 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/deletion/task/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package containing DeletionTasks for use with the DeletionService. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java index af34e92..47e6a55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent; @@ -113,9 +114,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager; if (this.useLocalCacheDirectoryManager) { directoryManagers = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); inProgressLocalResourcesMap = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); } this.conf = conf; this.stateStore = stateStore; @@ -393,7 +394,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { return false; } else { // ResourceState is LOCALIZED or INIT if (ResourceState.LOCALIZED.equals(rsrc.getState())) { - delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath())); + FileDeletionTask deletionTask = new FileDeletionTask(delService, + getUser(), getPathToDelete(rsrc.getLocalPath()), null); + delService.delete(deletionTask); } removeResource(rem.getRequest()); LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache"); @@ -488,7 +491,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker { LOG.warn("Directory " + uniquePath + " already exists, " + "try next one."); if (delService != null) { - delService.delete(getUser(), uniquePath); + FileDeletionTask deletionTask = new FileDeletionTask(delService, + getUser(), uniquePath, null); + delService.delete(deletionTask); } } 
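The LocalResourcesTrackerImpl hunk above and the ResourceLocalizationService hunks that follow also migrate the task-chaining calls: createFileDeletionTask and addFileDeletionTaskDependency become direct FileDeletionTask construction plus addDeletionTaskDependency on the DeletionTask base class. A rough sketch of that pattern, modeled on the cleanUpFilesPerUserDir hunk below (the class, method, and variable names here are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;

    // Illustrative only: delete a child path as its owner first, then the
    // parent directory, mirroring the cleanUpFilesPerUserDir change below.
    public class DeletionDependencySketch {
      void deleteChildThenParent(DeletionService del, Path parentDir,
          String childOwner, Path childPath) {
        FileDeletionTask parentTask =
            new FileDeletionTask(del, null, parentDir, new ArrayList<Path>());
        List<Path> childBaseDirs = new ArrayList<>();
        childBaseDirs.add(childPath);
        FileDeletionTask childTask =
            new FileDeletionTask(del, childOwner, null, childBaseDirs);
        // Registering parentTask as a successor means it only runs after
        // childTask finishes successfully.
        childTask.addDeletionTaskDependency(parentTask);
        del.delete(childTask);
      }
    }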
http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 663bad7..5bc0da7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -95,7 +95,6 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; @@ -113,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; @@ -604,7 +604,9 @@ public class ResourceLocalizationService extends CompositeService private void submitDirForDeletion(String userName, Path dir) { try { lfs.getFileStatus(dir); - delService.delete(userName, dir, new Path[] {}); + FileDeletionTask deletionTask = new FileDeletionTask(delService, userName, + dir, null); + delService.delete(deletionTask); } catch (UnsupportedFileSystemException ue) { LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue); } catch (IOException ie) { @@ -1234,10 +1236,13 @@ public class ResourceLocalizationService extends CompositeService event.getResource().unlock(); } if (!paths.isEmpty()) { - delService.delete(context.getUser(), - null, paths.toArray(new Path[paths.size()])); + FileDeletionTask deletionTask = new FileDeletionTask(delService, + context.getUser(), null, paths); + delService.delete(deletionTask); } - delService.delete(null, nmPrivateCTokensPath, new 
Path[] {}); + FileDeletionTask deletionTask = new FileDeletionTask(delService, null, + nmPrivateCTokensPath, null); + delService.delete(deletionTask); } } @@ -1456,7 +1461,9 @@ public class ResourceLocalizationService extends CompositeService String appName = fileStatus.getPath().getName(); if (appName.matches("^application_\\d+_\\d+_DEL_\\d+$")) { LOG.info("delete app log dir," + appName); - del.delete(null, fileStatus.getPath()); + FileDeletionTask deletionTask = new FileDeletionTask(del, null, + fileStatus.getPath(), null); + del.delete(deletionTask); } } } @@ -1516,7 +1523,9 @@ public class ResourceLocalizationService extends CompositeService || status.getPath().getName() .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) { - del.delete(null, status.getPath(), new Path[] {}); + FileDeletionTask deletionTask = new FileDeletionTask(del, null, + status.getPath(), null); + del.delete(deletionTask); } } catch (IOException ex) { // Do nothing, just give the warning @@ -1530,24 +1539,25 @@ public class ResourceLocalizationService extends CompositeService private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del, Path userDirPath) throws IOException { RemoteIterator userDirStatus = lfs.listStatus(userDirPath); - FileDeletionTask dependentDeletionTask = - del.createFileDeletionTask(null, userDirPath, new Path[] {}); + FileDeletionTask dependentDeletionTask = new FileDeletionTask(del, null, + userDirPath, new ArrayList()); if (userDirStatus != null && userDirStatus.hasNext()) { List deletionTasks = new ArrayList(); while (userDirStatus.hasNext()) { FileStatus status = userDirStatus.next(); String owner = status.getOwner(); - FileDeletionTask deletionTask = - del.createFileDeletionTask(owner, null, - new Path[] { status.getPath() }); - deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + List pathList = new ArrayList<>(); + pathList.add(status.getPath()); + FileDeletionTask deletionTask = new FileDeletionTask(del, owner, null, + pathList); + deletionTask.addDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } for (FileDeletionTask task : deletionTasks) { - del.scheduleFileDeletionTask(task); + del.delete(task); } } else { - del.scheduleFileDeletionTask(dependentDeletionTask); + del.delete(dependentDeletionTask); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index f465534..0d9e686 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -69,6 +69,8 @@ import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -258,19 +260,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return; } - if (UserGroupInformation.isSecurityEnabled()) { - Credentials systemCredentials = - context.getSystemCredentialsForApps().get(appId); - if (systemCredentials != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding new framework-token for " + appId - + " for log-aggregation: " + systemCredentials.getAllTokens() - + "; userUgi=" + userUgi); - } - // this will replace old token - userUgi.addCredentials(systemCredentials); - } - } + addCredentials(); // Create a set of Containers whose logs will be uploaded in this cycle. // It includes: @@ -332,9 +322,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator { finishedContainers.contains(container)); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; - this.delService.delete(this.userUgi.getShortUserName(), null, - uploadedFilePathsInThisCycle - .toArray(new Path[uploadedFilePathsInThisCycle.size()])); + List uploadedFilePathsInThisCycleList = new ArrayList<>(); + uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle); + DeletionTask deletionTask = new FileDeletionTask(delService, + this.userUgi.getShortUserName(), null, + uploadedFilePathsInThisCycleList); + delService.delete(deletionTask); } // This container is finished, and all its logs have been uploaded, @@ -352,11 +345,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } long currentTime = System.currentTimeMillis(); - final Path renamedPath = this.rollingMonitorInterval <= 0 - ? remoteNodeLogFileForApp : new Path( - remoteNodeLogFileForApp.getParent(), - remoteNodeLogFileForApp.getName() + "_" - + currentTime); + final Path renamedPath = getRenamedPath(currentTime); final boolean rename = uploadedLogsInThisCycle; try { @@ -396,6 +385,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } + private Path getRenamedPath(long currentTime) { + return this.rollingMonitorInterval <= 0 ? 
remoteNodeLogFileForApp + : new Path(remoteNodeLogFileForApp.getParent(), + remoteNodeLogFileForApp.getName() + "_" + currentTime); + } + + private void addCredentials() { + if (UserGroupInformation.isSecurityEnabled()) { + Credentials systemCredentials = + context.getSystemCredentialsForApps().get(appId); + if (systemCredentials != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding new framework-token for " + appId + + " for log-aggregation: " + systemCredentials.getAllTokens() + + "; userUgi=" + userUgi); + } + // this will replace old token + userUgi.addCredentials(systemCredentials); + } + } + } + @VisibleForTesting protected LogWriter createLogWriter() { return new LogWriter(); @@ -561,8 +572,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } if (localAppLogDirs.size() > 0) { - this.delService.delete(this.userUgi.getShortUserName(), null, - localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); + List localAppLogDirsList = new ArrayList<>(); + localAppLogDirsList.addAll(localAppLogDirs); + DeletionTask deletionTask = new FileDeletionTask(delService, + this.userUgi.getShortUserName(), null, localAppLogDirsList); + this.delService.delete(deletionTask); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 2901743..9961748 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; @@ -247,8 +248,10 @@ public class NonAggregatingLogHandler extends AbstractService implements new ApplicationEvent(this.applicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); if (localAppLogDirs.size() > 0) { - NonAggregatingLogHandler.this.delService.delete(user, null, - (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); + FileDeletionTask deletionTask = new 
FileDeletionTask( + NonAggregatingLogHandler.this.delService, user, null, + localAppLogDirs); + NonAggregatingLogHandler.this.delService.delete(deletionTask); } try { NonAggregatingLogHandler.this.stateStore.removeLogDeleter( http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index 7831711..7212953 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -41,6 +41,7 @@ message DeletionServiceDeleteTaskProto { optional int64 deletionTime = 4; repeated string basedirs = 5; repeated int32 successorIds = 6; + optional string taskType = 7; } message LocalizedResourceProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/547f18cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index 2e0bbe0..87f4a1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -33,13 +33,14 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.junit.AfterClass; import org.junit.Test; import org.mockito.Mockito; + public class TestDeletionService { private static final FileContext lfs = getLfs(); @@ -123,8 +124,9 @@ public class TestDeletionService { del.start(); try { for (Path p : dirs) { - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", - p, null); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? 
null : "dingo", p, null); + del.delete(deletionTask); } int msecToWait = 20 * 1000; @@ -159,8 +161,10 @@ public class TestDeletionService { del.start(); for (Path p : content) { assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", - p, baseDirs.toArray(new Path[4])); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, + baseDirs); + del.delete(deletionTask); } int msecToWait = 20 * 1000; @@ -196,8 +200,9 @@ public class TestDeletionService { del.init(conf); del.start(); for (Path p : dirs) { - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, - null); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null); + del.delete(deletionTask); } int msecToWait = 20 * 1000; for (Path p : dirs) { @@ -220,7 +225,9 @@ public class TestDeletionService { try { del.init(conf); del.start(); - del.delete("dingo", new Path("/does/not/exist")); + FileDeletionTask deletionTask = new FileDeletionTask(del, "dingo", + new Path("/does/not/exist"), null); + del.delete(deletionTask); } finally { del.stop(); } @@ -247,18 +254,20 @@ public class TestDeletionService { // first we will try to delete sub directories which are present. This // should then trigger parent directory to be deleted. List subDirs = buildDirs(r, dirs.get(0), 2); - + FileDeletionTask dependentDeletionTask = - del.createFileDeletionTask(null, dirs.get(0), new Path[] {}); + new FileDeletionTask(del, null, dirs.get(0), new ArrayList()); List deletionTasks = new ArrayList(); for (Path subDir : subDirs) { + List subDirList = new ArrayList<>(); + subDirList.add(subDir); FileDeletionTask deletionTask = - del.createFileDeletionTask(null, null, new Path[] { subDir }); - deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + new FileDeletionTask(del, null, dirs.get(0), subDirList); + deletionTask.addDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } for (FileDeletionTask task : deletionTasks) { - del.scheduleFileDeletionTask(task); + del.delete(task); } int msecToWait = 20 * 1000; @@ -274,19 +283,21 @@ public class TestDeletionService { subDirs = buildDirs(r, dirs.get(1), 2); subDirs.add(new Path(dirs.get(1), "absentFile")); - dependentDeletionTask = - del.createFileDeletionTask(null, dirs.get(1), new Path[] {}); + dependentDeletionTask = new FileDeletionTask(del, null, dirs.get(1), + new ArrayList()); deletionTasks = new ArrayList(); for (Path subDir : subDirs) { - FileDeletionTask deletionTask = - del.createFileDeletionTask(null, null, new Path[] { subDir }); - deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + List subDirList = new ArrayList<>(); + subDirList.add(subDir); + FileDeletionTask deletionTask = new FileDeletionTask(del, null, null, + subDirList); + deletionTask.addDeletionTaskDependency(dependentDeletionTask); deletionTasks.add(deletionTask); } // marking one of the tasks as a failure. deletionTasks.get(2).setSuccess(false); for (FileDeletionTask task : deletionTasks) { - del.scheduleFileDeletionTask(task); + del.delete(task); } msecToWait = 20 * 1000; @@ -327,8 +338,10 @@ public class TestDeletionService { del.start(); for (Path p : content) { assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); - del.delete((Long.parseLong(p.getName()) % 2) == 0 ? 
null : "dingo", - p, baseDirs.toArray(new Path[4])); + FileDeletionTask deletionTask = new FileDeletionTask(del, + (Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, + baseDirs); + del.delete(deletionTask); } // restart the deletion service @@ -341,8 +354,10 @@ public class TestDeletionService { // verify paths are still eventually deleted int msecToWait = 10 * 1000; for (Path p : baseDirs) { + System.out.println("TEST Basedir: " + p.getName()); for (Path q : content) { Path fp = new Path(p, q); + System.out.println("TEST Path: " + fp.toString()); while (msecToWait > 0 && lfs.util().exists(fp)) { Thread.sleep(100); msecToWait -= 100;
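The cleanUpFilesPerUserDir and TestDeletionService hunks above also show how chained deletions look after the rename to addDeletionTaskDependency: per-owner sub-directory tasks are submitted, and the parent-directory task they all reference is expected to run only once they finish ("first we will try to delete sub directories which are present. This should then trigger parent directory to be deleted."). A minimal sketch of that chaining follows; the helper class, directory, and owner names are illustrative and not taken from the commit.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;

/** Hypothetical helper mirroring the chaining used in cleanUpFilesPerUserDir. */
public class ExampleChainedDeletion {

  /** Delete each sub directory as its owner, then the user directory itself. */
  public static void deleteUserDir(DeletionService del, Path userDir,
      List<Path> subDirs, List<String> owners) {
    // Parent task for the user dir itself; a null user matches the diff above.
    FileDeletionTask parentTask =
        new FileDeletionTask(del, null, userDir, new ArrayList<Path>());

    List<FileDeletionTask> subTasks = new ArrayList<>();
    for (int i = 0; i < subDirs.size(); i++) {
      List<Path> pathList = new ArrayList<>();
      pathList.add(subDirs.get(i));
      FileDeletionTask subTask =
          new FileDeletionTask(del, owners.get(i), null, pathList);
      // Register the parent task as a dependent, as in the diff, so the
      // user dir is removed only after the per-owner deletions complete.
      subTask.addDeletionTaskDependency(parentTask);
      subTasks.add(subTask);
    }

    // Only the sub tasks are submitted directly; the parent task is reached
    // through the dependency chain, matching the non-empty branch above.
    for (FileDeletionTask t : subTasks) {
      del.delete(t);
    }
  }
}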