myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject [05/20] incubator-myriad git commit: spacing changes
Date Wed, 28 Oct 2015 16:07:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
index e6db190..e62e228 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,10 +27,11 @@ import java.util.List;
  */
 public interface NodeScaleDownPolicy {
 
-    /**
-     * Apply a scale down policy to the given list of taskIDs.
-     * @param taskIDs
-     */
-    public void apply(List<Protos.TaskID> taskIDs);
+  /**
+   * Apply a scale down policy to the given list of taskIDs.
+   *
+   * @param taskIDs
+   */
+  public void apply(List<Protos.TaskID> taskIDs);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
index f156bd0..bf1ab16 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,22 +27,21 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
 /**
- * Implementation assumes NM binaries will be downloaded 
+ * Implementation assumes NM binaries will be downloaded
  */
 public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
 
   private static final Logger LOGGER = LoggerFactory.
-    getLogger(DownloadNMExecutorCLGenImpl.class);
+      getLogger(DownloadNMExecutorCLGenImpl.class);
 
   private final String nodeManagerUri;
 
-  public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg,
-    String nodeManagerUri) {
+  public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, String nodeManagerUri) {
     super(cfg);
     this.nodeManagerUri = nodeManagerUri;
   }
 
-@Override
+  @Override
   public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
     StringBuilder cmdLine = new StringBuilder();
     LOGGER.info("Using remote distribution");
@@ -72,14 +71,12 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
     //We need the current directory to be writable by frameworkUser for capsuleExecutor to create directories.
     //Best to simply give owenership to the user running the executor but we don't want to use -R as this
     //will silently remove the suid bit on container executor.
-   cmdLine.append(" && sudo chown ").append(cfg.getFrameworkUser().get()).append(" .");
+    cmdLine.append(" && sudo chown ").append(cfg.getFrameworkUser().get()).append(" .");
 
     //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager
     //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the
     //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml.
