hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [30/35] YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino.
Date Wed, 24 Sep 2014 21:13:21 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/20f6b7fa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 0b8f321..cf59c7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -78,18 +79,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-
-import com.google.common.annotations.VisibleForTesting;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class RMAppImpl implements RMApp, Recoverable {
@@ -118,12 +113,6 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final String applicationType;
   private final Set<String> applicationTags;
 
-  private final long attemptFailuresValidityInterval;
-
-  private Clock systemClock;
-
-  private boolean isNumAttemptsBeyondThreshold = false;
-
   // Mutable fields
   private long startTime;
   private long finishTime = 0;
@@ -342,8 +331,6 @@ public class RMAppImpl implements RMApp, Recoverable {
       ApplicationMasterService masterService, long submitTime,
       String applicationType, Set<String> applicationTags) {
 
-    this.systemClock = new SystemClock();
-
     this.applicationId = applicationId;
     this.name = name;
     this.rmContext = rmContext;
@@ -356,7 +343,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.scheduler = scheduler;
     this.masterService = masterService;
     this.submitTime = submitTime;
-    this.startTime = this.systemClock.getTime();
+    this.startTime = System.currentTimeMillis();
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
 
@@ -374,9 +361,6 @@ public class RMAppImpl implements RMApp, Recoverable {
       this.maxAppAttempts = individualMaxAppAttempts;
     }
 
-    this.attemptFailuresValidityInterval =
-        submissionContext.getAttemptFailuresValidityInterval();
-
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -384,7 +368,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.stateMachine = stateMachineFactory.make(this);
 
     rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
-    rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
   }
 
   @Override
@@ -546,7 +529,6 @@ public class RMAppImpl implements RMApp, Recoverable {
       float progress = 0.0f;
       org.apache.hadoop.yarn.api.records.Token amrmToken = null;
       if (allowAccess) {
-        trackingUrl = getDefaultProxyTrackingUrl();
         if (this.currentAttempt != null) {
           currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
           trackingUrl = this.currentAttempt.getTrackingUrl();
@@ -607,20 +589,6 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
-  private String getDefaultProxyTrackingUrl() {
-    try {
-      final String scheme = WebAppUtils.getHttpSchemePrefix(conf);
-      String proxy = WebAppUtils.getProxyHostAndPort(conf);
-      URI proxyUri = ProxyUriUtils.getUriFromAMUrl(scheme, proxy);
-      URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId);
-      return result.toASCIIString();
-    } catch (URISyntaxException e) {
-      LOG.warn("Could not generate default proxy tracking URL for "
-          + applicationId);
-      return UNAVAILABLE;
-    }
-  }
-
   @Override
   public long getFinishTime() {
     this.readLock.lock();
@@ -663,20 +631,6 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
 
   @Override
-  public String getOriginalTrackingUrl() {
-    this.readLock.lock();
-    
-    try {
-      if (this.currentAttempt != null) {
-        return this.currentAttempt.getOriginalTrackingUrl();
-      }
-      return null;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  @Override
   public StringBuilder getDiagnostics() {
     this.readLock.lock();
 
@@ -846,14 +800,16 @@ public class RMAppImpl implements RMApp, Recoverable {
       // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
         app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
-          app.submissionContext.getQueue(), app.user));
+            app.submissionContext.getQueue(), app.user, app.submissionContext
+                .getReservationID()));
         return RMAppState.SUBMITTED;
       }
 
       // Add application to scheduler synchronously to guarantee scheduler
       // knows applications before AM or NM re-registers.
       app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
-        app.submissionContext.getQueue(), app.user, true));
+          app.submissionContext.getQueue(), app.user, true,
+          app.submissionContext.getReservationID()));
 
       // recover attempts
       app.recoverAppAttempts();
@@ -880,7 +836,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
       app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
-        app.submissionContext.getQueue(), app.user));
+          app.submissionContext.getQueue(), app.user, app.submissionContext
+              .getReservationID()));
     }
   }
 
@@ -931,7 +888,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       msg = "Unmanaged application " + this.getApplicationId()
               + " failed due to " + failedEvent.getDiagnostics()
               + ". Failing the application.";
-    } else if (this.isNumAttemptsBeyondThreshold) {
+    } else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
       msg = "Application " + this.getApplicationId() + " failed "
               + this.maxAppAttempts + " times due to "
               + failedEvent.getDiagnostics() + ". Failing the application.";
