hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vrush...@apache.org
Subject [04/54] [abbrv] hadoop git commit: YARN-6608. Backport all SLS improvements from trunk to branch-2. (Carlo Curino via wangda)
Date Fri, 20 Oct 2017 18:56:00 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
index 3b539fa..420a1c9 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.yarn.sls.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
-    .FSAppAttempt;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.sls.SLSRunner;
 
 import com.codahale.metrics.Gauge;
-import org.apache.hadoop.yarn.sls.SLSRunner;
 
 @Private
 @Unstable
@@ -37,114 +38,131 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
   private int totalVCores = Integer.MAX_VALUE;
   private boolean maxReset = false;
 
+  @VisibleForTesting
+  public enum Metric {
+    DEMAND("demand"),
+    USAGE("usage"),
+    MINSHARE("minshare"),
+    MAXSHARE("maxshare"),
+    FAIRSHARE("fairshare");
+
+    private String value;
+
+    Metric(String value) {
+      this.value = value;
+    }
+
+    @VisibleForTesting
+    public String getValue() {
+      return value;
+    }
+  }
+
   public FairSchedulerMetrics() {
     super();
-    appTrackedMetrics.add("demand.memory");
-    appTrackedMetrics.add("demand.vcores");
-    appTrackedMetrics.add("usage.memory");
-    appTrackedMetrics.add("usage.vcores");
-    appTrackedMetrics.add("minshare.memory");
-    appTrackedMetrics.add("minshare.vcores");
-    appTrackedMetrics.add("maxshare.memory");
-    appTrackedMetrics.add("maxshare.vcores");
-    appTrackedMetrics.add("fairshare.memory");
-    appTrackedMetrics.add("fairshare.vcores");
-    queueTrackedMetrics.add("demand.memory");
-    queueTrackedMetrics.add("demand.vcores");
-    queueTrackedMetrics.add("usage.memory");
-    queueTrackedMetrics.add("usage.vcores");
-    queueTrackedMetrics.add("minshare.memory");
-    queueTrackedMetrics.add("minshare.vcores");
-    queueTrackedMetrics.add("maxshare.memory");
-    queueTrackedMetrics.add("maxshare.vcores");
-    queueTrackedMetrics.add("fairshare.memory");
-    queueTrackedMetrics.add("fairshare.vcores");
+
+    for (Metric metric: Metric.values()) {
+      appTrackedMetrics.add(metric.value + ".memory");
+      appTrackedMetrics.add(metric.value + ".vcores");
+      queueTrackedMetrics.add(metric.value + ".memory");
+      queueTrackedMetrics.add(metric.value + ".vcores");
+    }
   }
-  
-  @Override
-  public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) {
-    super.trackApp(appAttemptId, oldAppId);
-    FairScheduler fair = (FairScheduler) scheduler;
-    final FSAppAttempt app = fair.getSchedulerApp(appAttemptId);
-    metrics.register("variable.app." + oldAppId + ".demand.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return app.getDemand().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.app." + oldAppId + ".demand.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return app.getDemand().getVirtualCores();
-        }
-      }
-    );
-    metrics.register("variable.app." + oldAppId + ".usage.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return app.getResourceUsage().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.app." + oldAppId + ".usage.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return app.getResourceUsage().getVirtualCores();
-        }
-      }
-    );
-    metrics.register("variable.app." + oldAppId + ".minshare.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return app.getMinShare().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.app." + oldAppId + ".minshare.vcores",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return app.getMinShare().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.app." + oldAppId + ".maxshare.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Math.min(app.getMaxShare().getMemorySize(), totalMemoryMB);
+
+  private long getMemorySize(Schedulable schedulable, Metric metric) {
+    if (schedulable != null) {
+      switch (metric) {
+      case DEMAND:
+        return schedulable.getDemand().getMemorySize();
+      case USAGE:
+        return schedulable.getResourceUsage().getMemorySize();
+      case MINSHARE:
+        return schedulable.getMinShare().getMemorySize();
+      case MAXSHARE:
+        return schedulable.getMaxShare().getMemorySize();
+      case FAIRSHARE:
+        return schedulable.getFairShare().getMemorySize();
+      default:
+        return 0L;
+      }
+    }
+
+    return 0L;
+  }
+
+  private int getVirtualCores(Schedulable schedulable, Metric metric) {
+    if (schedulable != null) {
+      switch (metric) {
+      case DEMAND:
+        return schedulable.getDemand().getVirtualCores();
+      case USAGE:
+        return schedulable.getResourceUsage().getVirtualCores();
+      case MINSHARE:
+        return schedulable.getMinShare().getVirtualCores();
+      case MAXSHARE:
+        return schedulable.getMaxShare().getVirtualCores();
+      case FAIRSHARE:
+        return schedulable.getFairShare().getVirtualCores();
+      default:
+        return 0;
+      }
+    }
+
+    return 0;
+  }
+
+  private void registerAppMetrics(final ApplicationId appId, String oldAppId,
+      final Metric metric) {
+    metrics.register(
+        "variable.app." + oldAppId + "." + metric.value + ".memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return getMemorySize((FSAppAttempt)getSchedulerAppAttempt(appId),
+                metric);
+          }
         }
-      }
     );
-    metrics.register("variable.app." + oldAppId + ".maxshare.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return Math.min(app.getMaxShare().getVirtualCores(), totalVCores);
+
+    metrics.register(
+        "variable.app." + oldAppId + "." + metric.value + ".vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            return getVirtualCores((FSAppAttempt)getSchedulerAppAttempt(appId),
+                metric);
+          }
         }
-      }
     );
-    metrics.register("variable.app." + oldAppId + ".fairshare.memory",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return app.getFairShare().getVirtualCores();
+  }
+
+  @Override
+  public void trackApp(ApplicationId appId, String oldAppId) {
+    super.trackApp(appId, oldAppId);
+
+    for (Metric metric: Metric.values()) {
+      registerAppMetrics(appId, oldAppId, metric);
+    }
+  }
+
+  private void registerQueueMetrics(final FSQueue queue, final Metric metric) {
+    metrics.register(
+        "variable.queue." + queue.getName() + "." + metric.value + ".memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return getMemorySize(queue, metric);
+          }
         }
-      }
     );
-    metrics.register("variable.app." + oldAppId + ".fairshare.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return app.getFairShare().getVirtualCores();
+    metrics.register(
+        "variable.queue." + queue.getName() + "." + metric.value + ".vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            return getVirtualCores(queue, metric);
+          }
         }
-      }
     );
   }
 
@@ -153,68 +171,25 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
     trackedQueues.add(queueName);
     FairScheduler fair = (FairScheduler) scheduler;
     final FSQueue queue = fair.getQueueManager().getQueue(queueName);