-    cmdLine.append(" && cp conf ")
-      .append(cfg.getYarnEnvironment().get("YARN_HOME"))
-      .append("/etc/hadoop/yarn-site.xml;");
+    cmdLine.append(" && cp conf ").append(cfg.getYarnEnvironment().get("YARN_HOME")).append("/etc/hadoop/yarn-site.xml;");
   }
 
   protected void appendUser(StringBuilder cmdLine) {
@@ -89,12 +86,11 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
   private static String getFileName(String uri) {
     int lastSlash = uri.lastIndexOf('/');
     if (lastSlash == -1) {
-        return uri;
+      return uri;
     } else {
-        String fileName = uri.substring(lastSlash + 1);
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName),
-          "URI should not have a slash at the end");
-        return fileName;
+      String fileName = uri.substring(lastSlash + 1);
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI should not have a slash at the end");
+      return fileName;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
index 9dcb2bf..e5cd43b 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,10 @@
 package com.ebay.myriad.scheduler;
 
 /**
- * Interface to plugin multiple implementations for executor command generation  
+ * Interface to plugin multiple implementations for executor command generation
  */
 public interface ExecutorCommandLineGenerator {
-    String generateCommandLine(ServiceResourceProfile profile, Ports ports);
-    
-    String getConfigurationUrl();
+  String generateCommandLine(ServiceResourceProfile profile, Ports ports);
+
+  String getConfigurationUrl();
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
index 114b611..9e2c063 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,18 +23,15 @@ import com.google.gson.Gson;
 /**
  * Extended ServiceResourceProfile for services that need to pass set of resources downstream
  * currently the only such service is NodeManager
- *
  */
 public class ExtendedResourceProfile extends ServiceResourceProfile {
 
   private NMProfile childProfile;
 
   /**
-   * 
    * @param childProfile - should be null
    * @param cpu
-   * @param mem
-   * will throw NullPoiterException if childProfile is null
+   * @param mem          will throw NullPoiterException if childProfile is null
    */
   public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem) {
     super(childProfile.getName(), cpu, mem);
@@ -49,7 +46,7 @@ public class ExtendedResourceProfile extends ServiceResourceProfile {
   public void setChildProfile(NMProfile nmProfile) {
     this.childProfile = nmProfile;
   }
-  
+
   @Override
   public String getName() {
     return childProfile.getName();
@@ -69,7 +66,7 @@ public class ExtendedResourceProfile extends ServiceResourceProfile {
   public Double getAggregateMemory() {
     return memory + childProfile.getMemory();
   }
-  
+
   @Override
   public Double getAggregateCpu() {
     return cpus + childProfile.getCpus();
@@ -77,7 +74,7 @@ public class ExtendedResourceProfile extends ServiceResourceProfile {
 
   @Override
   public String toString() {
-      Gson gson = new Gson();
-      return gson.toJson(this);
+    Gson gson = new Gson();
+    return gson.toJson(this);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java
index 4aa4ee3..83c4cd2 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriver.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,37 +30,37 @@ import org.slf4j.LoggerFactory;
  * Driver for Myriad scheduler.
  */
 public class MyriadDriver {
-    private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriver.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriver.class);
 
-    private final SchedulerDriver driver;
+  private final SchedulerDriver driver;
 
-    @Inject
-    public MyriadDriver(SchedulerDriver driver) {
-      this.driver = driver;
-    }
+  @Inject
+  public MyriadDriver(SchedulerDriver driver) {
+    this.driver = driver;
+  }
 
-    public Status start() {
-        LOGGER.info("Starting driver");
-        Status status = driver.start();
-        LOGGER.info("Driver started with status: {}", status);
-        return status;
-    }
+  public Status start() {
+    LOGGER.info("Starting driver");
+    Status status = driver.start();
+    LOGGER.info("Driver started with status: {}", status);
+    return status;
+  }
 
-    public Status kill(final TaskID taskId) {
-        LOGGER.info("Killing task {}", taskId);
-        Status status = driver.killTask(taskId);
-        LOGGER.info("Task {} killed with status: {}", taskId, status);
-        return status;
-    }
+  public Status kill(final TaskID taskId) {
+    LOGGER.info("Killing task {}", taskId);
+    Status status = driver.killTask(taskId);
+    LOGGER.info("Task {} killed with status: {}", taskId, status);
+    return status;
+  }
 
-    public Status abort() {
-        LOGGER.info("Aborting driver");
-        Status status = driver.abort();
-        LOGGER.info("Driver aborted with status: {}", status);
-        return status;
-    }
+  public Status abort() {
+    LOGGER.info("Aborting driver");
+    Status status = driver.abort();
+    LOGGER.info("Driver aborted with status: {}", status);
+    return status;
+  }
 
-    public SchedulerDriver getDriver() {
-        return driver;
-    }
+  public SchedulerDriver getDriver() {
+    return driver;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java
index d4e19d6..4d669f4 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadDriverManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,74 +33,71 @@ import java.util.concurrent.locks.ReentrantLock;
  * Manager for the myriad scheduler driver
  */
 public class MyriadDriverManager {
-    private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriverManager.class);
-    private final Lock driverLock;
-    private MyriadDriver driver;
-    private Status driverStatus;
+  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriverManager.class);
+  private final Lock driverLock;
+  private MyriadDriver driver;
+  private Status driverStatus;
 
-    @Inject
-    public MyriadDriverManager(MyriadDriver driver) {
-        this.driver = driver;
-        this.driverLock = new ReentrantLock();
-        this.driverStatus = Protos.Status.DRIVER_NOT_STARTED;
-    }
+  @Inject
+  public MyriadDriverManager(MyriadDriver driver) {
+    this.driver = driver;
+    this.driverLock = new ReentrantLock();
+    this.driverStatus = Protos.Status.DRIVER_NOT_STARTED;
+  }
 
-    public Status startDriver() {
-        this.driverLock.lock();
-        try {
-            Preconditions.checkState(this.isStartable());
-            LOGGER.info("Starting driver...");
-            this.driverStatus = driver.start();
-            LOGGER.info("Driver started with status: {}", this.driverStatus);
-        } finally {
-            this.driverLock.unlock();
-        }
-        return this.driverStatus;
+  public Status startDriver() {
+    this.driverLock.lock();
+    try {
+      Preconditions.checkState(this.isStartable());
+      LOGGER.info("Starting driver...");
+      this.driverStatus = driver.start();
+      LOGGER.info("Driver started with status: {}", this.driverStatus);
+    } finally {
+      this.driverLock.unlock();
     }
+    return this.driverStatus;
+  }
 
-    public Status stopDriver() {
-        this.driverLock.lock();
-        try {
-            if (isRunning()) {
-                LOGGER.info("Aborting driver...");
-                this.driverStatus = this.driver.abort();
-                LOGGER.info("Aborted driver with status: {}", this.driverStatus);
-            }
-        } finally {
-            this.driverLock.unlock();
-        }
-        return driverStatus;
+  public Status stopDriver() {
+    this.driverLock.lock();
+    try {
+      if (isRunning()) {
+        LOGGER.info("Aborting driver...");
+        this.driverStatus = this.driver.abort();
+        LOGGER.info("Aborted driver with status: {}", this.driverStatus);
+      }
+    } finally {
+      this.driverLock.unlock();
     }
+    return driverStatus;
+  }
 
-    public Status kill(final TaskID taskId) {
-        LOGGER.info("Killing task {}", taskId);
-        this.driverLock.lock();
-        try {
-            if (isRunning()) {
-                this.driverStatus = driver.kill(taskId);
-                LOGGER.info("Task {} killed with status: {}", taskId,
-                        this.driverStatus);
-            } else {
-                LOGGER.warn("Cannot kill task, driver is not running");
-            }
-        } finally {
-            this.driverLock.unlock();
-        }
-
-        return driverStatus;
+  public Status kill(final TaskID taskId) {
+    LOGGER.info("Killing task {}", taskId);
+    this.driverLock.lock();
+    try {
+      if (isRunning()) {
+        this.driverStatus = driver.kill(taskId);
+        LOGGER.info("Task {} killed with status: {}", taskId, this.driverStatus);
+      } else {
+        LOGGER.warn("Cannot kill task, driver is not running");
+      }
+    } finally {
+      this.driverLock.unlock();
     }
 
-    public Status getDriverStatus() {
-        return this.driverStatus;
-    }
+    return driverStatus;
+  }
 
-    private boolean isStartable() {
-        return this.driver != null
-                && this.driverStatus == Status.DRIVER_NOT_STARTED;
-    }
+  public Status getDriverStatus() {
+    return this.driverStatus;
+  }
 
-    private boolean isRunning() {
-        return this.driver != null
-                && this.driverStatus == Status.DRIVER_RUNNING;
-    }
+  private boolean isStartable() {
+    return this.driver != null && this.driverStatus == Status.DRIVER_NOT_STARTED;
+  }
+
+  private boolean isRunning() {
+    return this.driver != null && this.driverStatus == Status.DRIVER_RUNNING;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index 903120a..25369df 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,182 +43,169 @@ import java.util.Set;
  * Myriad scheduler operations
  */
 public class MyriadOperations {
-    private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class);
-    private final SchedulerState schedulerState;
-
-    private MyriadConfiguration cfg;
-    private NodeScaleDownPolicy nodeScaleDownPolicy;
-
-    @Inject
-    public MyriadOperations(MyriadConfiguration cfg,
-                            SchedulerState schedulerState,
-                            NodeScaleDownPolicy nodeScaleDownPolicy) {
-      this.cfg = cfg;
-      this.schedulerState = schedulerState;
-      this.nodeScaleDownPolicy = nodeScaleDownPolicy;
-    }
+  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class);
+  private final SchedulerState schedulerState;
 
-    public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) {
-        Collection<NodeTask> nodes = new HashSet<>();
-        for (int i = 0; i < instances; i++) {
-          NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
-          nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
-          nodes.add(nodeTask);
-        }
+  private MyriadConfiguration cfg;
+  private NodeScaleDownPolicy nodeScaleDownPolicy;
 
-        LOGGER.info("Adding {} NM instances to cluster", nodes.size());
-        this.schedulerState.addNodes(nodes);
+  @Inject
+  public MyriadOperations(MyriadConfiguration cfg, SchedulerState schedulerState, NodeScaleDownPolicy nodeScaleDownPolicy) {
+    this.cfg = cfg;
+    this.schedulerState = schedulerState;
+    this.nodeScaleDownPolicy = nodeScaleDownPolicy;
+  }
+
+  public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) {
+    Collection<NodeTask> nodes = new HashSet<>();
+    for (int i = 0; i < instances; i++) {
+      NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
+      nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
+      nodes.add(nodeTask);
     }
 
-    public void flexDownCluster(ServiceResourceProfile serviceResourceProfile, Constraint constraint, int numInstancesToScaleDown) {
-        // Flex down Pending tasks, if any
-        int numPendingTasksScaledDown = flexDownPendingTasks(
-            serviceResourceProfile, constraint, numInstancesToScaleDown);
-
-        // Flex down Staging tasks, if any
-        int numStagingTasksScaledDown = flexDownStagingTasks(
-            serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown);
-
-        // Flex down Active tasks, if any
-        int numActiveTasksScaledDown = flexDownActiveTasks(
-            serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown);
-
-        if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) {
-          LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.",
-              serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString());
-        } else {
-          LOGGER.info("Flexed down {} active, {} staging  and {} pending Node Managers with " +
-              "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown,
-              numPendingTasksScaledDown, serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString());
-        }
+    LOGGER.info("Adding {} NM instances to cluster", nodes.size());
+    this.schedulerState.addNodes(nodes);
+  }
+
+  public void flexDownCluster(ServiceResourceProfile serviceResourceProfile, Constraint constraint, int numInstancesToScaleDown) {
+    // Flex down Pending tasks, if any
+    int numPendingTasksScaledDown = flexDownPendingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown);
+
+    // Flex down Staging tasks, if any
+    int numStagingTasksScaledDown = flexDownStagingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown);
+
+    // Flex down Active tasks, if any
+    int numActiveTasksScaledDown = flexDownActiveTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown);
+
+    if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) {
+      LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.", serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString());
+    } else {
+      LOGGER.info("Flexed down {} active, {} staging  and {} pending Node Managers with " + "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceResourceProfile.getName(),
+          constraint == null ? "null" : constraint.toString());
     }
+  }
 
-    /**
-     * Flexup a service
-     * @param instances
-     * @param serviceName
-     */
-    public void flexUpAService(int instances, String serviceName) throws MyriadBadConfigurationException {
-      final ServiceConfiguration auxTaskConf = cfg.getServiceConfiguration(serviceName);
-      
-      int totalflexInstances = instances + getFlexibleInstances(serviceName);
-      Integer maxInstances = auxTaskConf.getMaxInstances().orNull();
-      if (maxInstances != null && maxInstances > 0) {
-        // check number of instances
-        // sum of active, staging, pending should be < maxInstances
-        if (totalflexInstances > maxInstances) {
-          LOGGER.error("Current number of active, staging, pending and requested instances: {}"
-              + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances);
-            throw new MyriadBadConfigurationException("Current number of active, staging, pending instances and requested: "
-            + totalflexInstances + ", while it is greater then max instances allowed: " + maxInstances);          
-        }
+  /**
+   * Flexup a service
+   *
+   * @param instances
+   * @param serviceName
+   */
+  public void flexUpAService(int instances, String serviceName) throws MyriadBadConfigurationException {
+    final ServiceConfiguration auxTaskConf = cfg.getServiceConfiguration(serviceName);
+
+    int totalflexInstances = instances + getFlexibleInstances(serviceName);
+    Integer maxInstances = auxTaskConf.getMaxInstances().orNull();
+    if (maxInstances != null && maxInstances > 0) {
+      // check number of instances
+      // sum of active, staging, pending should be < maxInstances
+      if (totalflexInstances > maxInstances) {
+        LOGGER.error("Current number of active, staging, pending and requested instances: {}" + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances);
+        throw new MyriadBadConfigurationException("Current number of active, staging, pending instances and requested: " + totalflexInstances + ", while it is greater then max instances allowed: " + maxInstances);
       }
+    }
 
-      final Double cpu = auxTaskConf.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
-      final Double mem = auxTaskConf.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
-      
-      Collection<NodeTask> nodes = new HashSet<>();
-      for (int i = 0; i < instances; i++) {
-        NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem), null);
-        nodeTask.setTaskPrefix(serviceName);
-        nodes.add(nodeTask);
-      }
+    final Double cpu = auxTaskConf.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
+    final Double mem = auxTaskConf.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
 
-      LOGGER.info("Adding {} {} instances to cluster", nodes.size(), serviceName);
-      this.schedulerState.addNodes(nodes);
+    Collection<NodeTask> nodes = new HashSet<>();
+    for (int i = 0; i < instances; i++) {
+      NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem), null);
+      nodeTask.setTaskPrefix(serviceName);
+      nodes.add(nodeTask);
     }
-    
-    /**
-     * Flexing down any service defined in the configuration
-     * @param numInstancesToScaleDown
-     * @param serviceName - name of the service
-     */
-    public void flexDownAService(int numInstancesToScaleDown, String serviceName) {
-      LOGGER.info("About to flex down {} instances of {}", numInstancesToScaleDown, serviceName);
-
-      int numScaledDown = 0;
-      
-      // Flex down Pending tasks, if any
-      if (numScaledDown < numInstancesToScaleDown) {
-        Collection<Protos.TaskID> pendingTasks = this.schedulerState.getPendingTaskIds(serviceName);
-
-        for (Protos.TaskID taskId : pendingTasks) {
-            this.schedulerState.makeTaskKillable(taskId);
-            numScaledDown++;
-            if (numScaledDown >= numInstancesToScaleDown) {
-                break;
-            }
+
+    LOGGER.info("Adding {} {} instances to cluster", nodes.size(), serviceName);
+    this.schedulerState.addNodes(nodes);
+  }
+
+  /**
+   * Flexing down any service defined in the configuration
+   *
+   * @param numInstancesToScaleDown
+   * @param serviceName             - name of the service
+   */
+  public void flexDownAService(int numInstancesToScaleDown, String serviceName) {
+    LOGGER.info("About to flex down {} instances of {}", numInstancesToScaleDown, serviceName);
+
+    int numScaledDown = 0;
+
+    // Flex down Pending tasks, if any
+    if (numScaledDown < numInstancesToScaleDown) {
+      Collection<Protos.TaskID> pendingTasks = this.schedulerState.getPendingTaskIds(serviceName);
+
+      for (Protos.TaskID taskId : pendingTasks) {
+        this.schedulerState.makeTaskKillable(taskId);
+        numScaledDown++;
+        if (numScaledDown >= numInstancesToScaleDown) {
+          break;
         }
       }
-      int numPendingTasksScaledDown = numScaledDown;
-      
-      // Flex down Staging tasks, if any
-      if (numScaledDown < numInstancesToScaleDown) {
-          Collection<Protos.TaskID> stagingTasks = this.schedulerState.getStagingTaskIds(serviceName);
-
-          for (Protos.TaskID taskId : stagingTasks) {
-              this.schedulerState.makeTaskKillable(taskId);
-              numScaledDown++;
-              if (numScaledDown >= numInstancesToScaleDown) {
-                  break;
-              }
-          }
-      }
-      int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown;
-
-      Set<NodeTask> activeTasks = this.schedulerState.getActiveTasksByType(serviceName);
-      if (numScaledDown < numInstancesToScaleDown) {
-        for (NodeTask nodeTask : activeTasks) {
-          this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
-          numScaledDown++;
-          if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Marked NodeTask {} on host {} for kill.",
-                nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname());
-          }
-          if (numScaledDown >= numInstancesToScaleDown) {
-            break;
-          }
+    }
+    int numPendingTasksScaledDown = numScaledDown;
+
+    // Flex down Staging tasks, if any
+    if (numScaledDown < numInstancesToScaleDown) {
+      Collection<Protos.TaskID> stagingTasks = this.schedulerState.getStagingTaskIds(serviceName);
+
+      for (Protos.TaskID taskId : stagingTasks) {
+        this.schedulerState.makeTaskKillable(taskId);
+        numScaledDown++;
+        if (numScaledDown >= numInstancesToScaleDown) {
+          break;
         }
       }
-      
-      LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances of {}",
-              numScaledDown, numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceName);
     }
-    
-    private int flexDownPendingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
-      return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile),
-          profile, constraint, numInstancesToScaleDown) : 0;
+    int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown;
+
+    Set<NodeTask> activeTasks = this.schedulerState.getActiveTasksByType(serviceName);
+    if (numScaledDown < numInstancesToScaleDown) {
+      for (NodeTask nodeTask : activeTasks) {
+        this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
+        numScaledDown++;
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Marked NodeTask {} on host {} for kill.", nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname());
+        }
+        if (numScaledDown >= numInstancesToScaleDown) {
+          break;
+        }
+      }
     }
 
-    private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
-      return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile),
-          profile, constraint, numInstancesToScaleDown) : 0;
-    }
+    LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances of {}", numScaledDown, numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceName);
+  }
 
