hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From inigo...@apache.org
Subject [09/13] hadoop git commit: YARN-5654. Not be able to run SLS with FairScheduler (yufeigu via rkanter)
Date Thu, 30 Mar 2017 02:35:50 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
index 8645a69..a8792e8 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
@@ -18,40 +18,170 @@
 
 package org.apache.hadoop.yarn.sls.scheduler;
 
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.Locale;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.CsvReporter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.MetricRegistry;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.web.SLSWebApp;
+import org.apache.log4j.Logger;
 
 @Private
 @Unstable
 public abstract class SchedulerMetrics {
+  private static final String EOL = System.getProperty("line.separator");
+  private static final int SAMPLING_SIZE = 60;
+  private static final Logger LOG = Logger.getLogger(SchedulerMetrics.class);
+
   protected ResourceScheduler scheduler;
   protected Set<String> trackedQueues;
   protected MetricRegistry metrics;
   protected Set<String> appTrackedMetrics;
   protected Set<String> queueTrackedMetrics;
-  
+
+  private Configuration conf;
+  private ScheduledExecutorService pool;
+  private SLSWebApp web;
+
+  // metrics
+  private String metricsOutputDir;
+  private BufferedWriter metricsLogBW;
+  private BufferedWriter jobRuntimeLogBW;
+  private boolean running = false;
+
+  // counters for scheduler allocate/handle operations
+  private Counter schedulerAllocateCounter;
+  private Counter schedulerHandleCounter;
+  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
+
+  // Timers for scheduler allocate/handle operations
+  private Timer schedulerAllocateTimer;
+  private Timer schedulerHandleTimer;
+  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
+  private List<Histogram> schedulerHistogramList;
+  private Map<Histogram, Timer> histogramTimerMap;
+  private Lock samplerLock;
+  private Lock queueLock;
+
+  static Class getSchedulerMetricsClass(Configuration conf,
+      Class schedulerClass) throws ClassNotFoundException {
+    Class metricClass = null;
+    String schedulerMetricsType = conf.get(schedulerClass.getName());
+    if (schedulerMetricsType != null) {
+      metricClass = Class.forName(schedulerMetricsType);
+    }
+
+    if (schedulerClass.equals(FairScheduler.class)) {
+      metricClass = FairSchedulerMetrics.class;
+    } else if (schedulerClass.equals(CapacityScheduler.class)) {
+      metricClass = CapacitySchedulerMetrics.class;
+    } else if (schedulerClass.equals(FifoScheduler.class)) {
+      metricClass = FifoSchedulerMetrics.class;
+    }
+
+    return metricClass;
+  }
+
+  static SchedulerMetrics getInstance(Configuration conf, Class schedulerClass)
+      throws ClassNotFoundException {
+    Class schedulerMetricClass = getSchedulerMetricsClass(conf, schedulerClass);
+    return (SchedulerMetrics) ReflectionUtils
+        .newInstance(schedulerMetricClass, new Configuration());
+  }
+
   public SchedulerMetrics() {
+    metrics = new MetricRegistry();
+
     appTrackedMetrics = new HashSet<>();
     appTrackedMetrics.add("live.containers");
     appTrackedMetrics.add("reserved.containers");
+
     queueTrackedMetrics = new HashSet<>();
+    trackedQueues = new HashSet<>();
+
+    samplerLock = new ReentrantLock();
+    queueLock = new ReentrantLock();
   }
-  
-  public void init(ResourceScheduler scheduler, MetricRegistry metrics) {
-    this.scheduler = scheduler;
-    this.trackedQueues = new HashSet<>();
-    this.metrics = metrics;
+
+  void init(ResourceScheduler resourceScheduler, Configuration config)
+      throws Exception {
+    this.scheduler = resourceScheduler;
+    this.conf = config;
+
+    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
+
+    // register various metrics
+    registerJvmMetrics();
+    registerClusterResourceMetrics();
+    registerContainerAppNumMetrics();
+    registerSchedulerMetrics();
+
+    // .csv output
+    initMetricsCSVOutput();
+
+    // start web app to provide real-time tracking
+    int metricsWebAddressPort = conf.getInt(
+        SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
+        SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
+    web = new SLSWebApp((SchedulerWrapper)scheduler, metricsWebAddressPort);
+    web.start();
+
+    // a thread to update histogram timer
+    pool = new ScheduledThreadPoolExecutor(2);
+    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
+        TimeUnit.MILLISECONDS);
+
+    // a thread to output metrics for real-time tracking
+    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
+        TimeUnit.MILLISECONDS);
+
+    // application running information
+    jobRuntimeLogBW =
+        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
+            metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
+    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
+        "simulate_start_time,simulate_end_time" + EOL);
+    jobRuntimeLogBW.flush();
+  }
+
+  public MetricRegistry getMetrics() {
+    return metrics;
   }
 
   protected SchedulerApplicationAttempt getSchedulerAppAttempt(
@@ -117,7 +247,392 @@ public abstract class SchedulerMetrics {
   public Set<String> getAppTrackedMetrics() {
     return appTrackedMetrics;
   }
+
   public Set<String> getQueueTrackedMetrics() {
     return queueTrackedMetrics;
   }
+
+  private void registerJvmMetrics() {
+    // add JVM gauges
+    metrics.register("variable.jvm.free.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().freeMemory();
+          }
+        }
+    );
+    metrics.register("variable.jvm.max.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().maxMemory();
+          }
+        }
+    );
+    metrics.register("variable.jvm.total.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().totalMemory();
+          }
+        }
+    );
+  }
+
+  private void registerClusterResourceMetrics() {
+    metrics.register("variable.cluster.allocated.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0L;
+            } else {
+              return scheduler.getRootQueueMetrics().getAllocatedMB();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.allocated.vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.available.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0L;
+            } else {
+              return scheduler.getRootQueueMetrics().getAvailableMB();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.available.vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+            }
+          }
+        }
+    );
+  }
+
+  private void registerContainerAppNumMetrics() {
+    metrics.register("variable.running.application",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAppsRunning();
+            }
+          }
+        }
+    );
+    metrics.register("variable.running.container",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAllocatedContainers();
+            }
+          }
+        }
+    );
+  }
+
+  private void registerSchedulerMetrics() {
+    samplerLock.lock();
+    try {
+      // counters for scheduler operations
+      schedulerAllocateCounter = metrics.counter(
+          "counter.scheduler.operation.allocate");
+      schedulerHandleCounter = metrics.counter(
+          "counter.scheduler.operation.handle");
+      schedulerHandleCounterMap = new HashMap<>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Counter counter = metrics.counter(
+            "counter.scheduler.operation.handle." + e);
+        schedulerHandleCounterMap.put(e, counter);
+      }
+      // timers for scheduler operations
+      int timeWindowSize = conf.getInt(
+          SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
+          SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
+      schedulerAllocateTimer = new Timer(
+          new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimer = new Timer(
+          new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimerMap = new HashMap<>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
+        schedulerHandleTimerMap.put(e, timer);
+      }
+      // histogram for scheduler operations (Samplers)
+      schedulerHistogramList = new ArrayList<>();
+      histogramTimerMap = new HashMap<>();
+      Histogram schedulerAllocateHistogram = new Histogram(
+          new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.allocate.timecost",
+          schedulerAllocateHistogram);
+      schedulerHistogramList.add(schedulerAllocateHistogram);
+      histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
+      Histogram schedulerHandleHistogram = new Histogram(
+          new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.handle.timecost",
+          schedulerHandleHistogram);
+      schedulerHistogramList.add(schedulerHandleHistogram);
+      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Histogram histogram = new Histogram(
+            new SlidingWindowReservoir(SAMPLING_SIZE));
+        metrics.register(
+            "sampler.scheduler.operation.handle." + e + ".timecost",
+            histogram);
+        schedulerHistogramList.add(histogram);
+        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
+      }
+    } finally {
+      samplerLock.unlock();
+    }
+  }
+
+  private void initMetricsCSVOutput() {
+    int timeIntervalMS = conf.getInt(
+        SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
+        SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
+    File dir = new File(metricsOutputDir + "/metrics");
+    if(!dir.exists() && !dir.mkdirs()) {
+      LOG.error("Cannot create directory " + dir.getAbsoluteFile());
+    }
+    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
+        .formatFor(Locale.US)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .build(new File(metricsOutputDir + "/metrics"));
+    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
+  }
+
+  boolean isRunning() {
+    return running;
+  }
+
+  void setRunning(boolean running) {
+    this.running = running;
+  }
+
+  class HistogramsRunnable implements Runnable {
+    @Override
+    public void run() {
+      samplerLock.lock();
+      try {
+        for (Histogram histogram : schedulerHistogramList) {
+          Timer timer = histogramTimerMap.get(histogram);
+          histogram.update((int) timer.getSnapshot().getMean());
+        }
+      } finally {
+        samplerLock.unlock();
+      }
+    }
+  }
+
+  class MetricsLogRunnable implements Runnable {
+    private boolean firstLine = true;
+
+    MetricsLogRunnable() {
+      try {
+        metricsLogBW =
+            new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
+                metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
+        metricsLogBW.write("[");
+      } catch (IOException e) {
+        LOG.info(e.getMessage());
+      }
+    }
+
+    @Override
+    public void run() {
+      if(running) {
+        // call WebApp to get real-time tracking json
+        String trackingMetrics = web.generateRealTimeTrackingMetrics();
+        // output
+        try {
+          if(firstLine) {
+            metricsLogBW.write(trackingMetrics + EOL);
+            firstLine = false;
+          } else {
+            metricsLogBW.write("," + trackingMetrics + EOL);
+          }
+          metricsLogBW.flush();
+        } catch (IOException e) {
+          LOG.info(e.getMessage());
+        }
+      }
+    }
+  }
+
+  void tearDown() throws Exception {
+    if (metricsLogBW != null)  {
+      metricsLogBW.write("]");
+      metricsLogBW.close();
+    }
+
+    if (web != null) {
+      web.stop();
+    }
+
+    if (jobRuntimeLogBW != null) {
+      jobRuntimeLogBW.close();
+    }
+
+    if (pool != null) {
+      pool.shutdown();
+    }
+  }
+
+  void increaseSchedulerAllocationCounter() {
+    schedulerAllocateCounter.inc();
+  }
+
+  void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) {
+    schedulerHandleCounter.inc();
+    schedulerHandleCounterMap.get(schedulerEventType).inc();
+  }
+
+  Timer getSchedulerAllocateTimer() {
+    return schedulerAllocateTimer;
+  }
+
+  Timer getSchedulerHandleTimer() {
+    return schedulerHandleTimer;
+  }
+
+  Timer getSchedulerHandleTimer(SchedulerEventType schedulerEventType) {
+    return schedulerHandleTimerMap.get(schedulerEventType);
+  }
+
+  private enum QueueMetric {
+    PENDING_MEMORY("pending.memory"),
+    PENDING_VCORES("pending.cores"),
+    ALLOCATED_MEMORY("allocated.memory"),
+    ALLOCATED_VCORES("allocated.cores");
+
+    private String value;
+
+    QueueMetric(String value) {
+      this.value = value;
+    }
+  }
+
+  private String getQueueMetricName(String queue, QueueMetric metric) {
+    return "counter.queue." + queue + "." + metric.value;
+  }
+
+  private void traceQueueIfNotTraced(String queue) {
+    queueLock.lock();
+    try {
+      if (!isTracked(queue)) {
+        trackQueue(queue);
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  void initQueueMetric(String queueName){
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+
+    for (QueueMetric queueMetric : QueueMetric.values()) {
+      String metricName = getQueueMetricName(queueName, queueMetric);
+      if (!counterMap.containsKey(metricName)) {
+        metrics.counter(metricName);
+        counterMap = metrics.getCounters();
+      }
+    }
+
+    traceQueueIfNotTraced(queueName);
+  }
+
+  void updateQueueMetrics(Resource pendingResource, Resource allocatedResource,
+      String queueName) {
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    for(QueueMetric metric : QueueMetric.values()) {
+      String metricName = getQueueMetricName(queueName, metric);
+      if (!counterMap.containsKey(metricName)) {
+        metrics.counter(metricName);
+        counterMap = metrics.getCounters();
+      }
+
+      if (metric == QueueMetric.PENDING_MEMORY) {
+        counterMap.get(metricName).inc(pendingResource.getMemorySize());
+      } else if (metric == QueueMetric.PENDING_VCORES) {
+        counterMap.get(metricName).inc(pendingResource.getVirtualCores());
+      } else if (metric == QueueMetric.ALLOCATED_MEMORY) {
+        counterMap.get(metricName).inc(allocatedResource.getMemorySize());
+      } else if (metric == QueueMetric.ALLOCATED_VCORES){
+        counterMap.get(metricName).inc(allocatedResource.getVirtualCores());
+      }
+    }
+
+    traceQueueIfNotTraced(queueName);
+  }
+
+  void updateQueueMetricsByRelease(Resource releaseResource, String queue) {
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    String name = getQueueMetricName(queue, QueueMetric.ALLOCATED_MEMORY);
+    if (!counterMap.containsKey(name)) {
+      metrics.counter(name);
+      counterMap = metrics.getCounters();
+    }
+    counterMap.get(name).inc(-releaseResource.getMemorySize());
+
+    String vcoreMetric =
+        getQueueMetricName(queue, QueueMetric.ALLOCATED_VCORES);
+    if (!counterMap.containsKey(vcoreMetric)) {
+      metrics.counter(vcoreMetric);
+      counterMap = metrics.getCounters();
+    }
+    counterMap.get(vcoreMetric).inc(-releaseResource.getVirtualCores());
+  }
+
+  public void addTrackedApp(ApplicationId appId,
+      String oldAppId) {
+    trackApp(appId, oldAppId);
+  }
+
+  public void removeTrackedApp(String oldAppId) {
+    untrackApp(oldAppId);
+  }
+
+  public void addAMRuntime(ApplicationId appId, long traceStartTimeMS,
+      long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) {
+    try {
+      // write job runtime information
+      StringBuilder sb = new StringBuilder();
+      sb.append(appId).append(",").append(traceStartTimeMS).append(",")
+          .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
+          .append(",").append(simulateEndTimeMS);
+      jobRuntimeLogBW.write(sb.toString() + EOL);
+      jobRuntimeLogBW.flush();
+    } catch (IOException e) {
+      LOG.info(e.getMessage());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
index 962b137..406f050 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
@@ -17,28 +17,12 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import java.util.Set;
-
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-import com.codahale.metrics.MetricRegistry;
 
 @Private
 @Unstable
 public interface SchedulerWrapper {
-
-  MetricRegistry getMetrics();
   SchedulerMetrics getSchedulerMetrics();
-  Set<String> getQueueSet();
-  void setQueueSet(Set<String> queues);
-  Set<String> getTrackedAppSet();
-  void setTrackedAppSet(Set<String> apps);
-  void addTrackedApp(ApplicationId appId, String oldAppId);
-  void removeTrackedApp(String oldAppId);
-  void addAMRuntime(ApplicationId appId,
-      long traceStartTimeMS, long traceEndTimeMS,
-      long simulateStartTimeMS, long simulateEndTimeMS);
-
+  Tracker getTracker();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java
new file mode 100644
index 0000000..42a5c3c
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import java.util.Set;
+
+@Private
+@Unstable
+public class Tracker {
+  private Set<String> queueSet;
+  private Set<String> trackedAppSet;
+
+  public void setQueueSet(Set<String> queues) {
+    queueSet = queues;
+  }
+
+  public Set<String> getQueueSet() {
+    return queueSet;
+  }
+
+  public void setTrackedAppSet(Set<String> apps) {
+    trackedAppSet = apps;
+  }
+
+  public Set<String> getTrackedAppSet() {
+    return trackedAppSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index e5f7cd0..085edc0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
 @Private
 @Unstable
 public class SLSUtils {
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   // hostname includes the network path and the host name. for example
   // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java
index 33d4846..2d2ffc5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java
@@ -107,12 +107,12 @@ public class SLSWebApp extends HttpServlet {
 
   public SLSWebApp(SchedulerWrapper wrapper, int metricsAddressPort) {
     this.wrapper = wrapper;
-    metrics = wrapper.getMetrics();
     handleOperTimecostHistogramMap =
             new HashMap<SchedulerEventType, Histogram>();
     queueAllocatedMemoryCounterMap = new HashMap<String, Counter>();
     queueAllocatedVCoresCounterMap = new HashMap<String, Counter>();
     schedulerMetrics = wrapper.getSchedulerMetrics();
+    metrics = schedulerMetrics.getMetrics();
     port = metricsAddressPort;
   }
 
@@ -226,7 +226,7 @@ public class SLSWebApp extends HttpServlet {
     response.setStatus(HttpServletResponse.SC_OK);
 
     // queues {0}
-    Set<String> queues = wrapper.getQueueSet();
+    Set<String> queues = wrapper.getTracker().getQueueSet();
     StringBuilder queueInfo = new StringBuilder();
 
     int i = 0;
@@ -265,7 +265,7 @@ public class SLSWebApp extends HttpServlet {
 
     // tracked queues {0}
     StringBuilder trackedQueueInfo = new StringBuilder();
-    Set<String> trackedQueues = wrapper.getQueueSet();
+    Set<String> trackedQueues = wrapper.getTracker().getQueueSet();
     for(String queue : trackedQueues) {
       trackedQueueInfo.append("<option value='Queue ").append(queue)
               .append("'>").append(queue).append("</option>");
@@ -273,7 +273,7 @@ public class SLSWebApp extends HttpServlet {
 
     // tracked apps {1}
     StringBuilder trackedAppInfo = new StringBuilder();
-    Set<String> trackedApps = wrapper.getTrackedAppSet();
+    Set<String> trackedApps = wrapper.getTracker().getTrackedAppSet();
     for(String job : trackedApps) {
       trackedAppInfo.append("<option value='Job ").append(job)
               .append("'>").append(job).append("</option>");
@@ -422,7 +422,7 @@ public class SLSWebApp extends HttpServlet {
     // allocated resource for each queue
     Map<String, Double> queueAllocatedMemoryMap = new HashMap<String, Double>();
     Map<String, Long> queueAllocatedVCoresMap = new HashMap<String, Long>();
-    for (String queue : wrapper.getQueueSet()) {
+    for (String queue : wrapper.getTracker().getQueueSet()) {
       // memory
       String key = "counter.queue." + queue + ".allocated.memory";
       if (! queueAllocatedMemoryCounterMap.containsKey(queue) &&
@@ -462,7 +462,7 @@ public class SLSWebApp extends HttpServlet {
             .append(",\"cluster.available.memory\":").append(availableMemoryGB)
             .append(",\"cluster.available.vcores\":").append(availableVCoresGB);
 
-    for (String queue : wrapper.getQueueSet()) {
+    for (String queue : wrapper.getTracker().getQueueSet()) {
       sb.append(",\"queue.").append(queue).append(".allocated.memory\":")
               .append(queueAllocatedMemoryMap.get(queue));
       sb.append(",\"queue.").append(queue).append(".allocated.vcores\":")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index f0d8e6f..ca3d195 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -22,37 +22,56 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
-import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
+import org.apache.hadoop.yarn.sls.scheduler.*;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
+@RunWith(Parameterized.class)
 public class TestAMSimulator {
   private ResourceManager rm;
   private YarnConfiguration conf;
   private Path metricOutputDir;
 
+  private Class slsScheduler;
+  private Class scheduler;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][] {
+        {SLSFairScheduler.class, FairScheduler.class},
+        {SLSCapacityScheduler.class, CapacityScheduler.class}
+    });
+  }
+
+  public TestAMSimulator(Class slsScheduler, Class scheduler) {
+    this.slsScheduler = slsScheduler;
+    this.scheduler = scheduler;
+  }
+
   @Before
   public void setup() {
     createMetricOutputDir();
 
     conf = new YarnConfiguration();
     conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString());
-    conf.set(YarnConfiguration.RM_SCHEDULER,
-        "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
-    conf.set(SLSConfiguration.RM_SCHEDULER,
-        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName());
+    conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName());
     conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true);
     rm = new ResourceManager();
     rm.init(conf);
@@ -76,15 +95,17 @@ public class TestAMSimulator {
   }
 
   private void verifySchedulerMetrics(String appId) {
-    SchedulerWrapper schedulerWrapper = (SchedulerWrapper)
-        rm.getResourceScheduler();
-    MetricRegistry metricRegistry = schedulerWrapper.getMetrics();
-    for (FairSchedulerMetrics.Metric metric :
-        FairSchedulerMetrics.Metric.values()) {
-      String key = "variable.app." + appId + "." + metric.getValue()
-          + ".memory";
-      Assert.assertTrue(metricRegistry.getGauges().containsKey(key));
-      Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue());
+    if (scheduler.equals(FairScheduler.class)) {
+      SchedulerMetrics schedulerMetrics = ((SchedulerWrapper)
+          rm.getResourceScheduler()).getSchedulerMetrics();
+      MetricRegistry metricRegistry = schedulerMetrics.getMetrics();
+      for (FairSchedulerMetrics.Metric metric :
+          FairSchedulerMetrics.Metric.values()) {
+        String key = "variable.app." + appId + "." + metric.getValue() +
+            ".memory";
+        Assert.assertTrue(metricRegistry.getGauges().containsKey(key));
+        Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
index f9a3932..2f10f7d 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
@@ -21,26 +21,50 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
+import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
 public class TestNMSimulator {
   private final int GB = 1024;
   private ResourceManager rm;
   private YarnConfiguration conf;
 
+  private Class slsScheduler;
+  private Class scheduler;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][] {
+        {SLSFairScheduler.class, FairScheduler.class},
+        {SLSCapacityScheduler.class, CapacityScheduler.class}
+    });
+  }
+
+  public TestNMSimulator(Class slsScheduler, Class scheduler) {
+    this.slsScheduler = slsScheduler;
+    this.scheduler = scheduler;
+  }
+
   @Before
   public void setup() {
     conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RM_SCHEDULER,
-        "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
-    conf.set(SLSConfiguration.RM_SCHEDULER,
-        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName());
+    conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName());
     conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
     rm = new ResourceManager();
     rm.init(conf);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message