-    metrics.register("variable.queue." + queueName + ".demand.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return queue.getDemand().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.queue." + queueName + ".demand.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return queue.getDemand().getVirtualCores();
-        }
-      }
-    );
-    metrics.register("variable.queue." + queueName + ".usage.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return queue.getResourceUsage().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.queue." + queueName + ".usage.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return queue.getResourceUsage().getVirtualCores();
-        }
-      }
-    );
-    metrics.register("variable.queue." + queueName + ".minshare.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return queue.getMinShare().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.queue." + queueName + ".minshare.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return queue.getMinShare().getVirtualCores();
-        }
-      }
-    );
+    registerQueueMetrics(queue, Metric.DEMAND);
+    registerQueueMetrics(queue, Metric.USAGE);
+    registerQueueMetrics(queue, Metric.MINSHARE);
+    registerQueueMetrics(queue, Metric.FAIRSHARE);
+
     metrics.register("variable.queue." + queueName + ".maxshare.memory",
       new Gauge<Long>() {
         @Override
         public Long getValue() {
-          if (! maxReset &&
-                  SLSRunner.simulateInfoMap.containsKey("Number of nodes") &&
-                  SLSRunner.simulateInfoMap.containsKey("Node memory (MB)") &&
-                  SLSRunner.simulateInfoMap.containsKey("Node VCores")) {
-            int numNMs = Integer.parseInt(
-                  SLSRunner.simulateInfoMap.get("Number of nodes").toString());
-            int numMemoryMB = Integer.parseInt(
-                  SLSRunner.simulateInfoMap.get("Node memory (MB)").toString());
-            int numVCores = Integer.parseInt(
-                  SLSRunner.simulateInfoMap.get("Node VCores").toString());
+          if (! maxReset
+              && SLSRunner.getSimulateInfoMap().containsKey("Number of nodes")
+              && SLSRunner.getSimulateInfoMap().containsKey("Node memory (MB)")
+              && SLSRunner.getSimulateInfoMap().containsKey("Node VCores")) {
+            int numNMs = Integer.parseInt(SLSRunner.getSimulateInfoMap()
+                .get("Number of nodes").toString());
+            int numMemoryMB = Integer.parseInt(SLSRunner.getSimulateInfoMap()
+                .get("Node memory (MB)").toString());
+            int numVCores = Integer.parseInt(SLSRunner.getSimulateInfoMap()
+                .get("Node VCores").toString());
 
             totalMemoryMB = numNMs * numMemoryMB;
             totalVCores = numNMs * numVCores;
@@ -233,36 +208,17 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
         }
       }
     );
-    metrics.register("variable.queue." + queueName + ".fairshare.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return queue.getFairShare().getMemorySize();
-        }
-      }
-    );
-    metrics.register("variable.queue." + queueName + ".fairshare.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          return queue.getFairShare().getVirtualCores();
-        }
-      }
-    );
   }
 
   @Override
   public void untrackQueue(String queueName) {
     trackedQueues.remove(queueName);
-    metrics.remove("variable.queue." + queueName + ".demand.memory");
-    metrics.remove("variable.queue." + queueName + ".demand.vcores");
-    metrics.remove("variable.queue." + queueName + ".usage.memory");
-    metrics.remove("variable.queue." + queueName + ".usage.vcores");
-    metrics.remove("variable.queue." + queueName + ".minshare.memory");
-    metrics.remove("variable.queue." + queueName + ".minshare.vcores");
-    metrics.remove("variable.queue." + queueName + ".maxshare.memory");
-    metrics.remove("variable.queue." + queueName + ".maxshare.vcores");
-    metrics.remove("variable.queue." + queueName + ".fairshare.memory");
-    metrics.remove("variable.queue." + queueName + ".fairshare.vcores");
+
+    for (Metric metric: Metric.values()) {
+      metrics.remove("variable.queue." + queueName + "." +
+          metric.value + ".memory");
+      metrics.remove("variable.queue." + queueName + "." +
+          metric.value + ".vcores");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
deleted file mode 100644
index 1ba6acc..0000000
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ /dev/null
@@ -1,973 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.scheduler;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.sls.SLSRunner;
-import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.sls.web.SLSWebApp;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.SlidingWindowReservoir;
-import com.codahale.metrics.Timer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Private
-@Unstable
-public class ResourceSchedulerWrapper
-    extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>
-    implements SchedulerWrapper,ResourceScheduler,Configurable {
-  private static final String EOL = System.getProperty("line.separator");
-  private static final int SAMPLING_SIZE = 60;
-  private ScheduledExecutorService pool;
-  // counters for scheduler allocate/handle operations
-  private Counter schedulerAllocateCounter;
-  private Counter schedulerHandleCounter;
-  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
-  // Timers for scheduler allocate/handle operations
-  private Timer schedulerAllocateTimer;
-  private Timer schedulerHandleTimer;
-  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
-  private List<Histogram> schedulerHistogramList;
-  private Map<Histogram, Timer> histogramTimerMap;
-  private Lock samplerLock;
-  private Lock queueLock;
-
-  private Configuration conf;
-  private ResourceScheduler scheduler;
-  private Map<ApplicationId, String> appQueueMap =
-          new ConcurrentHashMap<ApplicationId, String>();
-  private BufferedWriter jobRuntimeLogBW;
-
-  // Priority of the ResourceSchedulerWrapper shutdown hook.
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
-  // web app
-  private SLSWebApp web;
-
-  private Map<ContainerId, Resource> preemptionContainerMap =
-          new ConcurrentHashMap<ContainerId, Resource>();
-
-  // metrics
-  private MetricRegistry metrics;
-  private SchedulerMetrics schedulerMetrics;
-  private boolean metricsON;
-  private String metricsOutputDir;
-  private BufferedWriter metricsLogBW;
-  private boolean running = false;
-  private static Map<Class, Class> defaultSchedulerMetricsMap =
-          new HashMap<Class, Class>();
-  static {
-    defaultSchedulerMetricsMap.put(FairScheduler.class,
-            FairSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(FifoScheduler.class,
-            FifoSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(CapacityScheduler.class,
-            CapacitySchedulerMetrics.class);
-  }
-  // must set by outside
-  private Set<String> queueSet;
-  private Set<String> trackedAppSet;
-
-  public final Logger LOG =
-      LoggerFactory.getLogger(ResourceSchedulerWrapper.class);
-
-  public ResourceSchedulerWrapper() {
-    super(ResourceSchedulerWrapper.class.getName());
-    samplerLock = new ReentrantLock();
-    queueLock = new ReentrantLock();
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-    // set scheduler
-    Class<? extends ResourceScheduler> klass = conf.getClass(
-        SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class);
-
-    scheduler = ReflectionUtils.newInstance(klass, conf);
-    // start metrics
-    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
-    if (metricsON) {
-      try {
-        initMetrics();
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-
-    ShutdownHookManager.get().addShutdownHook(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          if (metricsLogBW != null)  {
-            metricsLogBW.write("]");
-            metricsLogBW.close();
-          }
-          if (web != null) {
-            web.stop();
-          }
-          tearDown();
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    }, SHUTDOWN_HOOK_PRIORITY);
-  }
-
-  @Override
-  public Allocation allocate(ApplicationAttemptId attemptId,
-      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
-      List<String> strings, List<String> strings2,
-      ContainerUpdates updateRequests) {
-    if (metricsON) {
-      final Timer.Context context = schedulerAllocateTimer.time();
-      Allocation allocation = null;
-      try {
-        allocation = scheduler.allocate(attemptId, resourceRequests,
-                containerIds, strings, strings2, updateRequests);
-        return allocation;
-      } finally {
-        context.stop();
-        schedulerAllocateCounter.inc();
-        try {
-          updateQueueWithAllocateRequest(allocation, attemptId,
-                  resourceRequests, containerIds);
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    } else {
-      return scheduler.allocate(attemptId,
-              resourceRequests, containerIds, strings, strings2, updateRequests);
-    }
-  }
-
-  @Override
-  public void handle(SchedulerEvent schedulerEvent) {
-    // metrics off
-    if (! metricsON) {
-      scheduler.handle(schedulerEvent);
-      return;
-    }
-    if(!running)    running = true;
-
-    // metrics on
-    Timer.Context handlerTimer = null;
-    Timer.Context operationTimer = null;
-
-    NodeUpdateSchedulerEventWrapper eventWrapper;
-    try {
-      //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-              && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-        eventWrapper = new NodeUpdateSchedulerEventWrapper(
-                (NodeUpdateSchedulerEvent)schedulerEvent);
-        schedulerEvent = eventWrapper;
-        updateQueueWithNodeUpdate(eventWrapper);
-      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        // check if having AM Container, update resource usage information
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        ApplicationAttemptId appAttemptId =
-                appRemoveEvent.getApplicationAttemptID();
-        String queue = appQueueMap.get(appAttemptId.getApplicationId());
-        SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
-        if (! app.getLiveContainers().isEmpty()) {  // have 0 or 1
-          // should have one container which is AM container
-          RMContainer rmc = app.getLiveContainers().iterator().next();
-          updateQueueMetrics(queue,
-                  rmc.getContainer().getResource().getMemorySize(),
-                  rmc.getContainer().getResource().getVirtualCores());
-        }
-      }
-
-      handlerTimer = schedulerHandleTimer.time();
-      operationTimer = schedulerHandleTimerMap
-              .get(schedulerEvent.getType()).time();
-
-      scheduler.handle(schedulerEvent);
-    } finally {
-      if (handlerTimer != null)     handlerTimer.stop();
-      if (operationTimer != null)   operationTimer.stop();
-      schedulerHandleCounter.inc();
-      schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
-
-      if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED
-          && schedulerEvent instanceof AppRemovedSchedulerEvent) {
-        SLSRunner.decreaseRemainingApps();
-        AppRemovedSchedulerEvent appRemoveEvent =
-                (AppRemovedSchedulerEvent) schedulerEvent;
-        appQueueMap.remove(appRemoveEvent.getApplicationID());
-      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
-          && schedulerEvent instanceof AppAddedSchedulerEvent) {
-        AppAddedSchedulerEvent appAddEvent =
-                (AppAddedSchedulerEvent) schedulerEvent;
-        String queueName = appAddEvent.getQueue();
-        appQueueMap.put(appAddEvent.getApplicationId(), queueName);
-      }
-    }
-  }
-
-  private void updateQueueWithNodeUpdate(
-          NodeUpdateSchedulerEventWrapper eventWrapper) {
-    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
-    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
-    for (UpdatedContainerInfo info : containerList) {
-      for (ContainerStatus status : info.getCompletedContainers()) {
-        ContainerId containerId = status.getContainerId();
-        SchedulerAppReport app = scheduler.getSchedulerAppInfo(
-                containerId.getApplicationAttemptId());
-
-        if (app == null) {
-          // this happens for the AM container
-          // The app have already removed when the NM sends the release
-          // information.
-          continue;
-        }
-
-        String queue =
-            appQueueMap.get(containerId.getApplicationAttemptId()
-              .getApplicationId());
-        int releasedMemory = 0, releasedVCores = 0;
-        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
-          for (RMContainer rmc : app.getLiveContainers()) {
-            if (rmc.getContainerId() == containerId) {
-              releasedMemory += rmc.getContainer().getResource().getMemorySize();
-              releasedVCores += rmc.getContainer()
-                      .getResource().getVirtualCores();
-              break;
-            }
-          }
-        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
-          if (preemptionContainerMap.containsKey(containerId)) {
-            Resource preResource = preemptionContainerMap.get(containerId);
-            releasedMemory += preResource.getMemorySize();
-            releasedVCores += preResource.getVirtualCores();
-            preemptionContainerMap.remove(containerId);
-          }
-        }
-        // update queue counters
-        updateQueueMetrics(queue, releasedMemory, releasedVCores);
-      }
-    }
-  }
-
-  private void updateQueueWithAllocateRequest(Allocation allocation,
-                        ApplicationAttemptId attemptId,
-                        List<ResourceRequest> resourceRequests,
-                        List<ContainerId> containerIds) throws IOException {
-    // update queue information
-    Resource pendingResource = Resources.createResource(0, 0);
-    Resource allocatedResource = Resources.createResource(0, 0);
-    String queueName = appQueueMap.get(attemptId.getApplicationId());
-    // container requested
-    for (ResourceRequest request : resourceRequests) {
-      if (request.getResourceName().equals(ResourceRequest.ANY)) {
-        Resources.addTo(pendingResource,
-                Resources.multiply(request.getCapability(),
-                        request.getNumContainers()));
-      }
-    }
-    // container allocated
-    for (Container container : allocation.getContainers()) {
-      Resources.addTo(allocatedResource, container.getResource());
-      Resources.subtractFrom(pendingResource, container.getResource());
-    }
-    // container released from AM
-    SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId);
-    for (ContainerId containerId : containerIds) {
-      Container container = null;
-      for (RMContainer c : report.getLiveContainers()) {
-        if (c.getContainerId().equals(containerId)) {
-          container = c.getContainer();
-          break;
-        }
-      }
-      if (container != null) {
-        // released allocated containers
-        Resources.subtractFrom(allocatedResource, container.getResource());
-      } else {
-        for (RMContainer c : report.getReservedContainers()) {
-          if (c.getContainerId().equals(containerId)) {
-            container = c.getContainer();
-            break;
-          }
-        }
-        if (container != null) {
-          // released reserved containers
-          Resources.subtractFrom(pendingResource, container.getResource());
-        }
-      }
-    }
-    // containers released/preemption from scheduler
-    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
-    if (allocation.getContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getContainerPreemptions());
-    }
-    if (allocation.getStrictContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
-    }
-    if (! preemptionContainers.isEmpty()) {
-      for (ContainerId containerId : preemptionContainers) {
-        if (! preemptionContainerMap.containsKey(containerId)) {
-          Container container = null;
-          for (RMContainer c : report.getLiveContainers()) {
-            if (c.getContainerId().equals(containerId)) {
-              container = c.getContainer();
-              break;
-            }
-          }
-          if (container != null) {
-            preemptionContainerMap.put(containerId, container.getResource());
-          }
-        }
-
-      }
-    }
-
-    // update metrics
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    String names[] = new String[]{
-            "counter.queue." + queueName + ".pending.memory",
-            "counter.queue." + queueName + ".pending.cores",
-            "counter.queue." + queueName + ".allocated.memory",
-            "counter.queue." + queueName + ".allocated.cores"};
-    long values[] = new long[]{pendingResource.getMemorySize(),
-            pendingResource.getVirtualCores(),
-            allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()};
-    for (int i = names.length - 1; i >= 0; i --) {
-      if (! counterMap.containsKey(names[i])) {
-        metrics.counter(names[i]);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(names[i]).inc(values[i]);
-    }
-
-    queueLock.lock();
-    try {
-      if (! schedulerMetrics.isTracked(queueName)) {
-        schedulerMetrics.trackQueue(queueName);
-      }
-    } finally {
-      queueLock.unlock();
-    }
-  }
-
-  private void tearDown() throws IOException {
-    // close job runtime writer
-    if (jobRuntimeLogBW != null) {
-      jobRuntimeLogBW.close();
-    }
-    // shut pool
-    if (pool != null)  pool.shutdown();
-  }
-
-  @SuppressWarnings("unchecked")
-  private void initMetrics() throws Exception {
-    metrics = new MetricRegistry();
-    // configuration
-    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
-    int metricsWebAddressPort = conf.getInt(
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
-    // create SchedulerMetrics for current scheduler
-    String schedulerMetricsType = conf.get(scheduler.getClass().getName());
-    Class schedulerMetricsClass = schedulerMetricsType == null?
-            defaultSchedulerMetricsMap.get(scheduler.getClass()) :
-            Class.forName(schedulerMetricsType);
-    schedulerMetrics = (SchedulerMetrics)ReflectionUtils
-            .newInstance(schedulerMetricsClass, new Configuration());
-    schedulerMetrics.init(scheduler, metrics);
-
-    // register various metrics
-    registerJvmMetrics();
-    registerClusterResourceMetrics();
-    registerContainerAppNumMetrics();
-    registerSchedulerMetrics();
-
-    // .csv output
-    initMetricsCSVOutput();
-
-    // start web app to provide real-time tracking
-    web = new SLSWebApp(this, metricsWebAddressPort);
-    web.start();
-
-    // a thread to update histogram timer
-    pool = new ScheduledThreadPoolExecutor(2);
-    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // a thread to output metrics for real-tiem tracking
-    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // application running information
-    jobRuntimeLogBW =
-        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
-            metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
-    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
-            "simulate_start_time,simulate_end_time" + EOL);
-    jobRuntimeLogBW.flush();
-  }
-
-  private void registerJvmMetrics() {
-    // add JVM gauges
-    metrics.register("variable.jvm.free.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().freeMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.max.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().maxMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.total.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().totalMemory();
-        }
-      }
-    );
-  }
-
-  private void registerClusterResourceMetrics() {
-    metrics.register("variable.cluster.allocated.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return scheduler.getRootQueueMetrics().getAllocatedMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.allocated.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return scheduler.getRootQueueMetrics().getAvailableMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
-          }
-        }
-      }
-    );
-  }
-
-  private void registerContainerAppNumMetrics() {
-    metrics.register("variable.running.application",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if (scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAppsRunning();
-          }
-        }
-      }
-    );
-    metrics.register("variable.running.container",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAllocatedContainers();
-          }
-        }
-      }
-    );
-  }
-
-  private void registerSchedulerMetrics() {
-    samplerLock.lock();
-    try {
-      // counters for scheduler operations
-      schedulerAllocateCounter = metrics.counter(
-              "counter.scheduler.operation.allocate");
-      schedulerHandleCounter = metrics.counter(
-              "counter.scheduler.operation.handle");
-      schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Counter counter = metrics.counter(
-                "counter.scheduler.operation.handle." + e);
-        schedulerHandleCounterMap.put(e, counter);
-      }
-      // timers for scheduler operations
-      int timeWindowSize = conf.getInt(
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
-      schedulerAllocateTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
-        schedulerHandleTimerMap.put(e, timer);
-      }
-      // histogram for scheduler operations (Samplers)
-      schedulerHistogramList = new ArrayList<Histogram>();
-      histogramTimerMap = new HashMap<Histogram, Timer>();
-      Histogram schedulerAllocateHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.allocate.timecost",
-              schedulerAllocateHistogram);
-      schedulerHistogramList.add(schedulerAllocateHistogram);
-      histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
-      Histogram schedulerHandleHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.handle.timecost",
-              schedulerHandleHistogram);
-      schedulerHistogramList.add(schedulerHandleHistogram);
-      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Histogram histogram = new Histogram(
-                new SlidingWindowReservoir(SAMPLING_SIZE));
-        metrics.register(
-                "sampler.scheduler.operation.handle." + e + ".timecost",
-                histogram);
-        schedulerHistogramList.add(histogram);
-        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
-      }
-    } finally {
-      samplerLock.unlock();
-    }
-  }
-
-  private void initMetricsCSVOutput() {
-    int timeIntervalMS = conf.getInt(
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
-    File dir = new File(metricsOutputDir + "/metrics");
-    if(! dir.exists()
-            && ! dir.mkdirs()) {
-      LOG.error("Cannot create directory {}", dir.getAbsoluteFile());
-    }
-    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
-            .formatFor(Locale.US)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build(new File(metricsOutputDir + "/metrics"));
-    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
-  }
-
-  class HistogramsRunnable implements Runnable {
-    @Override
-    public void run() {
-      samplerLock.lock();
-      try {
-        for (Histogram histogram : schedulerHistogramList) {
-          Timer timer = histogramTimerMap.get(histogram);
-          histogram.update((int) timer.getSnapshot().getMean());
-        }
-      } finally {
-        samplerLock.unlock();
-      }
-    }
-  }
-
-  class MetricsLogRunnable implements Runnable {
-    private boolean firstLine = true;
-    public MetricsLogRunnable() {
-      try {
-        metricsLogBW =
-            new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
-                metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
-        metricsLogBW.write("[");
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void run() {
-      if(running) {
-        // all WebApp to get real tracking json
-        String metrics = web.generateRealTimeTrackingMetrics();
-        // output
-        try {
-          if(firstLine) {
-            metricsLogBW.write(metrics + EOL);
-            firstLine = false;
-          } else {
-            metricsLogBW.write("," + metrics + EOL);
-          }
-          metricsLogBW.flush();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
-  }
-
-  // the following functions are used by AMSimulator
-  public void addAMRuntime(ApplicationId appId,
-                           long traceStartTimeMS, long traceEndTimeMS,
-                           long simulateStartTimeMS, long simulateEndTimeMS) {
-    if (metricsON) {
-      try {
-        // write job runtime information
-        StringBuilder sb = new StringBuilder();
-        sb.append(appId).append(",").append(traceStartTimeMS).append(",")
-            .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
-            .append(",").append(simulateEndTimeMS);
-        jobRuntimeLogBW.write(sb.toString() + EOL);
-        jobRuntimeLogBW.flush();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  private void updateQueueMetrics(String queue,
-                                  long releasedMemory, int releasedVCores) {
-    // update queue counters
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    if (releasedMemory != 0) {
-      String name = "counter.queue." + queue + ".allocated.memory";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedMemory);
-    }
-    if (releasedVCores != 0) {
-      String name = "counter.queue." + queue + ".allocated.cores";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedVCores);
-    }
-  }
-
-  public void setQueueSet(Set<String> queues) {
-    this.queueSet = queues;
-  }
-
-  public Set<String> getQueueSet() {
-    return this.queueSet;
-  }
-
-  public void setTrackedAppSet(Set<String> apps) {
-    this.trackedAppSet = apps;
-  }
-
-  public Set<String> getTrackedAppSet() {
-    return this.trackedAppSet;
-  }
-
-  public MetricRegistry getMetrics() {
-    return metrics;
-  }
-
-  public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
-  }
-
-  // API open to out classes
-  public void addTrackedApp(ApplicationAttemptId appAttemptId,
-                            String oldAppId) {
-    if (metricsON) {
-      schedulerMetrics.trackApp(appAttemptId, oldAppId);
-    }
-  }
-
-  public void removeTrackedApp(ApplicationAttemptId appAttemptId,
-                               String oldAppId) {
-    if (metricsON) {
-      schedulerMetrics.untrackApp(appAttemptId, oldAppId);
-    }
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
-        scheduler).init(conf);
-    super.serviceInit(conf);
-    initScheduler(conf);
-  }
-
-  private synchronized void initScheduler(Configuration configuration) throws
-  IOException {
-    this.applications =
-        new ConcurrentHashMap<ApplicationId,
-        SchedulerApplication<SchedulerApplicationAttempt>>();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void serviceStart() throws Exception {
-    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
-        scheduler).start();
-    super.serviceStart();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void serviceStop() throws Exception {
-    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
-        scheduler).stop();
-    super.serviceStop();
-  }
-
-  @Override
-  public void setRMContext(RMContext rmContext) {
-    scheduler.setRMContext(rmContext);
-  }
-
-  @Override
-  public void reinitialize(Configuration conf, RMContext rmContext)
-      throws IOException {
-    scheduler.reinitialize(conf, rmContext);
-  }
-
-  @Override
-  public void recover(RMStateStore.RMState rmState) throws Exception {
-    scheduler.recover(rmState);
-  }
-
-  @Override
-  public QueueInfo getQueueInfo(String s, boolean b, boolean b2)
-          throws IOException {
-    return scheduler.getQueueInfo(s, b, b2);
-  }
-
-  @Override
-  public List<QueueUserACLInfo> getQueueUserAclInfo() {
-    return scheduler.getQueueUserAclInfo();
-  }
-
-  @Override
-  public Resource getMinimumResourceCapability() {
-    return scheduler.getMinimumResourceCapability();
-  }
-
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return scheduler.getMaximumResourceCapability();
-  }
-
-  @Override
-  public ResourceCalculator getResourceCalculator() {
-    return scheduler.getResourceCalculator();
-  }
-
-  @Override
-  public int getNumClusterNodes() {
-    return scheduler.getNumClusterNodes();
-  }
-
-  @Override
-  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    return scheduler.getNodeReport(nodeId);
-  }
-
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-          ApplicationAttemptId attemptId) {
-    return scheduler.getSchedulerAppInfo(attemptId);
-  }
-
-  @Override
-  public QueueMetrics getRootQueueMetrics() {
-    return scheduler.getRootQueueMetrics();
-  }
-
-  @Override
-  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
-      QueueACL acl, String queueName) {
-    return scheduler.checkAccess(callerUGI, acl, queueName);
-  }
-
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId appAttemptId) {
-    return scheduler.getAppResourceUsageReport(appAttemptId);
-  }
-
-  @Override
-  public List<ApplicationAttemptId> getAppsInQueue(String queue) {
-    return scheduler.getAppsInQueue(queue);
-  }
-
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    return null;
-  }
-
-  @Override
-  public String moveApplication(ApplicationId appId, String newQueue)
-      throws YarnException {
-    return scheduler.moveApplication(appId, newQueue);
-  }
-
-  @Override
-  @LimitedPrivate("yarn")
-  @Unstable
-  public Resource getClusterResource() {
-    return super.getClusterResource();
-  }
-
-  @Override
-  public synchronized List<Container> getTransferredContainers(
-      ApplicationAttemptId currentAttempt) {
-    return new ArrayList<Container>();
-  }
-
-  @Override
-  public Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
-      getSchedulerApplications() {
-    return new HashMap<ApplicationId,
-        SchedulerApplication<SchedulerApplicationAttempt>>();
-  }
-
-  @Override
-  protected void completedContainerInternal(RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event) {
-    // do nothing
-  }
-
-  @Override
-  public Priority checkAndGetApplicationPriority(Priority priority,
-      UserGroupInformation user, String queueName, ApplicationId applicationId)
-      throws YarnException {
-    // TODO Dummy implementation.
-    return Priority.newInstance(0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 2625cb7..108c2bc 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -17,176 +17,104 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.SlidingWindowReservoir;
-import com.codahale.metrics.Timer;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.sls.web.SLSWebApp;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Timer;
+
+@Private
+@Unstable
 public class SLSCapacityScheduler extends CapacityScheduler implements
         SchedulerWrapper,Configurable {
-  private static final String EOL = System.getProperty("line.separator");
-  private static final int SAMPLING_SIZE = 60;
-  private ScheduledExecutorService pool;
-  // counters for scheduler allocate/handle operations
-  private Counter schedulerAllocateCounter;
-  private Counter schedulerHandleCounter;
-  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
-  // Timers for scheduler allocate/handle operations
-  private Timer schedulerAllocateTimer;
-  private Timer schedulerHandleTimer;
-  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
-  private List<Histogram> schedulerHistogramList;
-  private Map<Histogram, Timer> histogramTimerMap;
-  private Lock samplerLock;
-  private Lock queueLock;
 
   private Configuration conf;
-
+ 
   private Map<ApplicationAttemptId, String> appQueueMap =
           new ConcurrentHashMap<ApplicationAttemptId, String>();
-  private BufferedWriter jobRuntimeLogBW;
-
-  // Priority of the ResourceSchedulerWrapper shutdown hook.
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
-  // web app
-  private SLSWebApp web;
 
   private Map<ContainerId, Resource> preemptionContainerMap =
           new ConcurrentHashMap<ContainerId, Resource>();
 
   // metrics
-  private MetricRegistry metrics;
   private SchedulerMetrics schedulerMetrics;
   private boolean metricsON;
-  private String metricsOutputDir;
-  private BufferedWriter metricsLogBW;
-  private boolean running = false;
-  private static Map<Class, Class> defaultSchedulerMetricsMap =
-          new HashMap<Class, Class>();
-  static {
-    defaultSchedulerMetricsMap.put(FairScheduler.class,
-            FairSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(FifoScheduler.class,
-            FifoSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(CapacityScheduler.class,
-            CapacitySchedulerMetrics.class);
-  }
-  // must set by outside
-  private Set<String> queueSet;
-  private Set<String> trackedAppSet;
+  private Tracker tracker;
 
-  public final Logger LOG = LoggerFactory.getLogger(SLSCapacityScheduler.class);
+  public Tracker getTracker() {
+    return tracker;
+  }
 
   public SLSCapacityScheduler() {
-    samplerLock = new ReentrantLock();
-    queueLock = new ReentrantLock();
+    tracker = new Tracker();
   }
 
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
     super.setConf(conf);
-    // start metrics
     metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
     if (metricsON) {
       try {
-        initMetrics();
+        schedulerMetrics = SchedulerMetrics.getInstance(conf,
+            CapacityScheduler.class);
+        schedulerMetrics.init(this, conf);
       } catch (Exception e) {
         e.printStackTrace();
       }
     }
-
-    ShutdownHookManager.get().addShutdownHook(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          if (metricsLogBW != null)  {
-            metricsLogBW.write("]");
-            metricsLogBW.close();
-          }
-          if (web != null) {
-            web.stop();
-          }
-          tearDown();
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    }, SHUTDOWN_HOOK_PRIORITY);
   }
 
   @Override
   public Allocation allocate(ApplicationAttemptId attemptId,
-                             List<ResourceRequest> resourceRequests,
-                             List<ContainerId> containerIds,
-                             List<String> strings, List<String> strings2,
-                             ContainerUpdates updateRequests) {
+      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<String> strings, List<String> strings2,
+      ContainerUpdates updateRequests) {
     if (metricsON) {
-      final Timer.Context context = schedulerAllocateTimer.time();
+      final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
+          .time();
       Allocation allocation = null;
       try {
-        allocation = super.allocate(attemptId, resourceRequests,
-                containerIds, strings, strings2, updateRequests);
+        allocation = super
+            .allocate(attemptId, resourceRequests, containerIds, strings,
+                strings2, updateRequests);
         return allocation;
       } finally {
         context.stop();
-        schedulerAllocateCounter.inc();
+        schedulerMetrics.increaseSchedulerAllocationCounter();
         try {
           updateQueueWithAllocateRequest(allocation, attemptId,
                   resourceRequests, containerIds);
@@ -195,81 +123,83 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
         }
       }
     } else {
-      return super.allocate(attemptId,
-              resourceRequests, containerIds, strings, strings2, updateRequests);
+      return super.allocate(attemptId, resourceRequests, containerIds, strings,
+          strings2, updateRequests);
     }
   }
 
   @Override
   public void handle(SchedulerEvent schedulerEvent) {
-	    // metrics off
-	    if (! metricsON) {
-	      super.handle(schedulerEvent);
-	      return;
-	    }
-	    if(!running)    running = true;
-
-	    // metrics on
-	    Timer.Context handlerTimer = null;
-	    Timer.Context operationTimer = null;
-
-	    NodeUpdateSchedulerEventWrapper eventWrapper;
-	    try {
-	      //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-	      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-	              && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-	        eventWrapper = new NodeUpdateSchedulerEventWrapper(
-	                (NodeUpdateSchedulerEvent)schedulerEvent);
-	        schedulerEvent = eventWrapper;
-	        updateQueueWithNodeUpdate(eventWrapper);
-	      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-	          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-	        // check if having AM Container, update resource usage information
-	        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-	            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-	        ApplicationAttemptId appAttemptId =
-	                appRemoveEvent.getApplicationAttemptID();
-	        String queue = appQueueMap.get(appAttemptId);
-	        SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
-	        if (! app.getLiveContainers().isEmpty()) {  // have 0 or 1
-	          // should have one container which is AM container
-	          RMContainer rmc = app.getLiveContainers().iterator().next();
-	          updateQueueMetrics(queue,
-	                  rmc.getContainer().getResource().getMemory(),
-	                  rmc.getContainer().getResource().getVirtualCores());
-	        }
-	      }
-
-	      handlerTimer = schedulerHandleTimer.time();
-	      operationTimer = schedulerHandleTimerMap
-	              .get(schedulerEvent.getType()).time();
-
-	      super.handle(schedulerEvent);
-	    } finally {
-	      if (handlerTimer != null)     handlerTimer.stop();
-	      if (operationTimer != null)   operationTimer.stop();
-	      schedulerHandleCounter.inc();
-	      schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
-
-	      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
-	          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-	        SLSRunner.decreaseRemainingApps();
-	        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-	                (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-	        ApplicationAttemptId appAttemptId =
-	                appRemoveEvent.getApplicationAttemptID();
-	        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
-	      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED
-	          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
-	        AppAttemptAddedSchedulerEvent appAddEvent =
-	                (AppAttemptAddedSchedulerEvent) schedulerEvent;
-          SchedulerApplication app =
-              applications.get(appAddEvent.getApplicationAttemptId()
-                  .getApplicationId());
-          appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
-              .getQueueName());
-	      }
-	    }
+    if (!metricsON) {
+      super.handle(schedulerEvent);
+      return;
+    }
+
+    if (!schedulerMetrics.isRunning()) {
+      schedulerMetrics.setRunning(true);
+    }
+
+    Timer.Context handlerTimer = null;
+    Timer.Context operationTimer = null;
+
+    NodeUpdateSchedulerEventWrapper eventWrapper;
+    try {
+      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+        eventWrapper = new NodeUpdateSchedulerEventWrapper(
+            (NodeUpdateSchedulerEvent)schedulerEvent);
+        schedulerEvent = eventWrapper;
+        updateQueueWithNodeUpdate(eventWrapper);
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        // check if having AM Container, update resource usage information
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        ApplicationAttemptId appAttemptId =
+            appRemoveEvent.getApplicationAttemptID();
+        String queue = appQueueMap.get(appAttemptId);
+        SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
+        if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
+          // should have one container which is AM container
+          RMContainer rmc = app.getLiveContainers().iterator().next();
+          schedulerMetrics.updateQueueMetricsByRelease(
+              rmc.getContainer().getResource(), queue);
+        }
+      }
+
+      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
+      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
+          schedulerEvent.getType()).time();
+
+      super.handle(schedulerEvent);
+    } finally {
+      if (handlerTimer != null) {
+        handlerTimer.stop();
+      }
+      if (operationTimer != null) {
+        operationTimer.stop();
+      }
+      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
+
+      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        SLSRunner.decreaseRemainingApps();
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_ADDED
+          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
+        AppAttemptAddedSchedulerEvent appAddEvent =
+            (AppAttemptAddedSchedulerEvent) schedulerEvent;
+        SchedulerApplication app =
+            applications.get(appAddEvent.getApplicationAttemptId()
+                .getApplicationId());
+        appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
+            .getQueueName());
+      }
+    }
   }
 
   private void updateQueueWithNodeUpdate(
@@ -294,7 +224,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
         if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
           for (RMContainer rmc : app.getLiveContainers()) {
             if (rmc.getContainerId() == containerId) {
-              releasedMemory += rmc.getContainer().getResource().getMemory();
+              releasedMemory += rmc.getContainer().getResource().getMemorySize();
               releasedVCores += rmc.getContainer()
                       .getResource().getVirtualCores();
               break;
@@ -303,13 +233,14 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
         } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
           if (preemptionContainerMap.containsKey(containerId)) {
             Resource preResource = preemptionContainerMap.get(containerId);
-            releasedMemory += preResource.getMemory();
+            releasedMemory += preResource.getMemorySize();
             releasedVCores += preResource.getVirtualCores();
             preemptionContainerMap.remove(containerId);
           }
         }
         // update queue counters
-        updateQueueMetrics(queue, releasedMemory, releasedVCores);
+        schedulerMetrics.updateQueueMetricsByRelease(
+            Resource.newInstance(releasedMemory, releasedVCores), queue);
       }
     }
   }