-    private int flexDownActiveTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
-      if (numInstancesToScaleDown > 0) {
-        List<Protos.TaskID> activeTasksForProfile = Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
-        nodeScaleDownPolicy.apply(activeTasksForProfile);
-        return flexDownTasks(activeTasksForProfile, profile, constraint, numInstancesToScaleDown);
-      }
-      return 0;
+  private int flexDownPendingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+    return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0;
+  }
+
+  private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+    return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0;
+  }
+
+  private int flexDownActiveTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+    if (numInstancesToScaleDown > 0) {
+      List<Protos.TaskID> activeTasksForProfile = Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
+      nodeScaleDownPolicy.apply(activeTasksForProfile);
+      return flexDownTasks(activeTasksForProfile, profile, constraint, numInstancesToScaleDown);
     }
+    return 0;
+  }
 
-  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, ServiceResourceProfile profile,
-                              Constraint constraint, int numInstancesToScaleDown) {
-      int numInstancesScaledDown = 0;
-      for (Protos.TaskID taskID : taskIDs) {
-        NodeTask nodeTask = schedulerState.getTask(taskID);
-        if (nodeTask.getProfile().getName().equals(profile.getName()) &&
-            meetsConstraint(nodeTask, constraint)) {
-          this.schedulerState.makeTaskKillable(taskID);
-          numInstancesScaledDown++;
-          if (numInstancesScaledDown == numInstancesToScaleDown) {
-            break;
-          }
+  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+    int numInstancesScaledDown = 0;
+    for (Protos.TaskID taskID : taskIDs) {
+      NodeTask nodeTask = schedulerState.getTask(taskID);
+      if (nodeTask.getProfile().getName().equals(profile.getName()) && meetsConstraint(nodeTask, constraint)) {
+        this.schedulerState.makeTaskKillable(taskID);
+        numInstancesScaledDown++;
+        if (numInstancesScaledDown == numInstancesToScaleDown) {
+          break;
         }
       }
-      return numInstancesScaledDown;
+    }
+    return numInstancesScaledDown;
   }
 
   private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
@@ -240,12 +227,10 @@ public class MyriadOperations {
       }
     }
     return true;
-  } 
- 
+  }
+
   public Integer getFlexibleInstances(String taskPrefix) {
-      return this.schedulerState.getActiveTaskIds(taskPrefix).size()
-              + this.schedulerState.getStagingTaskIds(taskPrefix).size()
-              + this.schedulerState.getPendingTaskIds(taskPrefix).size();
+    return this.schedulerState.getActiveTaskIds(taskPrefix).size() + this.schedulerState.getStagingTaskIds(taskPrefix).size() + this.schedulerState.getPendingTaskIds(taskPrefix).size();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java
index 62ee4c6..db44648 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadScheduler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,155 +42,124 @@ import java.util.List;
  * Myriad Scheduler
  */
 public class MyriadScheduler implements Scheduler {
-    private DisruptorManager disruptorManager;
+  private DisruptorManager disruptorManager;
 
-    @Inject
-    public MyriadScheduler(final MyriadConfiguration cfg,
-                           final DisruptorManager disruptorManager) {
-        this.disruptorManager = disruptorManager;
-    }
+  @Inject
+  public MyriadScheduler(final MyriadConfiguration cfg, final DisruptorManager disruptorManager) {
+    this.disruptorManager = disruptorManager;
+  }
 
-    @Override
-    public void registered(final SchedulerDriver driver,
-                           final Protos.FrameworkID frameworkId,
-                           final Protos.MasterInfo masterInfo) {
-        disruptorManager.getRegisteredEventDisruptor().publishEvent(
-                new EventTranslator<RegisteredEvent>() {
-                    @Override
-                    public void translateTo(RegisteredEvent event, long sequence) {
-                        event.setDriver(driver);
-                        event.setFrameworkId(frameworkId);
-                        event.setMasterInfo(masterInfo);
-                    }
-                });
-    }
+  @Override
+  public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
+    disruptorManager.getRegisteredEventDisruptor().publishEvent(new EventTranslator<RegisteredEvent>() {
+      @Override
+      public void translateTo(RegisteredEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setFrameworkId(frameworkId);
+        event.setMasterInfo(masterInfo);
+      }
+    });
+  }
 
-    @Override
-    public void reregistered(final SchedulerDriver driver,
-                             final Protos.MasterInfo masterInfo) {
-        disruptorManager.getReRegisteredEventDisruptor().publishEvent(
-                new EventTranslator<ReRegisteredEvent>() {
-                    @Override
-                    public void translateTo(ReRegisteredEvent event,
-                                            long sequence) {
-                        event.setDriver(driver);
-                        event.setMasterInfo(masterInfo);
-                    }
-                });
-    }
+  @Override
+  public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
+    disruptorManager.getReRegisteredEventDisruptor().publishEvent(new EventTranslator<ReRegisteredEvent>() {
+      @Override
+      public void translateTo(ReRegisteredEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setMasterInfo(masterInfo);
+      }
+    });
+  }
 
