From "Bikas Saha (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (YARN-569) CapacityScheduler: support for preemption (using a capacity monitor)
Date Tue, 11 Jun 2013 03:25:21 GMT

    [ https://issues.apache.org/jira/browse/YARN-569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13680176#comment-13680176 ]

Bikas Saha commented on YARN-569:
---------------------------------

Sorry for the delayed response.

This doesn't seem to affect the fair scheduler, or does it? If not, then the config name can be
misleading for users.
{code}
+  public static final String RM_SCHEDULER_ENABLE_PREEMPTION =
{code}

Missing default?
{code}
+  /** List of ScheduleEditPolicy classes affecting scheduler preemption. */
+  public static final String RM_SCHEDULER_PREEMPTION_POLICIES =
+    RM_PREFIX + "scheduler.preemption.policies";
{code}
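
If a default is added, something along these lines would mirror the existing DEFAULT_* pattern in
YarnConfiguration (the empty value is only a placeholder I am assuming here, not something from the patch):
{code}
// Sketch only: empty string meaning "no edit policies configured".
public static final String DEFAULT_RM_SCHEDULER_PREEMPTION_POLICIES = "";
{code}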

Why cast when one has generic T?
{code}
+    public RMContainerPreemptEventDispatcher(ResourceScheduler scheduler) {
+      this.scheduler = (T) scheduler;
+    }
{code}
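
A minimal sketch of the alternative I have in mind, assuming the dispatcher class is already
parameterized on T (which the cast implies):
{code}
// Accept the generic type directly so no unchecked cast is needed.
public RMContainerPreemptEventDispatcher(T scheduler) {
  this.scheduler = scheduler;
}
{code}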

How do we envisage multiple policies working together without stepping on each other? Better
off limiting to 1?
{code}
+        for (ScheduleEditPolicy policy : policies) {
+          LOG.info("LOADING ScheduleEditPolicy:" + policy.toString());
+          policy.init(conf, this.rmContext.getDispatcher().getEventHandler(),
+              (PreemptableResourceScheduler) scheduler);
+          // preemption service, periodically check whether we need to
+          // preempt to guarantee capacity constraints
+          ScheduleMonitor mon = new ScheduleMonitor(policy);
+          addService(mon);
+
+        }
{code}

Might be a personal choice, but ScheduleMonitor and ScheduleEditPolicy would read better as
SchedulingMonitor and SchedulingEditPolicy, i.e. "Scheduling" instead of "Schedule".

Why would we want to both get this from the policy (which seems natural) and be able to set it?
If it needs to be configurable, then it can be done via the policy config, right?
{code}
+  protected void setMonitorInterval(int monitorInterval) {
+    this.monitorInterval = monitorInterval;
+  }
{code}
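
Something like the following is what I would expect instead; getMonitoringInterval() is a
hypothetical accessor I am assuming the policy could expose after reading its own config in init():
{code}
// Sketch: the monitor pulls the interval from the policy instead of exposing a setter.
public ScheduleMonitor(ScheduleEditPolicy policy) {
  this.policy = policy;
  this.monitorInterval = policy.getMonitoringInterval();
}
{code}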

Having multiple threads named "Preemption Checker" will probably not help debugging.

Not joining the thread to make sure it's cleaned up?
{code}
+  public void stop() {
+    stopped = true;
+    if (checkerThread != null) {
+      checkerThread.interrupt();
+    }
{code}
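
A sketch of the join I am asking about, reusing the field names from the snippet above (the
timeout value is arbitrary):
{code}
public void stop() {
  stopped = true;
  if (checkerThread != null) {
    checkerThread.interrupt();
    try {
      // bound the wait so stop() cannot hang on a stuck checker
      checkerThread.join(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
{code}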

Nothing else seems to be synchronized, so why synchronize here?
{code}
+  private class PreepmtionChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (ScheduleMonitor.this) {
{code}

Couldn't quite grok this. What is the delta? What is 0.5, a percentage? What's the math behind the
calculation? Should it say "even absent preemption" instead of "even absent natural termination"?
Is this applied before or after TOTAL_PREEMPTION_PER_ROUND?
{code}
+  /**
+   * Given a computed preemption target, account for containers naturally
+   * expiring and preempt only this percentage of the delta. This determines
+   * the rate of geometric convergence into the deadzone ({@link
+   * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
+   * will reclaim almost 95% of resources within 5 * {@link
+   * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
+  public static final String NATURAL_TERMINATION_FACTOR =
{code}
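
For reference, here is the arithmetic I would guess this javadoc intends; please confirm, since
this is my assumption and not something the patch states explicitly:
{code}
// Assumed model: each round preempts factor * (remaining gap to the ideal
// allocation), so the gap shrinks geometrically: remaining(k) = (1 - factor)^k.
double factor = 0.5;
double remaining = Math.pow(1 - factor, 5); // after 5 * WAIT_TIME_BEFORE_KILL: 0.03125
double reclaimed = 1 - remaining;           // ~0.97, presumably the "almost 95%" in the javadoc
{code}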

In which config file do these configurations go when defined by the admin? Shouldn't they be
defined in the config defaults of that file, e.g. capacity-scheduler.xml? If they are read from
the scheduler config, then we probably shouldn't pass a Configuration object during init.

RMContainer already has the ApplicationAttemptId inside it. No need for extra args.
{code}
+  void preemptContainer(ApplicationAttemptId aid, RMContainer container);
{code}
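
Something like this is what I mean, assuming the usual accessor on RMContainer:
{code}
// Sketch of the suggested signature; the attempt id comes from the container itself.
void preemptContainer(RMContainer container);

// ...and in the implementation:
// ApplicationAttemptId aid = container.getApplicationAttemptId();
{code}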

Why no lock here when the other new methods have a lock? Do we not care that the app remains
in applications for the duration of the operation?
{code}
+  @Override
+  @Lock(Lock.NoLock.class)
+  public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + 
+          " container: " + cont.toString());
+    }
+    FiCaSchedulerApp app = applications.get(aid);
+    if (app != null) {
+      app.addPreemptContainer(cont.getContainerId());
+    }
+  }
{code}

UnmodifiableSet?
{code}
+  // need to access the list of apps from the preemption monitor
+  public Set<FiCaSchedulerApp> getApplications() {
+    return activeApplications;
+  }
{code}
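
For example (needs java.util.Collections; just a sketch of the read-only view):
{code}
// Hand out a read-only view so the monitor cannot mutate the scheduler's internal set.
public Set<FiCaSchedulerApp> getApplications() {
  return Collections.unmodifiableSet(activeApplications);
}
{code}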

containersToPreempt?
{code}
+  private final Set<ContainerId> containerToPreempt =
{code}

There is one critical difference between the old and new behavior. The new code will not send
the finish event to the container if it's not part of liveContainers. This is probably wrong.
Secondly, the parent/queue metrics etc. are also not updated. I am not sure this book-keeping is
actually designed to be in sync with liveContainers, which is what the new code enforces. Same
comment for the hierarchical callers of this method that now use its new boolean return value to
do similar book-keeping.
{code}
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  synchronized public boolean containerCompleted(RMContainer rmContainer,
{code}

This check is principally there to handle completed containers, right? Any chance the preemption
is sent to a container that is still waiting to be added to liveContainers?
{code}
+  public synchronized void addPreemptContainer(ContainerId cont){
+    // ignore already completed containers
+    if (liveContainers.containsKey(cont)) {
+      containerToPreempt.add(cont);
+    }
+  }
{code}

We should probably create and use an invalid priority value instead of using the highest priority
value.
{code}
+    ResourceRequest rr = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        minimumAllocation, numCont);
{code}
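
A possible shape for that, where the -1 sentinel and its name are made up by me and not part of
the YARN API:
{code}
// Hypothetical sentinel so the request is clearly book-keeping, not a real ask at priority 0.
private static final Priority PRIORITY_UNDEFINED = Priority.newInstance(-1);

ResourceRequest rr = ResourceRequest.newInstance(
    PRIORITY_UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont);
{code}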

FiCaSchedulerNode.unreserveResource(): checks have been added for the reserved container, but will
the code reach that point if there was no reservation actually left on that node? In the same vein,
can it happen that the node has a new reservation that was made out of band of the preemption logic
cycle? In that case the reserved container on the node would exist but could be from a different
application.

Haven't looked deeply at the logic of ProportionalCapacityPreemptionPolicy. The overall flow
looks OK and the code can be turned off / patched separately. Will get to it soon.
                
> CapacityScheduler: support for preemption (using a capacity monitor)
> --------------------------------------------------------------------
>
>                 Key: YARN-569
>                 URL: https://issues.apache.org/jira/browse/YARN-569
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: capacityscheduler
>            Reporter: Carlo Curino
>            Assignee: Carlo Curino
>         Attachments: 3queues.pdf, CapScheduler_with_preemption.pdf, preemption.2.patch,
YARN-569.1.patch, YARN-569.2.patch, YARN-569.3.patch, YARN-569.4.patch, YARN-569.patch, YARN-569.patch
>
>
> There is a tension between the fast-paced, reactive role of the CapacityScheduler, which needs
> to respond quickly to application resource requests and node updates, and the more introspective,
> time-based considerations needed to observe and correct for capacity balance. To this end,
> instead of hacking the delicate mechanisms of the CapacityScheduler directly, we add support for
> preemption by means of a "Capacity Monitor", which can optionally be run as a separate service
> (much like the NMLivelinessMonitor).
> The capacity monitor (similar to equivalent functionality in the fair scheduler) runs at regular
> intervals (e.g., every 3 seconds), observes the state of the assignment of resources to queues
> from the capacity scheduler, performs off-line computation to determine whether preemption is
> needed and how best to "edit" the current schedule to improve capacity, and generates events
> that produce four possible actions:
> # Container de-reservations
> # Resource-based preemptions
> # Container-based preemptions
> # Container killing
> The actions listed above are progressively more costly, and it is up to the policy to
use them as desired to achieve the rebalancing goals. 
> Note that, due to the "lag" in the effect of these actions, the policy should operate at the
> macroscopic level (e.g., preempt tens of containers from a queue) and not try to tightly and
> consistently micromanage container allocations.

> ------------- Preemption policy  (ProportionalCapacityPreemptionPolicy): -------------

> Preemption policies are by design pluggable; in the following we present an initial policy
> (ProportionalCapacityPreemptionPolicy) that we have been experimenting with. The
> ProportionalCapacityPreemptionPolicy behaves as follows (a rough sketch of one round follows
> the list):
> # it gathers from the scheduler the state of the queues, in particular, their current
capacity, guaranteed capacity and pending requests (*)
> # if there are pending requests from queues that are under capacity it computes a new
ideal balanced state (**)
> # it computes the set of preemptions needed to repair the current schedule and achieve
capacity balance (accounting for natural completion rates, and 
> respecting bounds on the amount of preemption we allow for each round)
> # it selects which applications to preempt from each over-capacity queue (the last one
in the FIFO order)
> # it removes reservations from the most recently assigned app until the amount of resources
> to reclaim is obtained, or until no more reservations exist
> # (if not enough) it issues preemptions for containers from the same application (in reverse
> chronological order, last assigned container first), again until the required amount is
> reclaimed or until no containers except the AM container are left
> # (if not enough) it moves on to unreserve and preempt from the next application.
> # containers that have been asked to be preempted are tracked across executions; if a container
> stays among the ones to be preempted for more than a certain time, it is moved to the list of
> containers to be forcibly killed.
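> As a rough illustration of one round of the steps above (all names and types in this sketch are
> made up for illustration; this is not the actual ProportionalCapacityPreemptionPolicy code):
{code}
final class PolicyRoundSketch {
  static final class App {
    final java.util.Deque<String> containers = new java.util.ArrayDeque<>(); // newest first
    double reserved; // resources currently reserved (not yet allocated) for this app
  }

  /** Containers selected for preemption from one over-capacity queue in one round. */
  static java.util.List<String> selectPreemptions(
      java.util.List<App> appsInFifoOrder,
      double target,          // step 3: amount to reclaim, already damped and capped
      double containerSize) {
    java.util.List<String> selected = new java.util.ArrayList<>();
    // step 4: walk applications in reverse FIFO order (most recently started first)
    for (int i = appsInFifoOrder.size() - 1; i >= 0 && target > 0; i--) {
      App app = appsInFifoOrder.get(i);
      // step 5: drop reservations before touching running containers
      double unreserved = Math.min(app.reserved, target);
      app.reserved -= unreserved;
      target -= unreserved;
      // step 6: preempt newest containers first, never the last (AM) container
      while (target > 0 && app.containers.size() > 1) {
        selected.add(app.containers.pollFirst());
        target -= containerSize;
      }
      // step 7: if still not enough, the loop moves on to the next application
    }
    // step 8 (not shown): containers selected repeatedly across rounds get force-killed
    return selected;
  }
}
{code}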
> Notes:
> (*) at the moment, in order to avoid double-counting of the requests, we only look at
the "ANY" part of pending resource requests, which means we might not preempt on behalf of
AMs that ask only for specific locations but not any. 
> (**) The ideal balanced state is one in which each queue has at least its guaranteed capacity,
> and the spare capacity is distributed among the queues that want some as a weighted fair share,
> where the weighting is based on the guaranteed capacity of a queue; the computation runs to a
> fixed point (a toy sketch follows).
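> A toy version of that computation, as I understand it (illustrative only, not the patch's
> actual code):
{code}
final class IdealBalanceSketch {
  /** Ideal per-queue assignment: guarantees first, then spare capacity weighted by guarantee. */
  static double[] idealAssignment(double total, double[] guaranteed, double[] demand) {
    int n = guaranteed.length;
    double[] ideal = new double[n];
    for (int i = 0; i < n; i++) {
      ideal[i] = Math.min(demand[i], guaranteed[i]); // serve guaranteed capacity first
    }
    boolean changed = true;
    while (changed) {                                // iterate to a fixed point
      changed = false;
      double spare = total;
      double weight = 0;
      for (int i = 0; i < n; i++) {
        spare -= ideal[i];
        if (demand[i] > ideal[i]) {
          weight += guaranteed[i];                   // weight by guaranteed capacity
        }
      }
      if (spare <= 1e-9 || weight == 0) {
        break;                                       // nothing left to hand out
      }
      for (int i = 0; i < n; i++) {
        if (demand[i] > ideal[i]) {
          double grant = Math.min(demand[i] - ideal[i], spare * guaranteed[i] / weight);
          if (grant > 0) {
            ideal[i] += grant;
            changed = true;                          // spare freed by capped queues recirculates
          }
        }
      }
    }
    return ideal;
  }
}
{code}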
> Tunables of the ProportionalCapacityPreemptionPolicy:
> # 	observe-only mode (i.e., log the actions it would take, but behave as read-only)
> # how frequently to run the policy
> # how long to wait between preemption and kill of a container
> # what fraction of the containers I would like to obtain should actually be preempted (this
> accounts for the natural rate at which containers are returned)
> # deadzone size, i.e., what % of over-capacity should I ignore (if we are off perfect
balance by some small % we ignore it)
> # overall amount of preemption we can afford for each run of the policy (in terms of
total cluster capacity)
> In our current experiments this set of tunables seems to be a good start to shape the preemption
> action properly. More sophisticated preemption policies could take into account the different
> types of applications running, job priorities, the cost of preemption, or the integral of the
> capacity imbalance. This is very much a control-theory kind of problem, and some of the lessons
> on designing and tuning controllers are likely to apply.
> Generality:
> The monitor-based schedule edits and the preemption mechanisms introduced here are designed to
> be more general than enforcing capacity/fairness; in fact, we are considering other monitors
> that leverage the same idea of "schedule edits" to target different global properties (e.g.,
> allocating enough resources to guarantee deadlines for important jobs, data-locality
> optimizations, IO-balancing among nodes, etc.).
> Note that by default the preemption policy we describe is disabled in the patch.
> Depends on YARN-45 and YARN-567, is related to YARN-568

