Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 979B31125A for ; Tue, 17 Jun 2014 01:02:46 +0000 (UTC) Received: (qmail 8742 invoked by uid 500); 17 Jun 2014 01:02:46 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 8696 invoked by uid 500); 17 Jun 2014 01:02:46 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 8685 invoked by uid 99); 17 Jun 2014 01:02:46 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jun 2014 01:02:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jun 2014 01:02:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id F32922388906; Tue, 17 Jun 2014 01:02:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1603036 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/... Date: Tue, 17 Jun 2014 01:02:17 -0000 To: yarn-commits@hadoop.apache.org From: junping_du@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140617010217.F32922388906@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: junping_du Date: Tue Jun 17 01:02:16 2014 New Revision: 1603036 URL: http://svn.apache.org/r1603036 Log: YARN-1339. Recover DeletionService state upon nodemanager restart. (Contributed by Jason Lowe) Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jun 17 01:02:16 2014 @@ -39,6 +39,9 @@ Release 2.5.0 - UNRELEASED YARN-1702. Added kill app functionality to RM web services. (Varun Vasudev via vinodkv) + YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe + via junping_du) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Tue Jun 17 01:02:16 2014 @@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.no import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -50,6 +57,8 @@ public class DeletionService extends Abs private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; private static final FileContext lfs = getLfs(); + private final NMStateStoreService stateStore; + private AtomicInteger nextTaskId = new AtomicInteger(0); static final FileContext getLfs() { try { @@ -60,14 +69,18 @@ public class DeletionService extends Abs } public DeletionService(ContainerExecutor exec) { + this(exec, new NMNullStateStoreService()); + } + + public DeletionService(ContainerExecutor exec, + NMStateStoreService stateStore) { super(DeletionService.class.getName()); this.exec = exec; this.debugDelay = 0; + this.stateStore = stateStore; } /** - * - /** * Delete the path(s) as this user. * @param user The user to delete as, or the JVM user if null * @param subDir the sub directory name @@ -76,19 +89,20 @@ public class DeletionService extends Abs public void delete(String user, Path subDir, Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline if (debugDelay != -1) { - if (baseDirs == null || baseDirs.length == 0) { - sched.schedule(new FileDeletionTask(this, user, subDir, null), - debugDelay, TimeUnit.SECONDS); - } else { - sched.schedule( - new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)), - debugDelay, TimeUnit.SECONDS); + List baseDirList = null; + if (baseDirs != null && baseDirs.length != 0) { + baseDirList = Arrays.asList(baseDirs); } + FileDeletionTask task = + new FileDeletionTask(this, user, subDir, baseDirList); + recordDeletionTaskInStateStore(task); + sched.schedule(task, debugDelay, TimeUnit.SECONDS); } } public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { if (debugDelay != -1) { + recordDeletionTaskInStateStore(fileDeletionTask); sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); } } @@ -109,6 +123,9 @@ public class DeletionService extends Abs } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); + if (stateStore.canRecover()) { + recover(stateStore.loadDeletionServiceState()); + } super.serviceInit(conf); } @@ -139,6 +156,8 @@ public class DeletionService extends Abs } public static class FileDeletionTask implements Runnable { + public static final int INVALID_TASK_ID = -1; + private int taskId; private final String user; private final Path subDir; private final List baseDirs; @@ -152,6 +171,12 @@ public class DeletionService extends Abs private FileDeletionTask(DeletionService delService, String user, Path subDir, List baseDirs) { + this(INVALID_TASK_ID, delService, user, subDir, baseDirs); + } + + private FileDeletionTask(int taskId, DeletionService delService, + String user, Path subDir, List baseDirs) { + this.taskId = taskId; this.delService = delService; this.user = user; this.subDir = subDir; @@ -198,6 +223,12 @@ public class DeletionService extends Abs return this.success; } + public synchronized FileDeletionTask[] getSuccessorTasks() { + FileDeletionTask[] successors = + new FileDeletionTask[successorTaskSet.size()]; + return successorTaskSet.toArray(successors); + } + @Override public void run() { if (LOG.isDebugEnabled()) { @@ -286,6 +317,12 @@ public class DeletionService extends Abs * dependent tasks of it has failed marking its success = false. */ private synchronized void fileDeletionTaskFinished() { + try { + delService.stateStore.removeDeletionTask(taskId); + } catch (IOException e) { + LOG.error("Unable to remove deletion task " + taskId + + " from state store", e); + } Iterator successorTaskI = this.successorTaskSet.iterator(); while (successorTaskI.hasNext()) { @@ -318,4 +355,129 @@ public class DeletionService extends Abs Path[] baseDirs) { return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)); } + + private void recover(RecoveredDeletionServiceState state) + throws IOException { + List taskProtos = state.getTasks(); + Map idToInfoMap = + new HashMap(taskProtos.size()); + Set successorTasks = new HashSet(); + for (DeletionServiceDeleteTaskProto proto : taskProtos) { + DeletionTaskRecoveryInfo info = parseTaskProto(proto); + idToInfoMap.put(info.task.taskId, info); + nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId)); + successorTasks.addAll(info.successorTaskIds); + } + + // restore the task dependencies and schedule the deletion tasks that + // have no predecessors + final long now = System.currentTimeMillis(); + for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) { + for (Integer successorId : info.successorTaskIds){ + DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId); + if (successor != null) { + info.task.addFileDeletionTaskDependency(successor.task); + } else { + LOG.error("Unable to locate dependency task for deletion task " + + info.task.taskId + " at " + info.task.getSubDir()); + } + } + if (!successorTasks.contains(info.task.taskId)) { + long msecTilDeletion = info.deletionTimestamp - now; + sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS); + } + } + } + + private DeletionTaskRecoveryInfo parseTaskProto( + DeletionServiceDeleteTaskProto proto) throws IOException { + int taskId = proto.getId(); + String user = proto.hasUser() ? proto.getUser() : null; + Path subdir = null; + List basePaths = null; + if (proto.hasSubdir()) { + subdir = new Path(proto.getSubdir()); + } + List basedirs = proto.getBasedirsList(); + if (basedirs != null && basedirs.size() > 0) { + basePaths = new ArrayList(basedirs.size()); + for (String basedir : basedirs) { + basePaths.add(new Path(basedir)); + } + } + + FileDeletionTask task = new FileDeletionTask(taskId, this, user, + subdir, basePaths); + return new DeletionTaskRecoveryInfo(task, + proto.getSuccessorIdsList(), + proto.getDeletionTime()); + } + + private int generateTaskId() { + // get the next ID but avoid an invalid ID + int taskId = nextTaskId.incrementAndGet(); + while (taskId == FileDeletionTask.INVALID_TASK_ID) { + taskId = nextTaskId.incrementAndGet(); + } + return taskId; + } + + private void recordDeletionTaskInStateStore(FileDeletionTask task) { + if (!stateStore.canRecover()) { + // optimize the case where we aren't really recording + return; + } + if (task.taskId != FileDeletionTask.INVALID_TASK_ID) { + return; // task already recorded + } + + task.taskId = generateTaskId(); + + FileDeletionTask[] successors = task.getSuccessorTasks(); + + // store successors first to ensure task IDs have been generated for them + for (FileDeletionTask successor : successors) { + recordDeletionTaskInStateStore(successor); + } + + DeletionServiceDeleteTaskProto.Builder builder = + DeletionServiceDeleteTaskProto.newBuilder(); + builder.setId(task.taskId); + if (task.getUser() != null) { + builder.setUser(task.getUser()); + } + if (task.getSubDir() != null) { + builder.setSubdir(task.getSubDir().toString()); + } + builder.setDeletionTime(System.currentTimeMillis() + + TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS)); + if (task.getBaseDirs() != null) { + for (Path dir : task.getBaseDirs()) { + builder.addBasedirs(dir.toString()); + } + } + for (FileDeletionTask successor : successors) { + builder.addSuccessorIds(successor.taskId); + } + + try { + stateStore.storeDeletionTask(task.taskId, builder.build()); + } catch (IOException e) { + LOG.error("Unable to store deletion task " + task.taskId + " for " + + task.getSubDir(), e); + } + } + + private static class DeletionTaskRecoveryInfo { + FileDeletionTask task; + List successorTaskIds; + long deletionTimestamp; + + public DeletionTaskRecoveryInfo(FileDeletionTask task, + List successorTaskIds, long deletionTimestamp) { + this.task = task; + this.successorTaskIds = successorTaskIds; + this.deletionTimestamp = deletionTimestamp; + } + } } \ No newline at end of file Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Jun 17 01:02:16 2014 @@ -114,7 +114,7 @@ public class NodeManager extends Composi } protected DeletionService createDeletionService(ContainerExecutor exec) { - return new DeletionService(exec); + return new DeletionService(exec, nmStore); } protected NMContext createNMContext( Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Tue Jun 17 01:02:16 2014 @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -58,6 +59,9 @@ public class NMLeveldbStateStoreService private static final String DB_SCHEMA_VERSION_KEY = "schema-version"; private static final String DB_SCHEMA_VERSION = "1.0"; + private static final String DELETION_TASK_KEY_PREFIX = + "DeletionService/deltask_"; + private static final String LOCALIZATION_KEY_PREFIX = "Localization/"; private static final String LOCALIZATION_PUBLIC_KEY_PREFIX = LOCALIZATION_KEY_PREFIX + "public/"; @@ -309,6 +313,56 @@ public class NMLeveldbStateStoreService @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); + state.tasks = new ArrayList(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(DELETION_TASK_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { + break; + } + state.tasks.add( + DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); + } + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } finally { + if (iter != null) { + iter.close(); + } + } + return state; + } + + @Override + public void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException { + String key = DELETION_TASK_KEY_PREFIX + taskId; + try { + db.put(bytes(key), taskProto.toByteArray()); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public void removeDeletionTask(int taskId) throws IOException { + String key = DELETION_TASK_KEY_PREFIX + taskId; + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + + @Override protected void initStorage(Configuration conf) throws IOException { Path storeRoot = createStorageDir(conf); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Tue Jun 17 01:02:16 2014 @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; // The state store to use when state isn't being stored @@ -61,6 +62,22 @@ public class NMNullStateStoreService ext } @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException { + } + + @Override + public void removeDeletionTask(int taskId) throws IOException { + } + + @Override protected void initStorage(Configuration conf) throws IOException { } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Tue Jun 17 01:02:16 2014 @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; @Private @@ -91,6 +92,14 @@ public abstract class NMStateStoreServic } } + public static class RecoveredDeletionServiceState { + List tasks; + + public List getTasks() { + return tasks; + } + } + /** Initialize the state storage */ @Override public void serviceInit(Configuration conf) throws IOException { @@ -155,6 +164,15 @@ public abstract class NMStateStoreServic ApplicationId appId, Path localPath) throws IOException; + public abstract RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException; + + public abstract void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException; + + public abstract void removeDeletionTask(int taskId) throws IOException; + + protected abstract void initStorage(Configuration conf) throws IOException; protected abstract void startStorage() throws IOException; Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Tue Jun 17 01:02:16 2014 @@ -24,6 +24,15 @@ package hadoop.yarn; import "yarn_protos.proto"; +message DeletionServiceDeleteTaskProto { + optional int32 id = 1; + optional string user = 2; + optional string subdir = 3; + optional int64 deletionTime = 4; + repeated string basedirs = 5; + repeated int32 successorIds = 6; +} + message LocalizedResourceProto { optional LocalResourceProto resource = 1; optional string localPath = 2; Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Tue Jun 17 01:02:16 2014 @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.junit.AfterClass; import org.junit.Test; import org.mockito.Mockito; @@ -285,4 +286,58 @@ public class TestDeletionService { del.stop(); } } + + @Test + public void testRecovery() throws Exception { + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("SEED: " + seed); + List baseDirs = buildDirs(r, base, 4); + createDirs(new Path("."), baseDirs); + List content = buildDirs(r, new Path("."), 10); + for (Path b : baseDirs) { + createDirs(b, content); + } + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 1); + NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + DeletionService del = + new DeletionService(new FakeDefaultContainerExecutor(), stateStore); + try { + del.init(conf); + del.start(); + for (Path p : content) { + assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); + del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", + p, baseDirs.toArray(new Path[4])); + } + + // restart the deletion service + del.stop(); + del = new DeletionService(new FakeDefaultContainerExecutor(), + stateStore); + del.init(conf); + del.start(); + + // verify paths are still eventually deleted + int msecToWait = 10 * 1000; + for (Path p : baseDirs) { + for (Path q : content) { + Path fp = new Path(p, q); + while (msecToWait > 0 && lfs.util().exists(fp)) { + Thread.sleep(100); + msecToWait -= 100; + } + assertFalse(lfs.util().exists(fp)); + } + } + } finally { + del.close(); + stateStore.close(); + } + } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Tue Jun 17 01:02:16 2014 @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -25,10 +27,12 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; public class NMMemoryStateStoreService extends NMStateStoreService { private Map trackerStates; + private Map deleteTasks; public NMMemoryStateStoreService() { super(NMMemoryStateStoreService.class.getName()); @@ -110,6 +114,7 @@ public class NMMemoryStateStoreService e @Override protected void initStorage(Configuration conf) { trackerStates = new HashMap(); + deleteTasks = new HashMap(); } @Override @@ -121,6 +126,28 @@ public class NMMemoryStateStoreService e } + @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + RecoveredDeletionServiceState result = + new RecoveredDeletionServiceState(); + result.tasks = new ArrayList( + deleteTasks.values()); + return result; + } + + @Override + public synchronized void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException { + deleteTasks.put(taskId, taskProto); + } + + @Override + public synchronized void removeDeletionTask(int taskId) throws IOException { + deleteTasks.remove(taskId); + } + + private static class TrackerState { Map inProgressMap = new HashMap(); Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1603036&r1=1603035&r2=1603036&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Tue Jun 17 01:02:16 2014 @@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -404,4 +406,58 @@ public class TestNMLeveldbStateStoreServ state.getUserResources(); assertTrue(userResources.isEmpty()); } + + @Test + public void testDeletionTaskStorage() throws IOException { + // test empty when no state + RecoveredDeletionServiceState state = + stateStore.loadDeletionServiceState(); + assertTrue(state.getTasks().isEmpty()); + + // store a deletion task and verify recovered + DeletionServiceDeleteTaskProto proto = + DeletionServiceDeleteTaskProto.newBuilder() + .setId(7) + .setUser("someuser") + .setSubdir("some/subdir") + .addBasedirs("some/dir/path") + .addBasedirs("some/other/dir/path") + .setDeletionTime(123456L) + .addSuccessorIds(8) + .addSuccessorIds(9) + .build(); + stateStore.storeDeletionTask(proto.getId(), proto); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertEquals(1, state.getTasks().size()); + assertEquals(proto, state.getTasks().get(0)); + + // store another deletion task + DeletionServiceDeleteTaskProto proto2 = + DeletionServiceDeleteTaskProto.newBuilder() + .setId(8) + .setUser("user2") + .setSubdir("subdir2") + .setDeletionTime(789L) + .build(); + stateStore.storeDeletionTask(proto2.getId(), proto2); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertEquals(2, state.getTasks().size()); + assertTrue(state.getTasks().contains(proto)); + assertTrue(state.getTasks().contains(proto2)); + + // delete a task and verify gone after recovery + stateStore.removeDeletionTask(proto2.getId()); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertEquals(1, state.getTasks().size()); + assertEquals(proto, state.getTasks().get(0)); + + // delete the last task and verify none left + stateStore.removeDeletionTask(proto.getId()); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertTrue(state.getTasks().isEmpty()); + } }