-    @Override
-    public void resourceOffers(final SchedulerDriver driver,
-                               final List<Protos.Offer> offers) {
-        disruptorManager.getResourceOffersEventDisruptor().publishEvent(
-                new EventTranslator<ResourceOffersEvent>() {
-                    @Override
-                    public void translateTo(ResourceOffersEvent event,
-                                            long sequence) {
-                        event.setDriver(driver);
-                        event.setOffers(offers);
-                    }
-                });
-    }
+  @Override
+  public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
+    disruptorManager.getResourceOffersEventDisruptor().publishEvent(new EventTranslator<ResourceOffersEvent>() {
+      @Override
+      public void translateTo(ResourceOffersEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setOffers(offers);
+      }
+    });
+  }
 
-    @Override
-    public void offerRescinded(final SchedulerDriver driver,
-                               final Protos.OfferID offerId) {
-        disruptorManager.getOfferRescindedEventDisruptor().publishEvent(
-                new EventTranslator<OfferRescindedEvent>() {
-                    @Override
-                    public void translateTo(OfferRescindedEvent event,
-                                            long sequence) {
-                        event.setDriver(driver);
-                        event.setOfferId(offerId);
-                    }
-                });
-    }
+  @Override
+  public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
+    disruptorManager.getOfferRescindedEventDisruptor().publishEvent(new EventTranslator<OfferRescindedEvent>() {
+      @Override
+      public void translateTo(OfferRescindedEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setOfferId(offerId);
+      }
+    });
+  }
 
