hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cur...@apache.org
Subject [48/50] [abbrv] YARN-2080. Integrating reservation system with ResourceManager and client-RM protocol. Contributed by Subru Krishnan and Carlo Curino.
Date Mon, 22 Sep 2014 23:48:37 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e53fdea3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
index 7e0b89e..a54e4bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
@@ -19,25 +19,33 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 
 public class AppAddedSchedulerEvent extends SchedulerEvent {
 
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final ReservationId reservationID;
   private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
-    this(applicationId, queue, user, false);
+    this(applicationId, queue, user, false, null);
   }
 
   public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
-      String user, boolean isAppRecovering) {
+      String user, ReservationId reservationID) {
+    this(applicationId, queue, user, false, reservationID);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering, ReservationId reservationID) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.reservationID = reservationID;
     this.isAppRecovering = isAppRecovering;
   }
 
@@ -56,4 +64,8 @@ public class AppAddedSchedulerEvent extends SchedulerEvent {
   public boolean getIsAppRecovering() {
     return isAppRecovering;
   }
+
+  public ReservationId getReservationID() {
+    return reservationID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e53fdea3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index db867a9..954e21d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -74,6 +75,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -91,7 +98,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -107,6 +119,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -117,11 +130,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -1199,4 +1217,102 @@ public class TestClientRMService {
     when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
     return yarnScheduler;
   }
+
+  @Test
+  public void testReservationAPIs() {
+    // initialize
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    ReservationSystemTestUtil.setupQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm;
+    try {
+      nm = rm.registerNode("127.0.0.1:0", 102400, 100);
+      // allow plan follower to synchronize
+      Thread.sleep(1050);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Create a client.
+    ClientRMService clientService = rm.getClientRMService();
+
+    // create a reservation
+    Clock clock = new UTCClock();
+    long arrival = clock.getTime();
+    long duration = 60000;
+    long deadline = (long) (arrival + 1.05 * duration);
+    ReservationSubmissionRequest sRequest =
+        createSimpleReservationRequest(4, arrival, deadline, duration);
+    ReservationSubmissionResponse sResponse = null;
+    try {
+      sResponse = clientService.submitReservation(sRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    ReservationId reservationID = sResponse.getReservationId();
+    Assert.assertNotNull(reservationID);
+    LOG.info("Submit reservation response: " + reservationID);
+
+    // Update the reservation
+    ReservationDefinition rDef = sRequest.getReservationDefinition();
+    ReservationRequest rr =
+        rDef.getReservationRequests().getReservationResources().get(0);
+    rr.setNumContainers(5);
+    arrival = clock.getTime();
+    duration = 30000;
+    deadline = (long) (arrival + 1.05 * duration);
+    rr.setDuration(duration);
+    rDef.setArrival(arrival);
+    rDef.setDeadline(deadline);
+    ReservationUpdateRequest uRequest =
+        ReservationUpdateRequest.newInstance(rDef, reservationID);
+    ReservationUpdateResponse uResponse = null;
+    try {
+      uResponse = clientService.updateReservation(uRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    LOG.info("Update reservation response: " + uResponse);
+
+    // Delete the reservation
+    ReservationDeleteRequest dRequest =
+        ReservationDeleteRequest.newInstance(reservationID);
+    ReservationDeleteResponse dResponse = null;
+    try {
+      dResponse = clientService.deleteReservation(dRequest);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(sResponse);
+    LOG.info("Delete reservation response: " + dResponse);
+
+    // clean-up
+    rm.stop();
+    nm = null;
+    rm = null;
+  }
+
+  private ReservationSubmissionRequest createSimpleReservationRequest(
+      int numContainers, long arrival, long deadline, long duration) {
+    // create a request with a single atomic ask
+    ReservationRequest r =
+        ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+            numContainers, 1, duration);
+    ReservationRequests reqs =
+        ReservationRequests.newInstance(Collections.singletonList(r),
+            ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition rDef =
+        ReservationDefinition.newInstance(arrival, deadline, reqs,
+            "testClientRMService#reservation");
+    ReservationSubmissionRequest request =
+        ReservationSubmissionRequest.newInstance(rDef,
+            ReservationSystemTestUtil.reservationQ);
+    return request;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e53fdea3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
new file mode 100644
index 0000000..2a77791
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityReservationSystem.java
@@ -0,0 +1,102 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCapacityReservationSystem {
+
+  @Test
+  public void testInitialize() {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler capScheduler = null;
+    try {
+      capScheduler = testUtil.mockCapacityScheduler(10);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    CapacityReservationSystem reservationSystem =
+        new CapacityReservationSystem();
+    reservationSystem.setRMContext(capScheduler.getRMContext());
+    try {
+      reservationSystem.reinitialize(capScheduler.getConf(),
+          capScheduler.getRMContext());
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    String planQName = testUtil.getreservationQueueName();
+    Plan plan = reservationSystem.getPlan(planQName);
+    Assert.assertNotNull(plan);
+    Assert.assertTrue(plan instanceof InMemoryPlan);
+    Assert.assertEquals(planQName, plan.getQueueName());
+    Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+  }
+
+  @Test
+  public void testReinitialize() {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    CapacityScheduler capScheduler = null;
+    try {
+      capScheduler = testUtil.mockCapacityScheduler(10);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    CapacityReservationSystem reservationSystem =
+        new CapacityReservationSystem();
+    CapacitySchedulerConfiguration conf = capScheduler.getConfiguration();
+    RMContext mockContext = capScheduler.getRMContext();
+    reservationSystem.setRMContext(mockContext);
+    try {
+      reservationSystem.reinitialize(capScheduler.getConfiguration(),
+          mockContext);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    // Assert queue in original config
+    String planQName = testUtil.getreservationQueueName();
+    Plan plan = reservationSystem.getPlan(planQName);
+    Assert.assertNotNull(plan);
+    Assert.assertTrue(plan instanceof InMemoryPlan);
+    Assert.assertEquals(planQName, plan.getQueueName());
+    Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(plan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+    // Dynamically add a plan
+    String newQ = "reservation";
+    Assert.assertNull(reservationSystem.getPlan(newQ));
+    testUtil.updateQueueConfiguration(conf, newQ);
+    try {
+      capScheduler.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+    try {
+      reservationSystem.reinitialize(conf, mockContext);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Plan newPlan = reservationSystem.getPlan(newQ);
+    Assert.assertNotNull(newPlan);
+    Assert.assertTrue(newPlan instanceof InMemoryPlan);
+    Assert.assertEquals(newQ, newPlan.getQueueName());
+    Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
+    Assert
+        .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent);
+    Assert
+        .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e53fdea3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
new file mode 100644
index 0000000..f5917bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationInputValidator.java
@@ -0,0 +1,560 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationDeleteRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestReservationInputValidator {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestReservationInputValidator.class);
+
+  private static final String PLAN_NAME = "test-reservation";
+
+  private Clock clock;
+  private Map<String, Plan> plans = new HashMap<String, Plan>(1);
+  private ReservationSystem rSystem;
+  private Plan plan;
+
+  private ReservationInputValidator rrValidator;
+
+  @Before
+  public void setUp() {
+    clock = mock(Clock.class);
+    plan = mock(Plan.class);
+    rSystem = mock(ReservationSystem.class);
+    plans.put(PLAN_NAME, plan);
+    rrValidator = new ReservationInputValidator(clock);
+    when(clock.getTime()).thenReturn(1L);
+    ResourceCalculator rCalc = new DefaultResourceCalculator();
+    Resource resource = Resource.newInstance(10240, 10);
+    when(plan.getResourceCalculator()).thenReturn(rCalc);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    when(rSystem.getQueueForReservation(any(ReservationId.class))).thenReturn(
+        PLAN_NAME);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(plan);
+  }
+
+  @After
+  public void tearDown() {
+    rrValidator = null;
+    clock = null;
+    plan = null;
+  }
+
+  @Test
+  public void testSubmitReservationNormal() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testSubmitReservationDoesnotExist() {
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .equals("The queue to submit is not specified. Please try again with a valid reservable queue."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidPlan() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 3);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not managed by reservation system. Please try again with a valid reservable queue."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationNoDefinition() {
+    ReservationSubmissionRequest request =
+        new ReservationSubmissionRequestPBImpl();
+    request.setQueue(PLAN_NAME);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .equals("Missing reservation definition. Please try again by specifying a reservation definition."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidDeadline() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 0, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("The specified deadline: 0 is the past"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidRR() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(0, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRR() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationInvalidDuration() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 3, 4);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message.startsWith("The time difference"));
+      Assert
+          .assertTrue(message
+              .contains("must  be greater or equal to the minimum resource duration"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testSubmitReservationExceedsGangSize() {
+    ReservationSubmissionRequest request =
+        createSimpleReservationSubmissionRequest(1, 1, 1, 5, 4);
+    Resource resource = Resource.newInstance(512, 1);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    Plan plan = null;
+    try {
+      plan =
+          rrValidator.validateReservationSubmissionRequest(rSystem, request,
+              ReservationSystemTestUtil.getNewReservationId());
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("The size of the largest gang in the reservation refinition"));
+      Assert.assertTrue(message.contains("exceed the capacity available "));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationNormal() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+    } catch (YarnException e) {
+      Assert.fail(e.getMessage());
+    }
+    Assert.assertNotNull(plan);
+  }
+
+  @Test
+  public void testUpdateReservationNoID() {
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation id. Please try again by specifying a reservation id."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationDoesnotExist() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    ReservationId rId = request.getReservationId();
+    when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message.equals(MessageFormat
+              .format(
+                  "The specified reservation with ID: {0} is unknown. Please try again with a valid reservation.",
+                  rId)));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidPlan() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .endsWith(" is not associated with any valid plan. Please try again with a valid reservation."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationNoDefinition() {
+    ReservationUpdateRequest request = new ReservationUpdateRequestPBImpl();
+    request.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("Missing reservation definition. Please try again by specifying a reservation definition."));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidDeadline() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 0, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("The specified deadline: 0 is the past"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidRR() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(0, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationEmptyRR() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 0, 1, 5, 3);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .startsWith("No resources have been specified to reserve"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationInvalidDuration() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 3, 4);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .contains("must  be greater or equal to the minimum resource duration"));
+      LOG.info(message);
+    }
+  }
+
+  @Test
+  public void testUpdateReservationExceedsGangSize() {
+    ReservationUpdateRequest request =
+        createSimpleReservationUpdateRequest(1, 1, 1, 5, 4);
+    Resource resource = Resource.newInstance(512, 1);
+    when(plan.getTotalCapacity()).thenReturn(resource);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationUpdateRequest(rSystem, request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      Assert
+          .assertTrue(message
+              .startsWith("The size of the largest gang in the reservation refinition"));
+      Assert.assertTrue(message.contains("exceed the capacity available "));
+      LOG.info(message);
+    }
+  }
+
+  /**
+   * Happy path for deletion: the request carries a fresh reservation id, the
+   * plan is stubbed to return a mock {@code ReservationAllocation} for that
+   * id, and the validator is expected to return a non-null plan without
+   * throwing.
+   */
+  @Test
+  public void testDeleteReservationNormal() {
+    ReservationId knownId = ReservationSystemTestUtil.getNewReservationId();
+    ReservationDeleteRequest deleteRequest =
+        new ReservationDeleteRequestPBImpl();
+    deleteRequest.setReservationId(knownId);
+    ReservationAllocation allocation = mock(ReservationAllocation.class);
+    // "plan" here is the class-level plan object, not the validation result
+    when(plan.getReservationById(knownId)).thenReturn(allocation);
+    try {
+      Plan validatedPlan =
+          rrValidator.validateReservationDeleteRequest(rSystem, deleteRequest);
+      Assert.assertNotNull(validatedPlan);
+    } catch (YarnException unexpected) {
+      // a valid request must not be rejected; surface the reason
+      Assert.fail(unexpected.getMessage());
+    }
+  }
+
+  /**
+   * A delete request on which no reservation id was ever set must be
+   * rejected with a {@link YarnException} whose message starts with the
+   * "Missing reservation id" text.
+   */
+  @Test
+  public void testDeleteReservationNoID() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      // reaching this line means the validator accepted the request
+      Assert.fail();
+    } catch (YarnException e) {
+      // the assignment above never completed, so no plan was handed back
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      // literal is concatenated so that no source line needs wrapping
+      Assert.assertTrue(message.startsWith(
+          "Missing reservation id. Please try again by specifying a "
+              + "reservation id."));
+      LOG.info(message);
+    }
+  }
+
+  /**
+   * The reservation system is stubbed to return {@code null} as the queue
+   * for the request's reservation id; the validator must then throw a
+   * {@link YarnException} whose message is exactly the "is unknown" text
+   * formatted with that id.
+   */
+  @Test
+  public void testDeleteReservationDoesnotExist() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId rId = ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(rId);
+    when(rSystem.getQueueForReservation(rId)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      // reaching this line means the validator accepted the request
+      Assert.fail();
+    } catch (YarnException e) {
+      // the assignment above never completed, so no plan was handed back
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      // pattern is concatenated so that no source line needs wrapping
+      String expected = MessageFormat.format(
+          "The specified reservation with ID: {0} is unknown. "
+              + "Please try again with a valid reservation.", rId);
+      // assertEquals reports both strings when they differ
+      Assert.assertEquals(expected, message);
+      LOG.info(message);
+    }
+  }
+
+  /**
+   * The reservation system is stubbed to return {@code null} for
+   * {@code getPlan(PLAN_NAME)}; the validator must then throw a
+   * {@link YarnException} whose message ends with the "not associated with
+   * any valid plan" text.
+   */
+  @Test
+  public void testDeleteReservationInvalidPlan() {
+    ReservationDeleteRequest request = new ReservationDeleteRequestPBImpl();
+    ReservationId reservationID =
+        ReservationSystemTestUtil.getNewReservationId();
+    request.setReservationId(reservationID);
+    when(rSystem.getPlan(PLAN_NAME)).thenReturn(null);
+    Plan plan = null;
+    try {
+      plan = rrValidator.validateReservationDeleteRequest(rSystem, request);
+      // reaching this line means the validator accepted the request
+      Assert.fail();
+    } catch (YarnException e) {
+      // the assignment above never completed, so no plan was handed back
+      Assert.assertNull(plan);
+      String message = e.getMessage();
+      // literal is concatenated so that no source line needs wrapping
+      Assert.assertTrue(message.endsWith(
+          " is not associated with any valid plan. "
+              + "Please try again with a valid reservation."));
+      LOG.info(message);
+    }
+  }
+
+  /**
+   * Builds a reservation submission request targeting {@code PLAN_NAME}.
+   *
+   * @param numRequests when positive, a {@code ReservationRequests} object is
+   *          attached to the definition; the exact value is otherwise unused
+   * @param numContainers when positive (and {@code numRequests} is too), a
+   *          single {@code ReservationRequest} for this many containers of
+   *          {@code Resource.newInstance(1024, 1)} is added with the
+   *          {@code R_ALL} interpreter
+   * @param arrival value passed to {@code setArrival}
+   * @param deadline value passed to {@code setDeadline}
+   * @param duration duration passed to the single reservation request
+   * @return the populated submission request
+   */
+  private ReservationSubmissionRequest createSimpleReservationSubmissionRequest(
+      int numRequests, int numContainers, long arrival, long deadline,
+      long duration) {
+    ReservationSubmissionRequest submission =
+        new ReservationSubmissionRequestPBImpl();
+    ReservationDefinition definition = new ReservationDefinitionPBImpl();
+    definition.setArrival(arrival);
+    definition.setDeadline(deadline);
+    if (numRequests > 0) {
+      ReservationRequests asks = new ReservationRequestsPBImpl();
+      definition.setReservationRequests(asks);
+      if (numContainers > 0) {
+        // a single atomic ask
+        Resource capability = Resource.newInstance(1024, 1);
+        ReservationRequest singleAsk = ReservationRequest.newInstance(
+            capability, numContainers, 1, duration);
+        asks.setReservationResources(Collections.singletonList(singleAsk));
+        asks.setInterpreter(ReservationRequestInterpreter.R_ALL);
+      }
+    }
+    submission.setQueue(PLAN_NAME);
+    submission.setReservationDefinition(definition);
+    return submission;
+  }
+
+  /**
+   * Builds a reservation update request carrying a freshly generated
+   * reservation id.
+   *
+   * @param numRequests when positive, a {@code ReservationRequests} object is
+   *          attached to the definition; the exact value is otherwise unused
+   * @param numContainers when positive (and {@code numRequests} is too), a
+   *          single {@code ReservationRequest} for this many containers of
+   *          {@code Resource.newInstance(1024, 1)} is added with the
+   *          {@code R_ALL} interpreter
+   * @param arrival value passed to {@code setArrival}
+   * @param deadline value passed to {@code setDeadline}
+   * @param duration duration passed to the single reservation request
+   * @return the populated update request
+   */
+  private ReservationUpdateRequest createSimpleReservationUpdateRequest(
+      int numRequests, int numContainers, long arrival, long deadline,
+      long duration) {
+    ReservationUpdateRequest update = new ReservationUpdateRequestPBImpl();
+    ReservationDefinition definition = new ReservationDefinitionPBImpl();
+    definition.setArrival(arrival);
+    definition.setDeadline(deadline);
+    if (numRequests > 0) {
+      ReservationRequests asks = new ReservationRequestsPBImpl();
+      definition.setReservationRequests(asks);
+      if (numContainers > 0) {
+        // a single atomic ask
+        Resource capability = Resource.newInstance(1024, 1);
+        ReservationRequest singleAsk = ReservationRequest.newInstance(
+            capability, numContainers, 1, duration);
+        asks.setReservationResources(Collections.singletonList(singleAsk));
+        asks.setInterpreter(ReservationRequestInterpreter.R_ALL);
+      }
+    }
+    update.setReservationDefinition(definition);
+    // the id is generated last, as the final step before returning
+    update.setReservationId(ReservationSystemTestUtil.getNewReservationId());
+    return update;
+  }
+
+}


Mime
View raw message