@@ -388,405 +319,54 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
     }
 
     // update metrics
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    String names[] = new String[]{
-            "counter.queue." + queueName + ".pending.memory",
-            "counter.queue." + queueName + ".pending.cores",
-            "counter.queue." + queueName + ".allocated.memory",
-            "counter.queue." + queueName + ".allocated.cores"};
-    int values[] = new int[]{pendingResource.getMemory(),
-            pendingResource.getVirtualCores(),
-            allocatedResource.getMemory(), allocatedResource.getVirtualCores()};
-    for (int i = names.length - 1; i >= 0; i --) {
-      if (! counterMap.containsKey(names[i])) {
-        metrics.counter(names[i]);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(names[i]).inc(values[i]);
-    }
-
-    queueLock.lock();
-    try {
-      if (! schedulerMetrics.isTracked(queueName)) {
-        schedulerMetrics.trackQueue(queueName);
-      }
-    } finally {
-      queueLock.unlock();
-    }
-  }
-
-  private void tearDown() throws IOException {
-    // close job runtime writer
-    if (jobRuntimeLogBW != null) {
-      jobRuntimeLogBW.close();
-    }
-    // shut pool
-    if (pool != null)  pool.shutdown();
-  }
-
-  @SuppressWarnings("unchecked")
-  private void initMetrics() throws Exception {
-    metrics = new MetricRegistry();
-    // configuration
-    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
-    int metricsWebAddressPort = conf.getInt(
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
-    // create SchedulerMetrics for current scheduler
-    String schedulerMetricsType = conf.get(CapacityScheduler.class.getName());
-    Class schedulerMetricsClass = schedulerMetricsType == null?
-            defaultSchedulerMetricsMap.get(CapacityScheduler.class) :
-            Class.forName(schedulerMetricsType);
-    schedulerMetrics = (SchedulerMetrics)ReflectionUtils
-            .newInstance(schedulerMetricsClass, new Configuration());
-    schedulerMetrics.init(this, metrics);
-
-    // register various metrics
-    registerJvmMetrics();
-    registerClusterResourceMetrics();
-    registerContainerAppNumMetrics();
-    registerSchedulerMetrics();
-
-    // .csv output
-    initMetricsCSVOutput();
-
-    // start web app to provide real-time tracking
-    web = new SLSWebApp(this, metricsWebAddressPort);
-    web.start();
-
-    // a thread to update histogram timer
-    pool = new ScheduledThreadPoolExecutor(2);
-    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // a thread to output metrics for real-tiem tracking
-    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // application running information
-    jobRuntimeLogBW = new BufferedWriter(
-        new OutputStreamWriter(new FileOutputStream(
-            metricsOutputDir + "/jobruntime.csv"), StandardCharsets.UTF_8));
-    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
-            "simulate_start_time,simulate_end_time" + EOL);
-    jobRuntimeLogBW.flush();
-  }
-
-  private void registerJvmMetrics() {
-    // add JVM gauges
-    metrics.register("variable.jvm.free.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().freeMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.max.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().maxMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.total.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().totalMemory();
-        }
-      }
-    );
-  }
-
-  private void registerClusterResourceMetrics() {
-    metrics.register("variable.cluster.allocated.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if( getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return getRootQueueMetrics().getAllocatedMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.allocated.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAllocatedVirtualCores();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return getRootQueueMetrics().getAvailableMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAvailableVirtualCores();
-          }
-        }
-      }
-    );
+    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
+        queueName);
   }
 