-    @Override
-    public void statusUpdate(final SchedulerDriver driver,
-                             final Protos.TaskStatus status) {
-        disruptorManager.getStatusUpdateEventDisruptor().publishEvent(
-                new EventTranslator<StatusUpdateEvent>() {
-                    @Override
-                    public void translateTo(StatusUpdateEvent event,
-                                            long sequence) {
-                        event.setDriver(driver);
-                        event.setStatus(status);
-                    }
-                });
-    }
+  @Override
+  public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus status) {
+    disruptorManager.getStatusUpdateEventDisruptor().publishEvent(new EventTranslator<StatusUpdateEvent>() {
+      @Override
+      public void translateTo(StatusUpdateEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setStatus(status);
+      }
+    });
+  }
 
-    @Override
-    public void frameworkMessage(final SchedulerDriver driver,
-                                 final Protos.ExecutorID executorId,
-                                 final Protos.SlaveID slaveId,
-                                 final byte[] bytes) {
-        disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(
-                new EventTranslator<FrameworkMessageEvent>() {
-                    @Override
-                    public void translateTo(FrameworkMessageEvent event,
-                                            long sequence) {
-                        event.setDriver(driver);
-                        event.setBytes(bytes);
-                        event.setExecutorId(executorId);
-                        event.setSlaveId(slaveId);
-                    }
-                });
-    }
+  @Override
+  public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] bytes) {
+    disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(new EventTranslator<FrameworkMessageEvent>() {
+      @Override
+      public void translateTo(FrameworkMessageEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setBytes(bytes);
+        event.setExecutorId(executorId);
+        event.setSlaveId(slaveId);
+      }
+    });
+  }
 
-    @Override
-    public void disconnected(final SchedulerDriver driver) {
-        disruptorManager.getDisconnectedEventDisruptor().publishEvent(
-                new EventTranslator<DisconnectedEvent>() {
-                    @Override
-                    public void translateTo(DisconnectedEvent event,
-                                            long sequence) {
-                        event.setDriver(driver);
-                    }
-                });
-    }
+  @Override
+  public void disconnected(final SchedulerDriver driver) {
+    disruptorManager.getDisconnectedEventDisruptor().publishEvent(new EventTranslator<DisconnectedEvent>() {
+      @Override
+      public void translateTo(DisconnectedEvent event, long sequence) {
+        event.setDriver(driver);
+      }
+    });
+  }
 
-    @Override
-    public void slaveLost(final SchedulerDriver driver,
-                          final Protos.SlaveID slaveId) {
-        disruptorManager.getSlaveLostEventDisruptor().publishEvent(
-                new EventTranslator<SlaveLostEvent>() {
-                    @Override
-                    public void translateTo(SlaveLostEvent event, long sequence) {
-                        event.setDriver(driver);
-                        event.setSlaveId(slaveId);
-                    }
-                });
-    }
+  @Override
+  public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
+    disruptorManager.getSlaveLostEventDisruptor().publishEvent(new EventTranslator<SlaveLostEvent>() {
+      @Override
+      public void translateTo(SlaveLostEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setSlaveId(slaveId);
+      }
+    });
+  }
 
-    @Override
-    public void executorLost(final SchedulerDriver driver,
-                             final Protos.ExecutorID executorId,
-                             final Protos.SlaveID slaveId,
-                             final int exitStatus) {
-        disruptorManager.getExecutorLostEventDisruptor().publishEvent(
-                new EventTranslator<ExecutorLostEvent>() {
-                    @Override
-                    public void translateTo(ExecutorLostEvent event,
-                                            long sequence) {
-                        event.setDriver(driver);
-                        event.setExecutorId(executorId);
-                        event.setSlaveId(slaveId);
-                        event.setExitStatus(exitStatus);
-                    }
-                });
-    }
+  @Override
+  public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int exitStatus) {
+    disruptorManager.getExecutorLostEventDisruptor().publishEvent(new EventTranslator<ExecutorLostEvent>() {
+      @Override
+      public void translateTo(ExecutorLostEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setExecutorId(executorId);
+        event.setSlaveId(slaveId);
+        event.setExitStatus(exitStatus);
+      }
+    });
+  }
 
-    @Override
-    public void error(final SchedulerDriver driver, final String message) {
-        disruptorManager.getErrorEventDisruptor().publishEvent(
-                new EventTranslator<ErrorEvent>() {
-                    @Override
-                    public void translateTo(ErrorEvent event, long sequence) {
-                        event.setDriver(driver);
-                        event.setMessage(message);
-                    }
-                });
-    }
+  @Override
+  public void error(final SchedulerDriver driver, final String message) {
+    disruptorManager.getErrorEventDisruptor().publishEvent(new EventTranslator<ErrorEvent>() {
+      @Override
+      public void translateTo(ErrorEvent event, long sequence) {
+        event.setDriver(driver);
+        event.setMessage(message);
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
index aae2098..55bdf79 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,68 +29,48 @@ import org.slf4j.LoggerFactory;
 import com.ebay.myriad.configuration.MyriadConfiguration;
 
 /**
- * Implementation assumes NM binaries already deployed 
+ * Implementation assumes NM binaries already deployed
  */
 public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
 
-  public static final String ENV_YARN_NODEMANAGER_OPTS =
-    "YARN_NODEMANAGER_OPTS";
-  public static final String KEY_YARN_NM_CGROUPS_PATH =
-    "yarn.nodemanager.cgroups.path";
-  public static final String KEY_YARN_RM_HOSTNAME =
-    "yarn.resourcemanager.hostname";
+  public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
+  public static final String KEY_YARN_NM_CGROUPS_PATH = "yarn.nodemanager.cgroups.path";
+  public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname";
 
   /**
    * YARN container executor class.
    */
-  public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS =
-    "yarn.nodemanager.container-executor.class";
+  public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class";
   // TODO (mohit): Should it be configurable ?
-  public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS =
-    "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
-  public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS =
-    "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
+  public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
+  public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
 
   /**
    * YARN class to help handle LCE resources
    */
-  public static final String KEY_YARN_NM_LCE_RH_CLASS =
-    "yarn.nodemanager.linux-container-executor.resources-handler.class";
+  public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class";
 
   // TODO (mohit): Should it be configurable ?
-  public static final String VAL_YARN_NM_LCE_RH_CLASS =
-    "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
-  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY =
-    "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
-  public static final String VAL_YARN_NM_LCE_CGROUPS_HIERARCHY =
-    "mesos/$TASK_DIR";
-  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT =
-    "yarn.nodemanager.linux-container-executor.cgroups.mount";
-  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH =
-    "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
+  public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
+  public static final String VAL_YARN_NM_LCE_CGROUPS_HIERARCHY = "mesos/$TASK_DIR";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
   public static final String VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "/sys/fs/cgroup";
-  public static final String KEY_YARN_NM_LCE_GROUP =
-    "yarn.nodemanager.linux-container-executor.group";
-  public static final String KEY_YARN_NM_LCE_PATH =
-    "yarn.nodemanager.linux-container-executor.path";
+  public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group";
+  public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path";
   public static final String KEY_YARN_HOME = "yarn.home";
-  public static final String KEY_NM_RESOURCE_CPU_VCORES =
-    "nodemanager.resource.cpu-vcores";
-  public static final String KEY_NM_RESOURCE_MEM_MB =
-    "nodemanager.resource.memory-mb";
-  public static final String YARN_NM_CMD =
-      " $YARN_HOME/bin/yarn nodemanager";
+  public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
+  public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
+  public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
   public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address";
-  public static final String KEY_NM_LOCALIZER_ADDRESS =
-    "myriad.yarn.nodemanager.localizer.address";
-  public static final String KEY_NM_WEBAPP_ADDRESS =
-    "myriad.yarn.nodemanager.webapp.address";
-  public static final String KEY_NM_SHUFFLE_PORT =
-    "myriad.mapreduce.shuffle.port";
-
-  private static final String ALL_LOCAL_IPV4ADDR =  "0.0.0.0:";
+  public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address";
+  public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address";
+  public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port";
+
+  private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
   private static final String PROPERTY_FORMAT = "-D%s=%s";
 
   private Map<String, String> environment = new HashMap<>();
@@ -125,49 +105,38 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
     }
 
     if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
-      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS,
-          VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS);
       addYarnNodemanagerOpt(KEY_YARN_NM_LCE_RH_CLASS, VAL_YARN_NM_LCE_RH_CLASS);
 
-        // TODO: Configure hierarchy
-        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY,
-          VAL_YARN_NM_LCE_CGROUPS_HIERARCHY);
-        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true");
-        // TODO: Make it configurable
-        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH,
-          VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH);
-        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_GROUP, "root");
-        if (environment.containsKey("YARN_HOME")) {
-          addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
-        }
+      // TODO: Configure hierarchy
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, VAL_YARN_NM_LCE_CGROUPS_HIERARCHY);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true");
+      // TODO: Make it configurable
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH, VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_GROUP, "root");
+      if (environment.containsKey("YARN_HOME")) {
+        addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
+      }
     } else {
-        // Otherwise configure to use Default
-      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS,
-          DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+      // Otherwise configure to use Default
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS);
     }