@@ -964,7 +921,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       RMAppState stateToBeStored) {
     rememberTargetTransitions(event, transitionToDo, targetFinalState);
     this.stateBeforeFinalSaving = getState();
-    this.storedFinishTime = this.systemClock.getTime();
+    this.storedFinishTime = System.currentTimeMillis();
 
     LOG.info("Updating application " + this.applicationId
         + " with final state: " + this.targetedFinalState);
@@ -1131,7 +1088,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       }
       app.finishTime = app.storedFinishTime;
       if (app.finishTime == 0 ) {
-        app.finishTime = app.systemClock.getTime();
+        app.finishTime = System.currentTimeMillis();
       }
       // Recovered apps that are completed were not added to scheduler, so no
       // need to remove them from scheduler.
@@ -1145,23 +1102,16 @@ public class RMAppImpl implements RMApp, Recoverable {
 
       app.rmContext.getRMApplicationHistoryWriter()
           .applicationFinished(app, finalState);
-      app.rmContext.getSystemMetricsPublisher()
-          .appFinished(app, finalState, app.finishTime);
     };
   }
 
   private int getNumFailedAppAttempts() {
     int completedAttempts = 0;
-    long endTime = this.systemClock.getTime();
     // Do not count AM preemption, hardware failures or NM resync
     // as attempt failure.
     for (RMAppAttempt attempt : attempts.values()) {
       if (attempt.shouldCountTowardsMaxAttemptRetry()) {
-        if (this.attemptFailuresValidityInterval <= 0
-            || (attempt.getFinishTime() > endTime
-                - this.attemptFailuresValidityInterval)) {
-          completedAttempts++;
-        }
+        completedAttempts++;
       }
     }
     return completedAttempts;
@@ -1178,10 +1128,9 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-      int numberOfFailure = app.getNumFailedAppAttempts();
       if (!app.submissionContext.getUnmanagedAM()
-          && numberOfFailure < app.maxAppAttempts) {
-        boolean transferStateFromPreviousAttempt;
+          && app.getNumFailedAppAttempts() < app.maxAppAttempts) {
+        boolean transferStateFromPreviousAttempt = false;
         RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
         transferStateFromPreviousAttempt =
             failedEvent.getTransferStateFromPreviousAttempt();
@@ -1191,16 +1140,13 @@ public class RMAppImpl implements RMApp, Recoverable {
         // Transfer the state from the previous attempt to the current attempt.
         // Note that the previous failed attempt may still be collecting the
         // container events from the scheduler and update its data structures
-        // before the new attempt is created. We always transferState for
-        // finished containers so that they can be acked to NM,
-        // but when pulling finished container we will check this flag again.
-        ((RMAppAttemptImpl) app.currentAttempt)
-          .transferStateFromPreviousAttempt(oldAttempt);
+        // before the new attempt is created.
+        if (transferStateFromPreviousAttempt) {
+          ((RMAppAttemptImpl) app.currentAttempt)
+            .transferStateFromPreviousAttempt(oldAttempt);
+        }
         return initialState;
       } else {
-        if (numberOfFailure >= app.maxAppAttempts) {
-          app.isNumAttemptsBeyondThreshold = true;
-        }
         app.rememberTargetTransitionsAndStoreState(event,
           new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
           RMAppState.FAILED);
@@ -1293,4 +1239,9 @@ public class RMAppImpl implements RMApp, Recoverable {
   public void setSystemClock(Clock clock) {
     this.systemClock = clock;
   }
+
+  @Override
+  public ReservationId getReservationId() {
+    return submissionContext.getReservationID();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20f6b7fa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
index 7e0b89e..a54e4bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
@@ -19,25 +19,33 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 
 public class AppAddedSchedulerEvent extends SchedulerEvent {
 
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final ReservationId reservationID;
   private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
-    this(applicationId, queue, user, false);
+    this(applicationId, queue, user, false, null);
   }
 
   public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
-      String user, boolean isAppRecovering) {
+      String user, ReservationId reservationID) {
+    this(applicationId, queue, user, false, reservationID);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering, ReservationId reservationID) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.reservationID = reservationID;
     this.isAppRecovering = isAppRecovering;
   }
 
@@ -56,4 +64,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
   public boolean getIsAppRecovering() {
     return isAppRecovering;
   }
+
+  public ReservationId getReservationID() {
+    return reservationID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20f6b7fa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index db867a9..954e21d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -74,6 +75,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -91,7 +98,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -107,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -117,11 +130,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -1199,4 +1217,102 @@ public class TestClientRMService {
     when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
     return yarnScheduler;
   }
+
+  @Test
+  public void testReservationAPIs() {
+    // initialize
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm;
+    try {
+      nm = rm.registerNode("127.0.0.1:0", 102400, 100);
+      // allow plan follower to synchronize
+      Thread.sleep(1050);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Create a client.
+    ClientRMService clientService = rm.getClientRMService();
+
+    // create a reservation
+    Clock clock = new UTCClock();
+    long arrival = clock.getTime();
+    long duration = 60000;
+    long deadline = (long) (arrival + 1.05 * duration);
+    ReservationSubmissionRequest sRequest =
+        createSimpleReservationRequest(4, arrival, deadline, duration);
+    ReservationSubmissionResponse sResponse = null;
+    try {
+      sResponse = clientService.submitReservation(sRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    ReservationId reservationID = sResponse.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+
+    // Update the reservation
+    ReservationDefinition rDef = sRequest.getReservationDefinition();
+    ReservationRequest rr =
+        rDef.getReservationRequests().getReservationResources().get(0);
+    rr.setNumContainers(5);
+    arrival = clock.getTime();
+    duration = 30000;
+    deadline = (long) (arrival + 1.05 * duration);
+    rr.setDuration(duration);
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    ReservationUpdateRequest uRequest =
+        ReservationUpdateRequest.newInstance(rDef, reservationID);
+    ReservationUpdateResponse uResponse = null;
+    try {
+      uResponse = clientService.updateReservation(uRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    LOG.info("Update reservation response: " + uResponse);
+
+    // Delete the reservation
+    ReservationDeleteRequest dRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    ReservationDeleteResponse dResponse = null;
+    try {
+      dResponse = clientService.deleteReservation(dRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    LOG.info("Delete reservation response: " + dResponse);
+
+    // clean-up
+    rm.stop();
+    nm = null;
+    rm = null;
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationRequest(
+      int numContainers, long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+            numContainers, 1, duration);
+    ReservationRequests reqs =
+        ReservationRequests.newInstance(Collections.singletonList(r),
+            ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef =
+        ReservationDefinition.newInstance(arrival, deadline, reqs,
+            "testClientRMService#reservation");
+    ReservationSubmissionRequest request =
+        ReservationSubmissionRequest.newInstance(rDef,
+            ReservationSystemTestUtil.reservationQ);
+    return request;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20f6b7fa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
new file mode 100644
index 0000000..2a77791
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
@@ -0,0 +1,102 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCapacityReservationSystem {
+
+  @Test
+  public void testInitialize() {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler capScheduler = null;
+    try {
+      capScheduler = testUtil.mockCapacityScheduler(10);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    CapacityReservationSystem reservationSystem =
+        new CapacityReservationSystem();
+    reservationSystem.setRMContext(capScheduler.getRMContext());
+    try {
+      reservationSystem.reinitialize(capScheduler.getConf(),
+          capScheduler.getRMContext());
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    String planQName = testUtil.getreservationQueueName();
+    Plan plan = reservationSystem.getPlan(planQName);
+    Assert.assertNotNull(plan);
+    Assert.assertTrue(plan instanceof InMemoryPlan);
+    Assert.assertEquals(planQName, plan.getQueueName());
+    Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+  }
+
+  @Test
+  public void testReinitialize() {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler capScheduler = null;
+    try {
+      capScheduler = testUtil.mockCapacityScheduler(10);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    CapacityReservationSystem reservationSystem =
+        new CapacityReservationSystem();
+    CapacitySchedulerConfiguration conf = capScheduler.getConfiguration();
+    RMContext mockContext = capScheduler.getRMContext();
+    reservationSystem.setRMContext(mockContext);
+    try {
+      reservationSystem.reinitialize(capScheduler.getConfiguration(),
+          mockContext);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    // Assert queue in original config
+    String planQName = testUtil.getreservationQueueName();
+    Plan plan = reservationSystem.getPlan(planQName);
+    Assert.assertNotNull(plan);
+    Assert.assertTrue(plan instanceof InMemoryPlan);
+    Assert.assertEquals(planQName, plan.getQueueName());
+    Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+    // Dynamically add a plan
+    String newQ = "reservation";
+    Assert.assertNull(reservationSystem.getPlan(newQ));
+    testUtil.updateQueueConfiguration(conf, newQ);
+    try {
+      capScheduler.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    try {
+      reservationSystem.reinitialize(conf, mockContext);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Plan newPlan = reservationSystem.getPlan(newQ);
+    Assert.assertNotNull(newPlan);
+    Assert.assertTrue(newPlan instanceof InMemoryPlan);
+    Assert.assertEquals(newQ, newPlan.getQueueName());
+    Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/20f6b7fa/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
new file mode 100644
index 0000000..f5917bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
@@ -0,0 +1,560 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReservationInputValidator {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestReservationInputValidator.class);
+
+  private static final String PLAN_NAME = "test-reservation";
+
+  private Clock clock;
+  private Map<String, Plan> plans = new HashMap<String, Plan>(1);
+  private ReservationSystem rSystem;
+  private Plan plan;
+
+  private ReservationInputValidator rrValidator;
+
+  @Before
+  public void setUp() {
+    clock = mock(Clock.class);
+    plan = mock(Plan.class);
+    rSystem = mock(ReservationSystem.class);
+    plans.put(PLAN_NAME, plan);
+    rrValidator = new ReservationInputValidator(clock);
+    when(clock.getTime()).thenReturn(1L);
+    ResourceCalculator rCalc = new DefaultResourceCalculator();
+    Resource resource = Resource.newInstance(10240, 10);
+    when(plan.getResourceCalculator()).thenReturn(rCalc);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    when(rSystem.getQueueForReservation(any(ReservationId.class))).thenReturn(
+        PLAN_NAME);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(plan);
+  }
+
+  @After
+  public void tearDown() {
+    rrValidator = null;
+    clock = null;
+    plan = null;
+  }
+
+  @Test
+  public void testSubmitReservationNormal() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testSubmitReservationDoesnotExist() {
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .equals("The queue to submit is not specified. Please try again with a valid
reservable queue."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidPlan() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not managed by reservation system. Please try again with a valid
reservable queue."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationNoDefinition() {
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    request.setQueue(PLAN_NAME);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .equals("Missing reservation definition. Please try again by specifying a reservation
definition."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidDeadline() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 0, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("The specified deadline: 0 is the past"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidRR() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(0, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRR() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidDuration() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 3, 4);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message.startsWith("The time difference"));
+      Assert
+          .assertTrue(message
+              .contains("must  be greater or equal to the minimum resource duration"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationExceedsGangSize() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 4);
+    Resource resource = Resource.newInstance(512, 1);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("The size of the largest gang in the reservation refinition"));
+      Assert.assertTrue(message.contains("exceed the capacity available "));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationNormal() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testUpdateReservationNoID() {
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation id. Please try again by specifying a reservation
id."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationDoesnotExist() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    ReservationId rId = request.getReservationId();
+    when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message.equals(MessageFormat
+              .format(
+                  "The specified reservation with ID: {0} is unknown. Please try again with
a valid reservation.",
+                  rId)));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidPlan() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not associated with any valid plan. Please try again with a
valid reservation."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationNoDefinition() {
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation definition. Please try again by specifying
a reservation definition."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidDeadline() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 0, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("The specified deadline: 0 is the past"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidRR() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(0, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationEmptyRR() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidDuration() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 3, 4);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .contains("must  be greater or equal to the minimum resource duration"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationExceedsGangSize() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    Resource resource = Resource.newInstance(512, 1);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("The size of the largest gang in the reservation refinition"));
+      Assert.assertTrue(message.contains("exceed the capacity available "));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testDeleteReservationNormal() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(reservationID);
+    ReservationAllocation reservation = mock(ReservationAllocation.class);
+    when(plan.getReservationById(reservationID)).thenReturn(reservation);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testDeleteReservationNoID() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation id. Please try again by specifying a reservation
id."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testDeleteReservationDoesnotExist() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId rId = ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(rId);
+    when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message.equals(MessageFormat
+              .format(
+                  "The specified reservation with ID: {0} is unknown. Please try again with
a valid reservation.",
+                  rId)));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testDeleteReservationInvalidPlan() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(reservationID);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not associated with any valid plan. Please try again with a
valid reservation."));
+      LOG.info(message);
+    }
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
+      int numRequests, int numContainers, long arrival, long deadline,
+      long duration) {
+    // create a request with a single atomic ask
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    if (numRequests > 0) {
+      ReservationRequests reqs = new ReservationRequestsPBImpl();
+      rDef.setReservationRequests(reqs);
+      if (numContainers > 0) {
+        ReservationRequest r =
+            ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+                numContainers, 1, duration);
+
+        reqs.setReservationResources(Collections.singletonList(r));
+        reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+      }
+    }
+    request.setQueue(PLAN_NAME);
+    request.setReservationDefinition(rDef);
+    return request;
+  }
+
+  private ReservationUpdateRequest createSimpleReservationUpdateRequest(
+      int numRequests, int numContainers, long arrival, long deadline,
+      long duration) {
+    // create a request with a single atomic ask
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    if (numRequests > 0) {
+      ReservationRequests reqs = new ReservationRequestsPBImpl();
+      rDef.setReservationRequests(reqs);
+      if (numContainers > 0) {
+        ReservationRequest r =
+            ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+                numContainers, 1, duration);
+
+        reqs.setReservationResources(Collections.singletonList(r));
+        reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
+      }
+    }
+    request.setReservationDefinition(rDef);
+    request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+    return request;
+  }
+
+}


Mime
View raw message