-  private void registerContainerAppNumMetrics() {
-    metrics.register("variable.running.application",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAppsRunning();
-          }
-        }
-      }
-    );
-    metrics.register("variable.running.container",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAllocatedContainers();
-          }
-        }
-      }
-    );
-  }
-
-  private void registerSchedulerMetrics() {
-    samplerLock.lock();
-    try {
-      // counters for scheduler operations
-      schedulerAllocateCounter = metrics.counter(
-              "counter.scheduler.operation.allocate");
-      schedulerHandleCounter = metrics.counter(
-              "counter.scheduler.operation.handle");
-      schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Counter counter = metrics.counter(
-                "counter.scheduler.operation.handle." + e);
-        schedulerHandleCounterMap.put(e, counter);
-      }
-      // timers for scheduler operations
-      int timeWindowSize = conf.getInt(
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
-      schedulerAllocateTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
-        schedulerHandleTimerMap.put(e, timer);
-      }
-      // histogram for scheduler operations (Samplers)
-      schedulerHistogramList = new ArrayList<Histogram>();
-      histogramTimerMap = new HashMap<Histogram, Timer>();
-      Histogram schedulerAllocateHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.allocate.timecost",
-              schedulerAllocateHistogram);
-      schedulerHistogramList.add(schedulerAllocateHistogram);
-      histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
-      Histogram schedulerHandleHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.handle.timecost",
-              schedulerHandleHistogram);
-      schedulerHistogramList.add(schedulerHandleHistogram);
-      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Histogram histogram = new Histogram(
-                new SlidingWindowReservoir(SAMPLING_SIZE));
-        metrics.register(
-                "sampler.scheduler.operation.handle." + e + ".timecost",
-                histogram);
-        schedulerHistogramList.add(histogram);
-        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
-      }
-    } finally {
-      samplerLock.unlock();
+  private void initQueueMetrics(CSQueue queue) {
+    if (queue instanceof LeafQueue) {
+      schedulerMetrics.initQueueMetric(queue.getQueueName());
+      return;
     }
-  }
 
-  private void initMetricsCSVOutput() {
-    int timeIntervalMS = conf.getInt(
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
-    File dir = new File(metricsOutputDir + "/metrics");
-    if(! dir.exists()
-            && ! dir.mkdirs()) {
-      LOG.error("Cannot create directory {}", dir.getAbsoluteFile());
+    for (CSQueue child : queue.getChildQueues()) {
+      initQueueMetrics(child);
     }
-    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
-            .formatFor(Locale.US)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build(new File(metricsOutputDir + "/metrics"));
-    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
   }
+  @Override
+  public void serviceInit(Configuration configuration) throws Exception {
+    super.serviceInit(configuration);
 
-  class HistogramsRunnable implements Runnable {
-    @Override
-    public void run() {
-      samplerLock.lock();
-      try {
-        for (Histogram histogram : schedulerHistogramList) {
-          Timer timer = histogramTimerMap.get(histogram);
-          histogram.update((int) timer.getSnapshot().getMean());
-        }
-      } finally {
-        samplerLock.unlock();
-      }
-    }
-  }
-
-  class MetricsLogRunnable implements Runnable {
-    private boolean firstLine = true;
-    public MetricsLogRunnable() {
-      try {
-        metricsLogBW = new BufferedWriter(
-            new OutputStreamWriter(new FileOutputStream(
-                metricsOutputDir + "/realtimetrack.json"),
-                StandardCharsets.UTF_8));
-        metricsLogBW.write("[");
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void run() {
-      if(running) {
-        // all WebApp to get real tracking json
-        String metrics = web.generateRealTimeTrackingMetrics();
-        // output
-        try {
-          if(firstLine) {
-            metricsLogBW.write(metrics + EOL);
-            firstLine = false;
-          } else {
-            metricsLogBW.write("," + metrics + EOL);
-          }
-          metricsLogBW.flush();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
+    if (metricsON) {
+      initQueueMetrics(getRootQueue());
     }
   }
 
-  // the following functions are used by AMSimulator
-  public void addAMRuntime(ApplicationId appId,
-                           long traceStartTimeMS, long traceEndTimeMS,
-                           long simulateStartTimeMS, long simulateEndTimeMS) {
-
+  @Override
+  public void serviceStop() throws Exception {
     try {
-      // write job runtime information
-      StringBuilder sb = new StringBuilder();
-      sb.append(appId).append(",").append(traceStartTimeMS).append(",")
-              .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
-              .append(",").append(simulateEndTimeMS);
-      jobRuntimeLogBW.write(sb.toString() + EOL);
-      jobRuntimeLogBW.flush();
-    } catch (IOException e) {
+      schedulerMetrics.tearDown();
+    } catch (Exception e) {
       e.printStackTrace();
     }
+    super.serviceStop();
   }
 
-  private void updateQueueMetrics(String queue,
-                                  int releasedMemory, int releasedVCores) {
-    // update queue counters
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    if (releasedMemory != 0) {
-      String name = "counter.queue." + queue + ".allocated.memory";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedMemory);
-    }
-    if (releasedVCores != 0) {
-      String name = "counter.queue." + queue + ".allocated.cores";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedVCores);
-    }
-  }
-
-  public void setQueueSet(Set<String> queues) {
-    this.queueSet = queues;
-  }
-
-  public Set<String> getQueueSet() {
-    return this.queueSet;
-  }
-
-  public void setTrackedAppSet(Set<String> apps) {
-    this.trackedAppSet = apps;
-  }
-
-  public Set<String> getTrackedAppSet() {
-    return this.trackedAppSet;
-  }
-
-  public MetricRegistry getMetrics() {
-    return metrics;
-  }
 
   public SchedulerMetrics getSchedulerMetrics() {
     return schedulerMetrics;
   }
 
-  // API open to out classes
-  public void addTrackedApp(ApplicationAttemptId appAttemptId,
-                            String oldAppId) {
-    if (metricsON) {
-      schedulerMetrics.trackApp(appAttemptId, oldAppId);
-    }
-  }
-
-  public void removeTrackedApp(ApplicationAttemptId appAttemptId,
-                               String oldAppId) {
-    if (metricsON) {
-      schedulerMetrics.untrackApp(appAttemptId, oldAppId);
-    }
-  }
-
   @Override
   public Configuration getConf() {
     return conf;
   }
 
-
-
-
-}
-
+  public String getRealQueueName(String queue) throws YarnException {
+    if (getQueue(queue) == null) {
+      throw new YarnException("Can't find the queue by the given name: " + queue
+          + "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueue(queue).getQueueName();
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message