-    addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES,
-      Integer.toString(profile.getCpus().intValue()));
-    addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB,
-      Integer.toString(profile.getMemory().intValue()));
-    addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR +
-      Long.valueOf(ports.getRpcPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS,
-      ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString()); 
-    addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS,
-      ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT,
-      Long.valueOf(ports.getShufflePort()).toString());
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getRpcPort()).toString());
+    addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString());
+    addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString());
+    addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, Long.valueOf(ports.getShufflePort()).toString());
   }
 
   protected void appendEnvForNM(StringBuilder cmdLine) {
     cmdLine.append(" env ");
     for (Map.Entry<String, String> env : environment.entrySet()) {
-      cmdLine.append(env.getKey()).append("=").append("\"")
-          .append(env.getValue()).append("\" ");
+      cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\" ");
     }
   }
 
-  protected void appendCgroupsCmds(StringBuilder cmdLine) { 
+  protected void appendCgroupsCmds(StringBuilder cmdLine) {
     if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
       cmdLine.append(" export TASK_DIR=`basename $PWD`;");
       cmdLine.append(" chmod +x /sys/fs/cgroup/cpu/mesos/$TASK_DIR;");
@@ -182,12 +151,12 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
 
   protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) {
     String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue);
-      if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
-          String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
-          environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt);
-      } else {
-          environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
-      }
+    if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
+      String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
+      environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt);
+    } else {
+      environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
index beb0920..39a9369 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,57 +27,52 @@ import java.util.Map;
  * Helper class for dynamically assigning ports to nodemanager
  */
 public class NMPorts implements Ports {
-    private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
-    private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
-    private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";
-    private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port";
+  private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
+  private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
+  private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";
+  private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port";
 
-    private static final String [] NM_PORT_KEYS = {
-            NM_RPC_PORT_KEY,
-            NM_LOCALIZER_PORT_KEY,
-            NM_WEBAPP_HTTP_PORT_KEY,
-            NM_HTTP_SHUFFLE_PORT_KEY
-    };
+  private static final String[] NM_PORT_KEYS = {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY};
 
-    private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length);
+  private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length);
 
-    public NMPorts(Long [] ports){
-        Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: array \"ports\" is of unexpected length");
-        for (int i = 0; i < NM_PORT_KEYS.length; i++) {
-            portsMap.put(NM_PORT_KEYS[i], ports[i]);
-        }
+  public NMPorts(Long[] ports) {
+    Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: array \"ports\" is of unexpected length");
+    for (int i = 0; i < NM_PORT_KEYS.length; i++) {
+      portsMap.put(NM_PORT_KEYS[i], ports[i]);
     }
+  }
 
-    public long getRpcPort() {
-        return portsMap.get(NM_RPC_PORT_KEY);
-    }
+  public long getRpcPort() {
+    return portsMap.get(NM_RPC_PORT_KEY);
+  }
 
-    public long getLocalizerPort() {
-        return portsMap.get(NM_LOCALIZER_PORT_KEY);
-    }
+  public long getLocalizerPort() {
+    return portsMap.get(NM_LOCALIZER_PORT_KEY);
+  }
 
-    public long getWebAppHttpPort() {
-        return portsMap.get(NM_WEBAPP_HTTP_PORT_KEY);
-    }
+  public long getWebAppHttpPort() {
+    return portsMap.get(NM_WEBAPP_HTTP_PORT_KEY);
+  }
 
-    public long getShufflePort() {
-        return portsMap.get(NM_HTTP_SHUFFLE_PORT_KEY);
-    }
+  public long getShufflePort() {
+    return portsMap.get(NM_HTTP_SHUFFLE_PORT_KEY);
+  }
 
-    public static int expectedNumPorts() {
-        return NM_PORT_KEYS.length;
-    }
+  public static int expectedNumPorts() {
+    return NM_PORT_KEYS.length;
+  }
 
-    /**
-     * @return a string representation of NMPorts
-     */
-    @Override
-    public String toString(){
-        StringBuilder sb = new StringBuilder().append("{");
-        for (String key: NM_PORT_KEYS) {
-            sb.append(key).append(": ").append(portsMap.get(key).toString()).append(", ");
-        }
-        sb.replace(sb.length() - 2, sb.length(), "}");
-        return sb.toString();
+  /**
+   * @return a string representation of NMPorts
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder().append("{");
+    for (String key : NM_PORT_KEYS) {
+      sb.append(key).append(": ").append(portsMap.get(key).toString()).append(", ");
     }
+    sb.replace(sb.length() - 2, sb.length(), "}");
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java
index 6c148cf..aed0e06 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfile.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,40 +24,40 @@ import com.google.gson.Gson;
  * Node Manager Profile
  */
 public class NMProfile {
-    private String name;
-
-    /**
-     * Number of CPU advertised to YARN Resource Manager.
-     */
-    private Long cpus;
-
-    /**
-     * Memory in MB advertised to YARN Resource Manager.
-     */
-    private Long memory;
-
-    public NMProfile(String name, Long cpus, Long memory) {
-        this.name = name;
-        this.cpus = cpus;
-        this.memory = memory;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public Long getCpus() {
-        return cpus;
-    }
-
-    public Long getMemory() {
-        return memory;
-    }
-
-    @Override
-    public String toString() {
-        Gson gson = new Gson();
-        return gson.toJson(this);
-    }
+  private String name;
+
+  /**
+   * Number of CPU advertised to YARN Resource Manager.
+   */
+  private Long cpus;
+
+  /**
+   * Memory in MB advertised to YARN Resource Manager.
+   */
+  private Long memory;
+
+  public NMProfile(String name, Long cpus, Long memory) {
+    this.name = name;
+    this.cpus = cpus;
+    this.memory = memory;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public Long getCpus() {
+    return cpus;
+  }
+
+  public Long getMemory() {
+    return memory;
+  }
+
+  @Override
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java
index 61bcd78..c6829b1 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMProfileManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,27 +29,26 @@ import java.util.concurrent.ConcurrentHashMap;
  * Node Manager Profile Manager
  */
 public class NMProfileManager {
-    private static final Logger LOGGER = LoggerFactory.getLogger(NMProfileManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(NMProfileManager.class);
 
-    private Map<String, NMProfile> profiles = new ConcurrentHashMap<>();
+  private Map<String, NMProfile> profiles = new ConcurrentHashMap<>();
 
-    public NMProfile get(String name) {
-        return profiles.get(name);
-    }
+  public NMProfile get(String name) {
+    return profiles.get(name);
+  }
 
-    public void add(NMProfile profile) {
-        LOGGER.info("Adding profile {} with CPU: {} and Memory: {}",
-                profile.getName(), profile.getCpus(), profile.getMemory());
+  public void add(NMProfile profile) {
+    LOGGER.info("Adding profile {} with CPU: {} and Memory: {}", profile.getName(), profile.getCpus(), profile.getMemory());
 
-        profiles.put(profile.getName(), profile);
-    }
+    profiles.put(profile.getName(), profile);
+  }
 
-    public boolean exists(String name) {
-        return this.profiles.containsKey(name);
-    }
+  public boolean exists(String name) {
+    return this.profiles.containsKey(name);
+  }
 
-    public String toString() {
-        Gson gson = new Gson();
-        return gson.toJson(this);
-    }
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
index 659fd7c..29d961b 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package com.ebay.myriad.scheduler;
 
@@ -28,7 +28,9 @@ import static java.lang.annotation.ElementType.METHOD;
 
 /**
  * NMTaskFactory annotation that allows to bind TaskFactory to NM specific implementation
- *
  */
-@BindingAnnotation @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface NMTaskFactoryAnnotation {}
\ No newline at end of file
+@BindingAnnotation
+@Target({FIELD, PARAMETER, METHOD})
+@Retention(RUNTIME)
+public @interface NMTaskFactoryAnnotation {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
index a23924d..b862e75 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +20,6 @@ package com.ebay.myriad.scheduler;
 
 /**
  * Generic interface to represent ports
- *
  */
 public interface Ports {
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
index 0c9a035..6747c5a 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,74 +33,72 @@ import org.slf4j.LoggerFactory;
  * configured rules and policies.
  */
 public class Rebalancer implements Runnable {
-    private static final Logger LOGGER = LoggerFactory.getLogger(Rebalancer.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Rebalancer.class);
 
-    private final SchedulerState schedulerState;
-    private final MyriadOperations myriadOperations;
-    private final ServiceProfileManager profileManager;
+  private final SchedulerState schedulerState;
+  private final MyriadOperations myriadOperations;
+  private final ServiceProfileManager profileManager;
 
-    @Inject
-    public Rebalancer(SchedulerState schedulerState,
-                      MyriadOperations myriadOperations,
-                      ServiceProfileManager profileManager) {
-        this.schedulerState = schedulerState;
-        this.myriadOperations = myriadOperations;
-        this.profileManager = profileManager;
-    }
-
-    @Override
-    public void run() {
-      final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
-      final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
-        LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
-        if (activeIds.size() < 1 && pendingIds.size() < 1) {
-            myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);
-        }
-//            RestAdapter restAdapter = new RestAdapter.Builder()
-//                    .setEndpoint("http://" + host + ":" + port)
-//                    .setLogLevel(LogLevel.FULL).build();
-//            YARNResourceManagerService service = restAdapter
-//                    .create(YARNResourceManagerService.class);
-//
-//            ClusterMetrics metrics = service.metrics().getClusterMetrics();
-//            AppsResponse appsResponse = service.apps("ACCEPTED");
-//
-//            int acceptedApps = 0;
-//
-//            if (appsResponse == null || appsResponse.getApps() == null
-//                    || appsResponse.getApps().getApps() == null) {
-//                acceptedApps = 0;
-//            } else {
-//                acceptedApps = appsResponse.getApps().getApps().size();
-//            }
-//            LOGGER.info("Metrics: {}", metrics);
-//            LOGGER.info("Apps: {}", appsResponse);
-//
-//            long availableMB = metrics.getAvailableMB();
-//            long allocatedMB = metrics.getAllocatedMB();
-//            long reservedMB = metrics.getReservedMB();
-//            int activeNodes = metrics.getActiveNodes();
-//            int unhealthyNodes = metrics.getUnhealthyNodes();
-//            int appsPending = metrics.getAppsPending();
-//            int appsRunning = metrics.getAppsRunning();
+  @Inject
+  public Rebalancer(SchedulerState schedulerState, MyriadOperations myriadOperations, ServiceProfileManager profileManager) {
+    this.schedulerState = schedulerState;
+    this.myriadOperations = myriadOperations;
+    this.profileManager = profileManager;
+  }
 
-//            if (activeNodes == 0 && appsPending > 0) {
-//                LOGGER.info(
-//                        "Flexing up for condition: activeNodes ({}) == 0 && appsPending ({}) > 0",
-//                        activeNodes, appsPending);
-//                this.myriadOperations.flexUpCluster(clusterId, 1, "small");
-//            } else if (appsPending == 0 && appsRunning == 0 && activeNodes > 0) {
-//                LOGGER.info(
-//                        "Flexing down for condition: appsPending ({}) == 0 && appsRunning ({}) == 0 && activeNodes ({}) > 0",
-//                        appsPending, appsRunning, activeNodes);
-//                this.myriadOperations.flexDownCluster(cluster, 1);
-//            } else if (acceptedApps > 0) {
-//                LOGGER.info("Flexing up for condition: acceptedApps ({}) > 0",
-//                        acceptedApps);
-//                this.myriadOperations.flexUpCluster(clusterId, 1, "small");
-//            } else {
-//                LOGGER.info("Nothing to rebalance");
-//                this.schedulerState.releaseLock(clusterId);
-//            }
+  @Override
+  public void run() {
+    final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
+    final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
+    LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
+    if (activeIds.size() < 1 && pendingIds.size() < 1) {
+      myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);
     }
+    //            RestAdapter restAdapter = new RestAdapter.Builder()
+    //                    .setEndpoint("http://" + host + ":" + port)
+    //                    .setLogLevel(LogLevel.FULL).build();
+    //            YARNResourceManagerService service = restAdapter
+    //                    .create(YARNResourceManagerService.class);
+    //
+    //            ClusterMetrics metrics = service.metrics().getClusterMetrics();
+    //            AppsResponse appsResponse = service.apps("ACCEPTED");
+    //
+    //            int acceptedApps = 0;
+    //
+    //            if (appsResponse == null || appsResponse.getApps() == null
+    //                    || appsResponse.getApps().getApps() == null) {
+    //                acceptedApps = 0;
+    //            } else {
+    //                acceptedApps = appsResponse.getApps().getApps().size();
+    //            }
+    //            LOGGER.info("Metrics: {}", metrics);
+    //            LOGGER.info("Apps: {}", appsResponse);
+    //
+    //            long availableMB = metrics.getAvailableMB();
+    //            long allocatedMB = metrics.getAllocatedMB();
+    //            long reservedMB = metrics.getReservedMB();
+    //            int activeNodes = metrics.getActiveNodes();
+    //            int unhealthyNodes = metrics.getUnhealthyNodes();
+    //            int appsPending = metrics.getAppsPending();
+    //            int appsRunning = metrics.getAppsRunning();
+
+    //            if (activeNodes == 0 && appsPending > 0) {
+    //                LOGGER.info(
+    //                        "Flexing up for condition: activeNodes ({}) == 0 && appsPending ({}) > 0",
+    //                        activeNodes, appsPending);
+    //                this.myriadOperations.flexUpCluster(clusterId, 1, "small");
+    //            } else if (appsPending == 0 && appsRunning == 0 && activeNodes > 0) {
+    //                LOGGER.info(
+    //                        "Flexing down for condition: appsPending ({}) == 0 && appsRunning ({}) == 0 && activeNodes ({}) > 0",
+    //                        appsPending, appsRunning, activeNodes);
+    //                this.myriadOperations.flexDownCluster(cluster, 1);
+    //            } else if (acceptedApps > 0) {
+    //                LOGGER.info("Flexing up for condition: acceptedApps ({}) > 0",
+    //                        acceptedApps);
+    //                this.myriadOperations.flexUpCluster(clusterId, 1, "small");
+    //            } else {
+    //                LOGGER.info("Nothing to rebalance");
+    //                this.schedulerState.releaseLock(clusterId);
+    //            }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java
index 4622df1..be47bef 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ReconcileService.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,52 +34,52 @@ import java.util.Date;
  * {@link ReconcileService} is responsible for reconciling tasks with the mesos master
  */
 public class ReconcileService {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ReconcileService.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReconcileService.class);
 
-    public static final long DEFAULT_RECONCILATION_DELAY_MS = 10000;
-    public static final long MAX_RECONCILE_ATTEMPTS = 10;
+  public static final long DEFAULT_RECONCILATION_DELAY_MS = 10000;
+  public static final long MAX_RECONCILE_ATTEMPTS = 10;
 
-    private SchedulerState state;
-    private MyriadConfiguration cfg;
-    private Date lastReconcileTime;
+  private SchedulerState state;
+  private MyriadConfiguration cfg;
+  private Date lastReconcileTime;
 
-    @Inject
-    public ReconcileService(SchedulerState state, MyriadConfiguration cfg) {
-        this.state = state;
-        this.cfg = cfg;
-    }
+  @Inject
+  public ReconcileService(SchedulerState state, MyriadConfiguration cfg) {
+    this.state = state;
+    this.cfg = cfg;
+  }
 
-    public void reconcile(SchedulerDriver driver) {
-        Collection<Protos.TaskStatus> taskStatuses = state.getTaskStatuses();
+  public void reconcile(SchedulerDriver driver) {
+    Collection<Protos.TaskStatus> taskStatuses = state.getTaskStatuses();
 
-        if (taskStatuses.size() == 0) {
-            return;
-        }
-        LOGGER.info("Reconciling {} tasks.", taskStatuses.size());
+    if (taskStatuses.size() == 0) {
+      return;
+    }
+    LOGGER.info("Reconciling {} tasks.", taskStatuses.size());
 
-        driver.reconcileTasks(taskStatuses);
+    driver.reconcileTasks(taskStatuses);
 
-        lastReconcileTime = new Date();
+    lastReconcileTime = new Date();
 
-        int attempt = 1;
+    int attempt = 1;
 
-        while (attempt <= MAX_RECONCILE_ATTEMPTS) {
-            try {
-                // TODO(mohit): Using exponential backoff here, maybe backoff strategy should be configurable.
-                Thread.sleep(DEFAULT_RECONCILATION_DELAY_MS * attempt);
-            } catch (InterruptedException e) {
-                LOGGER.error("Interrupted", e);
-            }
-            Collection<Protos.TaskStatus> notYetReconciled = new ArrayList<>();
-            for (Protos.TaskStatus status : state.getTaskStatuses()) {
-                if (status.getTimestamp() < lastReconcileTime.getTime()) {
-                    notYetReconciled.add(status);
-                }
-            }
-            LOGGER.info("Reconcile attempt {} for {} tasks", attempt, notYetReconciled.size());
-            driver.reconcileTasks(notYetReconciled);
-            lastReconcileTime = new Date();
-            attempt++;
+    while (attempt <= MAX_RECONCILE_ATTEMPTS) {
+      try {
+        // TODO(mohit): Using exponential backoff here, maybe backoff strategy should be configurable.
+        Thread.sleep(DEFAULT_RECONCILATION_DELAY_MS * attempt);
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupted", e);
+      }
+      Collection<Protos.TaskStatus> notYetReconciled = new ArrayList<>();
+      for (Protos.TaskStatus status : state.getTaskStatuses()) {
+        if (status.getTimestamp() < lastReconcileTime.getTime()) {
+          notYetReconciled.add(status);
         }
+      }
+      LOGGER.info("Reconcile attempt {} for {} tasks", attempt, notYetReconciled.size());
+      driver.reconcileTasks(notYetReconciled);
+      lastReconcileTime = new Date();
+      attempt++;
     }
+  }
 }


Mime
View raw message