myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject [02/20] incubator-myriad git commit: spacing changes
Date Wed, 28 Oct 2015 16:07:34 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
index 0e339d8..b3b37e5 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,9 +33,9 @@ import java.io.IOException;
  * required methods.
  */
 public class BaseInterceptor implements YarnSchedulerInterceptor {
-    // restrict the constructor
-    protected BaseInterceptor() {
-    }
+  // restrict the constructor
+  protected BaseInterceptor() {
+  }
 
   @Override
   public CallBackFilter getCallBackFilter() {
@@ -47,22 +47,22 @@ public class BaseInterceptor implements YarnSchedulerInterceptor {
     };
   }
 
-    @Override
-    public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
-    }
+  @Override
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
+  }
 
-    @Override
-    public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
+  @Override
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
 
-    }
+  }
 
-    @Override
-    public void beforeSchedulerEventHandled(SchedulerEvent event) {
+  @Override
+  public void beforeSchedulerEventHandled(SchedulerEvent event) {
 
-    }
+  }
 
-    @Override
-    public void afterSchedulerEventHandled(SchedulerEvent event) {
+  @Override
+  public void afterSchedulerEventHandled(SchedulerEvent event) {
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
index 5b7e190..6ae8b7e 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,101 +40,100 @@ import java.util.Map;
 /**
  * An interceptor that wraps other interceptors. The Myriad{Fair,Capacity,Fifo}Scheduler classes
  * instantiate this class and allow interception of the Yarn scheduler events/method calls.
- *
+ * <p/>
  * The {@link CompositeInterceptor} allows other interceptors to be registered via {@link InterceptorRegistry}
  * and passes control to the registered interceptors whenever a event/method call is being intercepted.
- *
  */
 public class CompositeInterceptor implements YarnSchedulerInterceptor, InterceptorRegistry {
-    private static final Logger LOGGER = LoggerFactory.getLogger(CompositeInterceptor.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(CompositeInterceptor.class);
 
-    private Map<Class<?>, YarnSchedulerInterceptor> interceptors = Maps.newLinkedHashMap();
-    private YarnSchedulerInterceptor myriadInitInterceptor;
+  private Map<Class<?>, YarnSchedulerInterceptor> interceptors = Maps.newLinkedHashMap();
+  private YarnSchedulerInterceptor myriadInitInterceptor;
 
-    /**
-     * Called by Myriad{Fair,Capacity,Fifo}Scheduler classes. Creates an instance of
-     * {@link MyriadInitializationInterceptor}.
-     */
-    public CompositeInterceptor() {
-        this.myriadInitInterceptor = new MyriadInitializationInterceptor(this);
-    }
+  /**
+   * Called by Myriad{Fair,Capacity,Fifo}Scheduler classes. Creates an instance of
+   * {@link MyriadInitializationInterceptor}.
+   */
+  public CompositeInterceptor() {
+    this.myriadInitInterceptor = new MyriadInitializationInterceptor(this);
+  }
 
-    @VisibleForTesting
-    public void setMyriadInitInterceptor(YarnSchedulerInterceptor myriadInitInterceptor) {
-        this.myriadInitInterceptor = myriadInitInterceptor;
-    }
+  @VisibleForTesting
+  public void setMyriadInitInterceptor(YarnSchedulerInterceptor myriadInitInterceptor) {
+    this.myriadInitInterceptor = myriadInitInterceptor;
+  }
 
-    @Override
-    public void register(YarnSchedulerInterceptor interceptor) {
-      interceptors.put(interceptor.getClass(), interceptor);
-      LOGGER.info("Registered {} into the registry.", interceptor.getClass().getName());
-    }
+  @Override
+  public void register(YarnSchedulerInterceptor interceptor) {
+    interceptors.put(interceptor.getClass(), interceptor);
+    LOGGER.info("Registered {} into the registry.", interceptor.getClass().getName());
+  }
 
-    @Override
-    public CallBackFilter getCallBackFilter() {
-      return new CallBackFilter() {
-        @Override
-        public boolean allowCallBacksForNode(NodeId nodeManager) {
-          return true;
-        }
-      };
-    }
+  @Override
+  public CallBackFilter getCallBackFilter() {
+    return new CallBackFilter() {
+      @Override
+      public boolean allowCallBacksForNode(NodeId nodeManager) {
+        return true;
+      }
+    };
+  }
 
   /**
-     * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After myriad is initialized,
-     * other interceptors will later register with this class via
-     * {@link InterceptorRegistry#register(YarnSchedulerInterceptor)}.
-     *
-     * @param conf
-     * @param yarnScheduler
-     * @param rmContext
-     * @throws IOException
-     */
-    @Override
-    public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
-        myriadInitInterceptor.init(conf, yarnScheduler, rmContext);
-    }
+   * Allows myriad to be initialized via {@link #myriadInitInterceptor}. After myriad is initialized,
+   * other interceptors will later register with this class via
+   * {@link InterceptorRegistry#register(YarnSchedulerInterceptor)}.
+   *
+   * @param conf
+   * @param yarnScheduler
+   * @param rmContext
+   * @throws IOException
+   */
+  @Override
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
+    myriadInitInterceptor.init(conf, yarnScheduler, rmContext);
+  }
 
-    @Override
-    public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
-        for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
-            if (interceptor.getCallBackFilter().allowCallBacksForNode(event.getNodeId())) {
-                interceptor.beforeRMNodeEventHandled(event, context);
-            }
-        }
+  @Override
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
+    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+      if (interceptor.getCallBackFilter().allowCallBacksForNode(event.getNodeId())) {
+        interceptor.beforeRMNodeEventHandled(event, context);
+      }
     }
+  }
 
-    @Override
-    public void beforeSchedulerEventHandled(SchedulerEvent event) {
-        for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
-          final NodeId nodeId = getNodeIdForSchedulerEvent(event);
-          if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
-              interceptor.beforeSchedulerEventHandled(event);
-          }
-        }
+  @Override
+  public void beforeSchedulerEventHandled(SchedulerEvent event) {
+    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+      final NodeId nodeId = getNodeIdForSchedulerEvent(event);
+      if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
+        interceptor.beforeSchedulerEventHandled(event);
+      }
     }
+  }
 
-    @Override
-    public void afterSchedulerEventHandled(SchedulerEvent event) {
-        for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
-          NodeId nodeId = getNodeIdForSchedulerEvent(event);
-          if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
-              interceptor.afterSchedulerEventHandled(event);
-            }
-        }
+  @Override
+  public void afterSchedulerEventHandled(SchedulerEvent event) {
+    for (YarnSchedulerInterceptor interceptor : interceptors.values()) {
+      NodeId nodeId = getNodeIdForSchedulerEvent(event);
+      if (nodeId != null && interceptor.getCallBackFilter().allowCallBacksForNode(nodeId)) {
+        interceptor.afterSchedulerEventHandled(event);
+      }
     }
+  }
 
   private NodeId getNodeIdForSchedulerEvent(SchedulerEvent event) {
-      switch (event.getType()) {
-        case NODE_ADDED:
-          return ((NodeAddedSchedulerEvent) event).getAddedRMNode().getNodeID();
-        case NODE_REMOVED:
-          return ((NodeRemovedSchedulerEvent) event).getRemovedRMNode().getNodeID();
-        case NODE_UPDATE:
-          return ((NodeUpdateSchedulerEvent) event).getRMNode().getNodeID();
-        case NODE_RESOURCE_UPDATE:
-          return ((NodeResourceUpdateSchedulerEvent) event).getRMNode().getNodeID();
-      }
-      return null;
+    switch (event.getType()) {
+      case NODE_ADDED:
+        return ((NodeAddedSchedulerEvent) event).getAddedRMNode().getNodeID();
+      case NODE_REMOVED:
+        return ((NodeRemovedSchedulerEvent) event).getRemovedRMNode().getNodeID();
+      case NODE_UPDATE:
+        return ((NodeUpdateSchedulerEvent) event).getRMNode().getNodeID();
+      case NODE_RESOURCE_UPDATE:
+        return ((NodeResourceUpdateSchedulerEvent) event).getRMNode().getNodeID();
     }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
index 433b6cd..3a504b1 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/InterceptorRegistry.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,6 @@ package com.ebay.myriad.scheduler.yarn.interceptor;
  */
 public interface InterceptorRegistry {
 
-    public void register(YarnSchedulerInterceptor interceptor);
+  public void register(YarnSchedulerInterceptor interceptor);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
index d28259d..71580b3 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,27 +32,27 @@ import java.io.IOException;
  * Responsible for intializing myriad.
  */
 public class MyriadInitializationInterceptor extends BaseInterceptor {
-    private static final Logger LOGGER = LoggerFactory.getLogger(MyriadInitializationInterceptor.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadInitializationInterceptor.class);
 
-    private final InterceptorRegistry registry;
+  private final InterceptorRegistry registry;
 
-    public MyriadInitializationInterceptor(InterceptorRegistry registry) {
-        this.registry = registry;
-    }
+  public MyriadInitializationInterceptor(InterceptorRegistry registry) {
+    this.registry = registry;
+  }
 
-    /**
-     * Initialize Myriad plugin before RM's scheduler is initialized.
-     * This includes registration with Mesos master, initialization of
-     * the myriad web application, initializing guice modules etc.
-     */
-    @Override
-    public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
-        try {
-            Main.initialize(conf, yarnScheduler, rmContext, registry);
-        } catch (Exception e) {
-            // Abort bringing up RM
-            throw new RuntimeException("Failed to initialize myriad", e);
-        }
-        LOGGER.info("Initialized myriad.");
+  /**
+   * Initialize Myriad plugin before RM's scheduler is initialized.
+   * This includes registration with Mesos master, initialization of
+   * the myriad web application, initializing guice modules etc.
+   */
+  @Override
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
+    try {
+      Main.initialize(conf, yarnScheduler, rmContext, registry);
+    } catch (Exception e) {
+      // Abort bringing up RM
+      throw new RuntimeException("Failed to initialize myriad", e);
     }
+    LOGGER.info("Initialized myriad.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
index 0afcb7f..e28d052 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/YarnSchedulerInterceptor.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,62 +34,62 @@ import java.io.IOException;
  */
 public interface YarnSchedulerInterceptor {
 
+  /**
+   * Filters the method callbacks.
+   */
+  interface CallBackFilter {
     /**
-     * Filters the method callbacks.
-     */
-    interface CallBackFilter {
-      /**
-       * Method to determine if any other methods in {@link YarnSchedulerInterceptor}
-       * pertaining to a given node manager should be invoked or not.
-       *
-       * @param nodeManager NodeId of the Node Manager registered with RM.
-       * @return true to allow invoking further interceptor methods. false otherwise.
-       */
-      public boolean allowCallBacksForNode(NodeId nodeManager);
-    }
-
-    /**
-     * Return an instance of {@link CallBackFilter}. {@link CallBackFilter#allowCallBacksForNode(NodeId)}
-     * method is invoked to *determine* if any of the other methods pertaining to a specific node
-     * needs to be invoked or not.
+     * Method to determine if any other methods in {@link YarnSchedulerInterceptor}
+     * pertaining to a given node manager should be invoked or not.
      *
-     * @return
+     * @param nodeManager NodeId of the Node Manager registered with RM.
+     * @return true to allow invoking further interceptor methods. false otherwise.
      */
-    public CallBackFilter getCallBackFilter();
+    public boolean allowCallBacksForNode(NodeId nodeManager);
+  }
 
-    /**
-     * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)}
-     *
-     * @param conf
-     * @param yarnScheduler
-     * @param rmContext
-     * @throws IOException
-     */
-    public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException;
+  /**
+   * Return an instance of {@link CallBackFilter}. {@link CallBackFilter#allowCallBacksForNode(NodeId)}
+   * method is invoked to *determine* if any of the other methods pertaining to a specific node
+   * needs to be invoked or not.
+   *
+   * @return
+   */
+  public CallBackFilter getCallBackFilter();
 
-    /**
-     * Invoked *before* {@link RMNodeImpl#handle(RMNodeEvent)} only if
-     * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
-     *
-     * @param event
-     * @param context
-     */
-    public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context);
+  /**
+   * Invoked *before* {@link AbstractYarnScheduler#reinitialize(Configuration, RMContext)}
+   *
+   * @param conf
+   * @param yarnScheduler
+   * @param rmContext
+   * @throws IOException
+   */
+  public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException;
 
-    /**
-     * Invoked *before* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
-     * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
-     *
-     * @param event
-     */
-    public void beforeSchedulerEventHandled(SchedulerEvent event);
+  /**
+   * Invoked *before* {@link RMNodeImpl#handle(RMNodeEvent)} only if
+   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
+   *
+   * @param event
+   * @param context
+   */
+  public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context);
 
-    /**
-     * Invoked *after* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
-     * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
-     *
-     * @param event
-     */
-    public void afterSchedulerEventHandled(SchedulerEvent event);
+  /**
+   * Invoked *before* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
+   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
+   *
+   * @param event
+   */
+  public void beforeSchedulerEventHandled(SchedulerEvent event);
+
+  /**
+   * Invoked *after* {@link YarnScheduler#handle(org.apache.hadoop.yarn.event.Event)} only if
+   * {@link CallBackFilter#allowCallBacksForNode(NodeId)} returns true.
+   *
+   * @param event
+   */
+  public void afterSchedulerEventHandled(SchedulerEvent event);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java
index 1f2fe41..687b491 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,72 +28,72 @@ import java.util.UUID;
  * Model which represents the configuration of a cluster
  */
 public class Cluster {
-    private String clusterId;
-    private String clusterName;
-    private Collection<NodeTask> nodes;
-    private String resourceManagerHost;
-    private String resourceManagerPort;
-    private double minQuota;
-
-    public Cluster() {
-        this.clusterId = UUID.randomUUID().toString();
-        this.nodes = new HashSet<>();
-    }
-
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    public Collection<NodeTask> getNodes() {
-        return nodes;
-    }
-
-    public void addNode(NodeTask node) {
-        this.nodes.add(node);
-    }
-
-    public void addNodes(Collection<NodeTask> nodes) {
-        this.nodes.addAll(nodes);
-    }
-
-    public void removeNode(NodeTask task) {
-        this.nodes.remove(task);
-    }
-
-    public String getResourceManagerHost() {
-        return resourceManagerHost;
-    }
-
-    public void setResourceManagerHost(String resourceManagerHost) {
-        this.resourceManagerHost = resourceManagerHost;
-    }
-
-    public String getResourceManagerPort() {
-        return resourceManagerPort;
-    }
-
-    public void setResourceManagerPort(String resourceManagerPort) {
-        this.resourceManagerPort = resourceManagerPort;
-    }
-
-    public double getMinQuota() {
-        return minQuota;
-    }
-
-    public void setMinQuota(double minQuota) {
-        this.minQuota = minQuota;
-    }
-
-    public String toString() {
-        Gson gson = new Gson();
-        return gson.toJson(this);
-    }
+  private String clusterId;
+  private String clusterName;
+  private Collection<NodeTask> nodes;
+  private String resourceManagerHost;
+  private String resourceManagerPort;
+  private double minQuota;
+
+  public Cluster() {
+    this.clusterId = UUID.randomUUID().toString();
+    this.nodes = new HashSet<>();
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  public Collection<NodeTask> getNodes() {
+    return nodes;
+  }
+
+  public void addNode(NodeTask node) {
+    this.nodes.add(node);
+  }
+
+  public void addNodes(Collection<NodeTask> nodes) {
+    this.nodes.addAll(nodes);
+  }
+
+  public void removeNode(NodeTask task) {
+    this.nodes.remove(task);
+  }
+
+  public String getResourceManagerHost() {
+    return resourceManagerHost;
+  }
+
+  public void setResourceManagerHost(String resourceManagerHost) {
+    this.resourceManagerHost = resourceManagerHost;
+  }
+
+  public String getResourceManagerPort() {
+    return resourceManagerPort;
+  }
+
+  public void setResourceManagerPort(String resourceManagerPort) {
+    this.resourceManagerPort = resourceManagerPort;
+  }
+
+  public double getMinQuota() {
+    return minQuota;
+  }
+
+  public void setMinQuota(double minQuota) {
+    this.minQuota = minQuota;
+  }
+
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java
index 550c8cc..5e868f9 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,27 +30,27 @@ import java.util.concurrent.ExecutionException;
  * Model that represents the state of Myriad
  */
 public class MyriadState {
-    public static final String KEY_FRAMEWORK_ID = "frameworkId";
+  public static final String KEY_FRAMEWORK_ID = "frameworkId";
 
-    private State stateStore;
+  private State stateStore;
 
-    public MyriadState(State stateStore) {
-        this.stateStore = stateStore;
-    }
+  public MyriadState(State stateStore) {
+    this.stateStore = stateStore;
+  }
 
-    public Protos.FrameworkID getFrameworkID() throws InterruptedException, ExecutionException, InvalidProtocolBufferException {
-        byte[] frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get().value();
+  public Protos.FrameworkID getFrameworkID() throws InterruptedException, ExecutionException, InvalidProtocolBufferException {
+    byte[] frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get().value();
 
-        if (frameworkId.length > 0) {
-            return Protos.FrameworkID.parseFrom(frameworkId);
-        } else {
-            return null;
-        }
+    if (frameworkId.length > 0) {
+      return Protos.FrameworkID.parseFrom(frameworkId);
+    } else {
+      return null;
     }
+  }
 
-    public void setFrameworkId(Protos.FrameworkID newFrameworkId) throws InterruptedException, ExecutionException {
-        Variable frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get();
-        frameworkId = frameworkId.mutate(newFrameworkId.toByteArray());
-        stateStore.store(frameworkId).get();
-    }
+  public void setFrameworkId(Protos.FrameworkID newFrameworkId) throws InterruptedException, ExecutionException {
+    Variable frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get();
+    frameworkId = frameworkId.mutate(newFrameworkId.toByteArray());
+    stateStore.store(frameworkId).get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
index d02fab9..99ab327 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,7 @@ package com.ebay.myriad.state;
 import com.ebay.myriad.state.utils.StoreContext;
 
 /**
- * Interface implemented by all Myriad State Store implementations 
+ * Interface implemented by all Myriad State Store implementations
  */
 public interface MyriadStateStore {
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
index 354c575..d784092 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,90 +32,90 @@ import org.apache.mesos.Protos.Attribute;
  * Represents a task to be launched by the executor
  */
 public class NodeTask {
-    @JsonProperty
-    private String hostname;
-    @JsonProperty
-    private Protos.SlaveID slaveId;
-    @JsonProperty
-    private Protos.TaskStatus taskStatus;
-    @JsonProperty
-    private String taskPrefix;
-    @JsonProperty
-    private ServiceResourceProfile serviceresourceProfile;
-
-    @Inject
-    TaskUtils taskUtils;
-    /**
-     * Mesos executor for this node.
-     */
-    private Protos.ExecutorInfo executorInfo;
-
-    private Constraint constraint;
-    private List<Attribute> slaveAttributes;
-
-    public NodeTask(ServiceResourceProfile profile, Constraint constraint) {
-        this.serviceresourceProfile = profile;
-        this.hostname = "";
-        this.constraint = constraint;
-    }
-
-    public Protos.SlaveID getSlaveId() {
-        return slaveId;
-    }
-
-    public void setSlaveId(Protos.SlaveID slaveId) {
-        this.slaveId = slaveId;
-    }
-
-    public Constraint getConstraint() {
-      return constraint;
-    }
-
-    public String getHostname() {
-        return this.hostname;
-    }
-
-    public void setHostname(String hostname) {
-        this.hostname = hostname;
-    }
-
-    public Protos.TaskStatus getTaskStatus() {
-        return taskStatus;
-    }
-
-    public void setTaskStatus(Protos.TaskStatus taskStatus) {
-        this.taskStatus = taskStatus;
-    }
-
-    public Protos.ExecutorInfo getExecutorInfo() {
-        return executorInfo;
-    }
-
-    public void setExecutorInfo(Protos.ExecutorInfo executorInfo) {
-        this.executorInfo = executorInfo;
-    }
-
-    public void setSlaveAttributes(List<Attribute> slaveAttributes) {
-      this.slaveAttributes = slaveAttributes;
-    }
-
-    public List<Attribute> getSlaveAttributes() {
-      return slaveAttributes;
-    }
-
-    public String getTaskPrefix() {
-      return taskPrefix;
-    }
-
-    public void setTaskPrefix(String taskPrefix) {
-      this.taskPrefix = taskPrefix;
-    }
-
-    public ServiceResourceProfile getProfile() {
-      return serviceresourceProfile;
-    }
-
-    public void setProfile(ServiceResourceProfile serviceresourceProfile) {
-      this.serviceresourceProfile = serviceresourceProfile;
-    }
+  @JsonProperty
+  private String hostname;
+  @JsonProperty
+  private Protos.SlaveID slaveId;
+  @JsonProperty
+  private Protos.TaskStatus taskStatus;
+  @JsonProperty
+  private String taskPrefix;
+  @JsonProperty
+  private ServiceResourceProfile serviceresourceProfile;
+
+  @Inject
+  TaskUtils taskUtils;
+  /**
+   * Mesos executor for this node.
+   */
+  private Protos.ExecutorInfo executorInfo;
+
+  private Constraint constraint;
+  private List<Attribute> slaveAttributes;
+
+  public NodeTask(ServiceResourceProfile profile, Constraint constraint) {
+    this.serviceresourceProfile = profile;
+    this.hostname = "";
+    this.constraint = constraint;
+  }
+
+  public Protos.SlaveID getSlaveId() {
+    return slaveId;
+  }
+
+  public void setSlaveId(Protos.SlaveID slaveId) {
+    this.slaveId = slaveId;
+  }
+
+  public Constraint getConstraint() {
+    return constraint;
+  }
+
+  public String getHostname() {
+    return this.hostname;
+  }
+
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  public Protos.TaskStatus getTaskStatus() {
+    return taskStatus;
+  }
+
+  public void setTaskStatus(Protos.TaskStatus taskStatus) {
+    this.taskStatus = taskStatus;
+  }
+
+  public Protos.ExecutorInfo getExecutorInfo() {
+    return executorInfo;
+  }
+
+  public void setExecutorInfo(Protos.ExecutorInfo executorInfo) {
+    this.executorInfo = executorInfo;
+  }
+
+  public void setSlaveAttributes(List<Attribute> slaveAttributes) {
+    this.slaveAttributes = slaveAttributes;
+  }
+
+  public List<Attribute> getSlaveAttributes() {
+    return slaveAttributes;
+  }
+
+  public String getTaskPrefix() {
+    return taskPrefix;
+  }
+
+  public void setTaskPrefix(String taskPrefix) {
+    this.taskPrefix = taskPrefix;
+  }
+
+  public ServiceResourceProfile getProfile() {
+    return serviceresourceProfile;
+  }
+
+  public void setProfile(ServiceResourceProfile serviceresourceProfile) {
+    this.serviceresourceProfile = serviceresourceProfile;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index 08a4dfa..99a7506 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,411 +44,398 @@ import org.slf4j.LoggerFactory;
  * Represents the state of the Myriad scheduler
  */
 public class SchedulerState {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class);
-
-    private static Pattern taskIdPattern = Pattern.compile("\\.");
-    
-    private Map<Protos.TaskID, NodeTask> tasks;
-    private Protos.FrameworkID frameworkId;
-    private MyriadStateStore stateStore;
-    private Map<String, SchedulerStateForType> statesForTaskType;
-
-    public SchedulerState(MyriadStateStore stateStore) {
-        this.tasks = new ConcurrentHashMap<>();
-        this.stateStore = stateStore;
-        this.statesForTaskType = new ConcurrentHashMap<>();
-        loadStateStore();
-    }
-
-    /**
-     * Making method synchronized, so if someone tries flexup/down at the same time
-     * addNodes and removeTask will not put data into an inconsistent state
-     * @param nodes
-     */
-    public synchronized void addNodes(Collection<NodeTask> nodes) {
-        if (CollectionUtils.isEmpty(nodes)) {
-            LOGGER.info("No nodes to add");
-            return;
-        }
-        for (NodeTask node : nodes) {
-            Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID()))
-                    .build();
-            addTask(taskId, node);
-            SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix());
-            LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), 
-                (taskState == null ? 0 : taskState.getPendingTaskIds().size()));
-            makeTaskPending(taskId);
-        }
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class);
 
-    }
+  private static Pattern taskIdPattern = Pattern.compile("\\.");
 
-    // TODO (sdaingade) Clone NodeTask
-    public synchronized void addTask(Protos.TaskID taskId, NodeTask node) {
-        this.tasks.put(taskId, node);
-        updateStateStore();
-    }
+  private Map<Protos.TaskID, NodeTask> tasks;
+  private Protos.FrameworkID frameworkId;
+  private MyriadStateStore stateStore;
+  private Map<String, SchedulerStateForType> statesForTaskType;
 
-    public synchronized void updateTask(Protos.TaskStatus taskStatus) {
-        Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null");
-        Protos.TaskID taskId = taskStatus.getTaskId();
-        if (this.tasks.containsKey(taskId)) {
-            this.tasks.get(taskId).setTaskStatus(taskStatus);
-        }
-        updateStateStore();
-    }
+  public SchedulerState(MyriadStateStore stateStore) {
+    this.tasks = new ConcurrentHashMap<>();
+    this.stateStore = stateStore;
+    this.statesForTaskType = new ConcurrentHashMap<>();
+    loadStateStore();
+  }
 
-    public synchronized void makeTaskPending(Protos.TaskID taskId) {
-        Objects.requireNonNull(taskId,
-                "taskId cannot be empty or null");
-        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-        if (taskTypeState == null) {
-          taskTypeState = new SchedulerStateForType(taskPrefix);
-          statesForTaskType.put(taskPrefix, taskTypeState);
-        }
-        taskTypeState.makeTaskPending(taskId);
-        updateStateStore();
+  /**
+   * Making method synchronized, so if someone tries flexup/down at the same time
+   * addNodes and removeTask will not put data into an inconsistent state
+   *
+   * @param nodes
+   */
+  public synchronized void addNodes(Collection<NodeTask> nodes) {
+    if (CollectionUtils.isEmpty(nodes)) {
+      LOGGER.info("No nodes to add");
+      return;
+    }
+    for (NodeTask node : nodes) {
+      Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID())).build();
+      addTask(taskId, node);
+      SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix());
+      LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), (taskState == null ? 0 : taskState.getPendingTaskIds().size()));
+      makeTaskPending(taskId);
     }
 
-    public synchronized void makeTaskStaging(Protos.TaskID taskId) {
-        Objects.requireNonNull(taskId,
-                "taskId cannot be empty or null");
-        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-        if (taskTypeState == null) {
-          taskTypeState = new SchedulerStateForType(taskPrefix);
-          statesForTaskType.put(taskPrefix, taskTypeState);
-        }
-        taskTypeState.makeTaskStaging(taskId);
-        updateStateStore();
-    }
+  }
 
-    public synchronized void makeTaskActive(Protos.TaskID taskId) {
-        Objects.requireNonNull(taskId,
-                "taskId cannot be empty or null");
-        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-        if (taskTypeState == null) {
-          taskTypeState = new SchedulerStateForType(taskPrefix);
-          statesForTaskType.put(taskPrefix, taskTypeState);
-        }
-        taskTypeState.makeTaskActive(taskId);
-        updateStateStore();
-    }
+  // TODO (sdaingade) Clone NodeTask
+  public synchronized void addTask(Protos.TaskID taskId, NodeTask node) {
+    this.tasks.put(taskId, node);
+    updateStateStore();
+  }
 
-    public synchronized void makeTaskLost(Protos.TaskID taskId) {
-        Objects.requireNonNull(taskId,
-                "taskId cannot be empty or null");
-        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-        if (taskTypeState == null) {
-          taskTypeState = new SchedulerStateForType(taskPrefix);
-          statesForTaskType.put(taskPrefix, taskTypeState);
-        }
-        taskTypeState.makeTaskLost(taskId);
-        updateStateStore();
+  public synchronized void updateTask(Protos.TaskStatus taskStatus) {
+    Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null");
+    Protos.TaskID taskId = taskStatus.getTaskId();
+    if (this.tasks.containsKey(taskId)) {
+      this.tasks.get(taskId).setTaskStatus(taskStatus);
     }
+    updateStateStore();
+  }
 
-    public synchronized void makeTaskKillable(Protos.TaskID taskId) {
-        Objects.requireNonNull(taskId,
-                "taskId cannot be empty or null");
-        String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-        SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-        if (taskTypeState == null) {
-          taskTypeState = new SchedulerStateForType(taskPrefix);
-          statesForTaskType.put(taskPrefix, taskTypeState);
-        }
-        taskTypeState.makeTaskKillable(taskId);
-        updateStateStore();
+  public synchronized void makeTaskPending(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+    if (taskTypeState == null) {
+      taskTypeState = new SchedulerStateForType(taskPrefix);
+      statesForTaskType.put(taskPrefix, taskTypeState);
     }
+    taskTypeState.makeTaskPending(taskId);
+    updateStateStore();
+  }
 
-    // TODO (sdaingade) Clone NodeTask
-    public synchronized NodeTask getTask(Protos.TaskID taskId) {
-        return this.tasks.get(taskId);
+  public synchronized void makeTaskStaging(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+    if (taskTypeState == null) {
+      taskTypeState = new SchedulerStateForType(taskPrefix);
+      statesForTaskType.put(taskPrefix, taskTypeState);
     }
+    taskTypeState.makeTaskStaging(taskId);
+    updateStateStore();
+  }
 
-    public synchronized Set<Protos.TaskID> getKillableTasks() {
-      Set<Protos.TaskID> returnSet = new HashSet<>();
-      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-        returnSet.addAll(entry.getValue().getKillableTasks());
-      }
-      return returnSet;
+  public synchronized void makeTaskActive(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+    if (taskTypeState == null) {
+      taskTypeState = new SchedulerStateForType(taskPrefix);
+      statesForTaskType.put(taskPrefix, taskTypeState);
     }
+    taskTypeState.makeTaskActive(taskId);
+    updateStateStore();
+  }
 
-    public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) {
-      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks());
+  public synchronized void makeTaskLost(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+    if (taskTypeState == null) {
+      taskTypeState = new SchedulerStateForType(taskPrefix);
+      statesForTaskType.put(taskPrefix, taskTypeState);
     }
-    
-    public synchronized void removeTask(Protos.TaskID taskId) {
-      String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-      SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-      if (taskTypeState != null) {
-        taskTypeState.removeTask(taskId);
-      }
-      this.tasks.remove(taskId);
-      updateStateStore();
+    taskTypeState.makeTaskLost(taskId);
+    updateStateStore();
+  }
+
+  public synchronized void makeTaskKillable(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+    if (taskTypeState == null) {
+      taskTypeState = new SchedulerStateForType(taskPrefix);
+      statesForTaskType.put(taskPrefix, taskTypeState);
     }
+    taskTypeState.makeTaskKillable(taskId);
+    updateStateStore();
+  }
 
-    public synchronized Set<Protos.TaskID> getPendingTaskIds() {
-      Set<Protos.TaskID> returnSet = new HashSet<>();
-      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-        returnSet.addAll(entry.getValue().getPendingTaskIds());
-      }
-      return returnSet;
+  // TODO (sdaingade) Clone NodeTask
+  public synchronized NodeTask getTask(Protos.TaskID taskId) {
+    return this.tasks.get(taskId);
+  }
+
+  public synchronized Set<Protos.TaskID> getKillableTasks() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+      returnSet.addAll(entry.getValue().getKillableTasks());
     }
+    return returnSet;
+  }
 
-    public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
-      List<Protos.TaskID> pendingTaskIds = new ArrayList<>();
-      Set<Protos.TaskID> pendingTasks = getPendingTaskIds();
-      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-        NodeTask nodeTask = entry.getValue();
-        if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
-          pendingTaskIds.add(entry.getKey());
-        }
-      }
-      return Collections.unmodifiableCollection(pendingTaskIds);
+  public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks());
+  }
+
+  public synchronized void removeTask(Protos.TaskID taskId) {
+    String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+    if (taskTypeState != null) {
+      taskTypeState.removeTask(taskId);
     }
+    this.tasks.remove(taskId);
+    updateStateStore();
+  }
 
-    public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) {
-      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds());  
+  public synchronized Set<Protos.TaskID> getPendingTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+      returnSet.addAll(entry.getValue().getPendingTaskIds());
     }
+    return returnSet;
+  }
 
-    public synchronized Set<Protos.TaskID> getActiveTaskIds() {
-      Set<Protos.TaskID> returnSet = new HashSet<>();
-      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-        returnSet.addAll(entry.getValue().getActiveTaskIds());
+  public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
+    List<Protos.TaskID> pendingTaskIds = new ArrayList<>();
+    Set<Protos.TaskID> pendingTasks = getPendingTaskIds();
+    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+      NodeTask nodeTask = entry.getValue();
+      if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
+        pendingTaskIds.add(entry.getKey());
       }
-      return returnSet;
     }
+    return Collections.unmodifiableCollection(pendingTaskIds);
+  }
 
-    public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) {
-      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds());
-    }
+  public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds());
+  }
 
-    public synchronized Set<NodeTask> getActiveTasks() {
-        return getTasks(getActiveTaskIds());
+  public synchronized Set<Protos.TaskID> getActiveTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+      returnSet.addAll(entry.getValue().getActiveTaskIds());
     }
+    return returnSet;
+  }
 
-    public Set<NodeTask> getActiveTasksByType(String taskPrefix) {
-        return getTasks(getActiveTaskIds(taskPrefix));
-    }
+  public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds());
+  }
 
-    public Set<NodeTask> getStagingTasks() {
-        return getTasks(getStagingTaskIds());
-    }
+  public synchronized Set<NodeTask> getActiveTasks() {
+    return getTasks(getActiveTaskIds());
+  }
 
-    public Set<NodeTask> getStagingTasksByType(String taskPrefix) {
-        return getTasks(getStagingTaskIds(taskPrefix));
-    }
+  public Set<NodeTask> getActiveTasksByType(String taskPrefix) {
+    return getTasks(getActiveTaskIds(taskPrefix));
+  }
 
-    public Set<NodeTask> getPendingTasksByType(String taskPrefix) {
-        return getTasks(getPendingTaskIds(taskPrefix));
-    }
+  public Set<NodeTask> getStagingTasks() {
+    return getTasks(getStagingTaskIds());
+  }
 
-    public synchronized Set<NodeTask> getTasks(Set<Protos.TaskID> taskIds) {
-        Set<NodeTask> nodeTasks = new HashSet<>();
-        if (CollectionUtils.isNotEmpty(taskIds)
-            && CollectionUtils.isNotEmpty(tasks.values())) {
-            for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-                if (taskIds.contains(entry.getKey())) {
-                    nodeTasks.add(entry.getValue());
-                }
-            }
-        }
-            return Collections.unmodifiableSet(nodeTasks);
-    }
-
-    public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
-      List<Protos.TaskID> activeTaskIDs = new ArrayList<>();
-      Set<Protos.TaskID> activeTaskIds = getActiveTaskIds();
-      if (CollectionUtils.isNotEmpty(activeTaskIds)
-          && CollectionUtils.isNotEmpty(tasks.values())) {
-        for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-          NodeTask nodeTask = entry.getValue();
-          if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
-            activeTaskIDs.add(entry.getKey());
-          }
-        }
-      }
-      return Collections.unmodifiableCollection(activeTaskIDs);
-    }
+  public Set<NodeTask> getStagingTasksByType(String taskPrefix) {
+    return getTasks(getStagingTaskIds(taskPrefix));
+  }
 
-   // TODO (sdaingade) Clone NodeTask
-    public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) {
-      if (taskPrefix == null) {
-        return null;
-      }
+  public Set<NodeTask> getPendingTasksByType(String taskPrefix) {
+    return getTasks(getPendingTaskIds(taskPrefix));
+  }
+
+  public synchronized Set<NodeTask> getTasks(Set<Protos.TaskID> taskIds) {
+    Set<NodeTask> nodeTasks = new HashSet<>();
+    if (CollectionUtils.isNotEmpty(taskIds) && CollectionUtils.isNotEmpty(tasks.values())) {
       for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-        final NodeTask task = entry.getValue();
-        if (task.getSlaveId() != null &&
-            task.getSlaveId().equals(slaveId) &&
-            taskPrefix.equals(task.getTaskPrefix())) {
-            return entry.getValue(); 
+        if (taskIds.contains(entry.getKey())) {
+          nodeTasks.add(entry.getValue());
         }
       }
-      return null;
     }
+    return Collections.unmodifiableSet(nodeTasks);
+  }
 
-    public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) {
-      Set<NodeTask> nodeTasks = Sets.newHashSet();
+  public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
+    List<Protos.TaskID> activeTaskIDs = new ArrayList<>();
+    Set<Protos.TaskID> activeTaskIds = getActiveTaskIds();
+    if (CollectionUtils.isNotEmpty(activeTaskIds) && CollectionUtils.isNotEmpty(tasks.values())) {
       for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-        final NodeTask task = entry.getValue();
-        if (task.getSlaveId() != null &&
-            task.getSlaveId().equals(slaveId)) {
-          nodeTasks.add(entry.getValue()); 
+        NodeTask nodeTask = entry.getValue();
+        if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
+          activeTaskIDs.add(entry.getKey());
         }
       }
-      return nodeTasks;
     }
+    return Collections.unmodifiableCollection(activeTaskIDs);
+  }
 
-    public Set<Protos.TaskID> getStagingTaskIds() {
-      Set<Protos.TaskID> returnSet = new HashSet<>();
-      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-        returnSet.addAll(entry.getValue().getStagingTaskIds());
+  // TODO (sdaingade) Clone NodeTask
+  public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) {
+    if (taskPrefix == null) {
+      return null;
+    }
+    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+      final NodeTask task = entry.getValue();
+      if (task.getSlaveId() != null &&
+          task.getSlaveId().equals(slaveId) &&
+          taskPrefix.equals(task.getTaskPrefix())) {
+        return entry.getValue();
       }
-      return returnSet;
     }
+    return null;
+  }
 
-    public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
-      List<Protos.TaskID> stagingTaskIDs = new ArrayList<>();
-      
-      Set<Protos.TaskID> stagingTasks = getStagingTaskIds();
-      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
-        NodeTask nodeTask = entry.getValue();
-        if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
-          stagingTaskIDs.add(entry.getKey());
-        }
+  public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) {
+    Set<NodeTask> nodeTasks = Sets.newHashSet();
+    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+      final NodeTask task = entry.getValue();
+      if (task.getSlaveId() != null && task.getSlaveId().equals(slaveId)) {
+        nodeTasks.add(entry.getValue());
       }
-      return Collections.unmodifiableCollection(stagingTaskIDs);
     }
+    return nodeTasks;
+  }
 
-    public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) {
-      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds());
+  public Set<Protos.TaskID> getStagingTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+      returnSet.addAll(entry.getValue().getStagingTaskIds());
     }
-    
-    public Set<Protos.TaskID> getLostTaskIds() {
-      Set<Protos.TaskID> returnSet = new HashSet<>();
-      for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
-        returnSet.addAll(entry.getValue().getLostTaskIds());
+    return returnSet;
+  }
+
+  public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
+    List<Protos.TaskID> stagingTaskIDs = new ArrayList<>();
+
+    Set<Protos.TaskID> stagingTasks = getStagingTaskIds();
+    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+      NodeTask nodeTask = entry.getValue();
+      if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
+        stagingTaskIDs.add(entry.getKey());
       }
-      return returnSet;
-    }
-    
-    public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) {
-      SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-      return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds());
-    }
-
-    // TODO (sdaingade) Currently cannot return unmodifiableCollection
-    // as this will break ReconcileService code
-    public synchronized Collection<Protos.TaskStatus> getTaskStatuses() {
-        Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size());
-        Collection<NodeTask> tasks = this.tasks.values();
-        for (NodeTask task : tasks) {
-            Protos.TaskStatus taskStatus = task.getTaskStatus();
-            if (taskStatus != null) {
-                taskStatuses.add(taskStatus);
-            }
-        }
+    }
+    return Collections.unmodifiableCollection(stagingTaskIDs);
+  }
 
-        return taskStatuses;
+  public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds());
+  }
+
+  public Set<Protos.TaskID> getLostTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
+      returnSet.addAll(entry.getValue().getLostTaskIds());
     }
+    return returnSet;
+  }
+
+  public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds());
+  }
 
-    public synchronized boolean hasTask(Protos.TaskID taskID) {
-        return this.tasks.containsKey(taskID);
+  // TODO (sdaingade) Currently cannot return unmodifiableCollection
+  // as this will break ReconcileService code
+  public synchronized Collection<Protos.TaskStatus> getTaskStatuses() {
+    Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size());
+    Collection<NodeTask> tasks = this.tasks.values();
+    for (NodeTask task : tasks) {
+      Protos.TaskStatus taskStatus = task.getTaskStatus();
+      if (taskStatus != null) {
+        taskStatuses.add(taskStatus);
+      }
     }
 
-    public synchronized Protos.FrameworkID getFrameworkID() {
-         return this.frameworkId;
+    return taskStatuses;
+  }
+
+  public synchronized boolean hasTask(Protos.TaskID taskID) {
+    return this.tasks.containsKey(taskID);
+  }
+
+  public synchronized Protos.FrameworkID getFrameworkID() {
+    return this.frameworkId;
+  }
+
+  public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) {
+    this.frameworkId = newFrameworkId;
+    updateStateStore();
+  }
+
+  private synchronized void updateStateStore() {
+    if (this.stateStore == null) {
+      LOGGER.debug("Could not update state to state store as HA is disabled");
+      return;
     }
 
-    public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) {
-        this.frameworkId = newFrameworkId;
-        updateStateStore();
+    try {
+      StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks());
+      stateStore.storeMyriadState(sc);
+    } catch (Exception e) {
+      LOGGER.error("Failed to update scheduler state to state store", e);
     }
+  }
 
-    private synchronized void updateStateStore() {
-        if (this.stateStore == null) {
-            LOGGER.debug("Could not update state to state store as HA is disabled");
-            return;
-        }
+  private synchronized void loadStateStore() {
+    if (this.stateStore == null) {
+      LOGGER.debug("Could not load state from state store as HA is disabled");
+      return;
+    }
+
+    try {
+      StoreContext sc = stateStore.loadMyriadState();
+      if (sc != null) {
+        this.frameworkId = sc.getFrameworkId();
+        this.tasks.putAll(sc.getTasks());
+        convertToThis(TaskState.PENDING, sc.getPendingTasks());
+        convertToThis(TaskState.STAGING, sc.getStagingTasks());
+        convertToThis(TaskState.ACTIVE, sc.getActiveTasks());
+        convertToThis(TaskState.LOST, sc.getLostTasks());
+        convertToThis(TaskState.KILLABLE, sc.getKillableTasks());
+        LOGGER.info("Loaded Myriad state from state store successfully.");
+        LOGGER.debug("State Store state includes " +
+            "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " +
+            "active tasks count: {}, lost tasks count: {}, " +
+            "and killable tasks count: {}", frameworkId.getValue(), this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), this.getActiveTaskIds().size(), this.getLostTaskIds().size(), this.getKillableTasks().size());
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to read scheduler state from state store", e);
+    }
+  }
 
-        try {
-            StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(),
-                getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks());
-            stateStore.storeMyriadState(sc);
-        } catch (Exception e) {
-            LOGGER.error("Failed to update scheduler state to state store", e);
-        }
+  private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) {
+    for (Protos.TaskID taskId : taskIds) {
+      String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+      SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+      if (taskTypeState == null) {
+        taskTypeState = new SchedulerStateForType(taskPrefix);
+        statesForTaskType.put(taskPrefix, taskTypeState);
+      }
+      switch (taskType) {
+        case PENDING:
+          taskTypeState.makeTaskPending(taskId);
+          break;
+        case STAGING:
+          taskTypeState.makeTaskStaging(taskId);
+          break;
+        case ACTIVE:
+          taskTypeState.makeTaskActive(taskId);
+          break;
+        case KILLABLE:
+          taskTypeState.makeTaskKillable(taskId);
+          break;
+        case LOST:
+          taskTypeState.makeTaskLost(taskId);
+          break;
+      }
     }
+  }
 
-    private synchronized void loadStateStore() {
-        if (this.stateStore == null) {
-            LOGGER.debug("Could not load state from state store as HA is disabled");
-            return;
-        }
+  /**
+   * Class to keep all the tasks states for a particular taskPrefix together
+   */
+  private static class SchedulerStateForType {
 
-        try {
-            StoreContext sc = stateStore.loadMyriadState();
-            if (sc != null) {
-                this.frameworkId = sc.getFrameworkId();
-                this.tasks.putAll(sc.getTasks());
-                convertToThis(TaskState.PENDING, sc.getPendingTasks());
-                convertToThis(TaskState.STAGING, sc.getStagingTasks());
-                convertToThis(TaskState.ACTIVE, sc.getActiveTasks());
-                convertToThis(TaskState.LOST, sc.getLostTasks());
-                convertToThis(TaskState.KILLABLE, sc.getKillableTasks());
-                LOGGER.info("Loaded Myriad state from state store successfully.");
-                LOGGER.debug("State Store state includes " +
-                  "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " +
-                  "active tasks count: {}, lost tasks count: {}, " +
-                  "and killable tasks count: {}", frameworkId.getValue(),
-                  this.getPendingTaskIds().size(), this.getStagingTaskIds().size(),
-                  this.getActiveTaskIds().size(), this.getLostTaskIds().size(),
-                  this.getKillableTasks().size());
-            }
-        }  catch (Exception e) {
-            LOGGER.error("Failed to read scheduler state from state store", e);
-        }
-   }
-
-   private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) {
-     for (Protos.TaskID taskId : taskIds) {
-       String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
-       SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
-       if (taskTypeState == null) {
-         taskTypeState = new SchedulerStateForType(taskPrefix);
-         statesForTaskType.put(taskPrefix, taskTypeState);
-       }
-       switch(taskType) {
-       case PENDING:
-         taskTypeState.makeTaskPending(taskId);
-         break;
-       case STAGING:
-         taskTypeState.makeTaskStaging(taskId);
-         break;
-       case ACTIVE:
-         taskTypeState.makeTaskActive(taskId);
-         break;
-       case KILLABLE:
-         taskTypeState.makeTaskKillable(taskId);
-         break;
-       case LOST:
-         taskTypeState.makeTaskLost(taskId);
-         break;
-       }
-     }
-   }
-   /**
-    * Class to keep all the tasks states for a particular taskPrefix together
-    *
-    */
-   private static class SchedulerStateForType {
-      
     private final String taskPrefix;
     private Set<Protos.TaskID> pendingTasks;
     private Set<Protos.TaskID> stagingTasks;
@@ -467,15 +454,15 @@ public class SchedulerState {
       this.killableTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
 
     }
+
     @SuppressWarnings("unused")
     public String getTaskPrefix() {
       return taskPrefix;
     }
-    
+
     public synchronized void makeTaskPending(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId,
-              "taskId cannot be empty or null");
-      
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+
       pendingTasks.add(taskId);
       stagingTasks.remove(taskId);
       activeTasks.remove(taskId);
@@ -484,18 +471,16 @@ public class SchedulerState {
     }
 
     public synchronized void makeTaskStaging(Protos.TaskID taskId) {
-        Objects.requireNonNull(taskId,
-                "taskId cannot be empty or null");
-        pendingTasks.remove(taskId);
-        stagingTasks.add(taskId);
-        activeTasks.remove(taskId);
-        lostTasks.remove(taskId);
-        killableTasks.remove(taskId);
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.add(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.remove(taskId);
     }
 
     public synchronized void makeTaskActive(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId,
-              "taskId cannot be empty or null");
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
       pendingTasks.remove(taskId);
       stagingTasks.remove(taskId);
       activeTasks.add(taskId);
@@ -504,8 +489,7 @@ public class SchedulerState {
     }
 
     public synchronized void makeTaskLost(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId,
-              "taskId cannot be empty or null");
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
       pendingTasks.remove(taskId);
       stagingTasks.remove(taskId);
       activeTasks.remove(taskId);
@@ -514,15 +498,14 @@ public class SchedulerState {
     }
 
     public synchronized void makeTaskKillable(Protos.TaskID taskId) {
-      Objects.requireNonNull(taskId,
-              "taskId cannot be empty or null");
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
       pendingTasks.remove(taskId);
       stagingTasks.remove(taskId);
       activeTasks.remove(taskId);
       lostTasks.remove(taskId);
       killableTasks.add(taskId);
     }
-    
+
     public synchronized void removeTask(Protos.TaskID taskId) {
       this.pendingTasks.remove(taskId);
       this.stagingTasks.remove(taskId);
@@ -530,7 +513,7 @@ public class SchedulerState {
       this.lostTasks.remove(taskId);
       this.killableTasks.remove(taskId);
     }
-    
+
     public synchronized Set<Protos.TaskID> getPendingTaskIds() {
       return Collections.unmodifiableSet(this.pendingTasks);
     }
@@ -552,15 +535,15 @@ public class SchedulerState {
     }
 
   }
-   /**
-    * TaskState type
-    *
-    */
-   public enum TaskState {
-     PENDING,
-     STAGING,
-     ACTIVE,
-     KILLABLE,
-     LOST
-   }
+
+  /**
+   * TaskState type
+   */
+  public enum TaskState {
+    PENDING,
+    STAGING,
+    ACTIVE,
+    KILLABLE,
+    LOST
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
index 4711a52..fcf2cb8 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,20 +39,17 @@ import com.google.gson.GsonBuilder;
 import com.google.protobuf.GeneratedMessage;
 
 /**
-* ByteBuffer support for the Serialization of the StoreContext
-*/
+ * ByteBuffer support for the Serialization of the StoreContext
+ */
 public class ByteBufferSupport {
 
   public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
   public static final String UTF8 = "UTF-8";
   public static final byte[] ZERO_BYTES = new byte[0];
   private static Gson gson = new Gson();
-  private static Gson gsonCustom = new GsonBuilder().
-      registerTypeAdapter(ServiceResourceProfile.class, new ServiceResourceProfile.CustomDeserializer()).
-      create();
-  
-  public static void addByteBuffers(List<ByteBuffer> list,
-    ByteArrayOutputStream bytes) throws IOException {
+  private static Gson gsonCustom = new GsonBuilder().registerTypeAdapter(ServiceResourceProfile.class, new ServiceResourceProfile.CustomDeserializer()).create();
+
+  public static void addByteBuffers(List<ByteBuffer> list, ByteArrayOutputStream bytes) throws IOException {
     // If list, add the list size, then the size of each buffer followed by the buffer.
     if (list != null) {
       bytes.write(toIntBytes(list.size()));
@@ -64,8 +61,7 @@ public class ByteBufferSupport {
     }
   }
 
-  public static void addByteBuffer(ByteBuffer bb,
-    ByteArrayOutputStream bytes) throws IOException {
+  public static void addByteBuffer(ByteBuffer bb, ByteArrayOutputStream bytes) throws IOException {
     if (bb != null && bytes != null) {
       bytes.write(toIntBytes(bb.array().length));
       bytes.write(bb.array());
@@ -139,11 +135,11 @@ public class ByteBufferSupport {
     } else {
       size += INT_SIZE;
     }
-    
+
     if (nt.getExecutorInfo() != null) {
-        size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE;
+      size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE;
     } else {
-        size += INT_SIZE;
+      size += INT_SIZE;
     }
 
     byte[] taskPrefixBytes = ZERO_BYTES;
@@ -151,7 +147,7 @@ public class ByteBufferSupport {
       taskPrefixBytes = toBytes(nt.getTaskPrefix());
       size += taskPrefixBytes.length + INT_SIZE;
     }
-    
+
     // Allocate and populate the buffer.
     ByteBuffer bb = createBuffer(size);
     putBytes(bb, profile);
@@ -197,11 +193,11 @@ public class ByteBufferSupport {
 
   /**
    * ByteBuffer is expected to have a NodeTask at its next position.
-  *
-  * @param bb
-  * @return NodeTask or null if buffer is empty. Can throw a RuntimeException
-  * if the buffer is not formatted correctly.
-  */
+   *
+   * @param bb
+   * @return NodeTask or null if buffer is empty. Can throw a RuntimeException
+   * if the buffer is not formatted correctly.
+   */
   public static NodeTask toNodeTask(ByteBuffer bb) {
     NodeTask nt = null;
     if (bb != null && bb.array().length > 0) {
@@ -261,7 +257,7 @@ public class ByteBufferSupport {
    * @return string from the next position, or "" if the size is zero
    */
   public static String toString(ByteBuffer bb) {
-    byte [] bytes = new byte[bb.getInt()];
+    byte[] bytes = new byte[bb.getInt()];
     String s = "";
     try {
       if (bytes.length > 0) {
@@ -269,8 +265,7 @@ public class ByteBufferSupport {
         s = new String(bytes, UTF8);
       }
     } catch (Exception e) {
-      throw new RuntimeException("ByteBuffer not in expected format," +
-        " failed to parse string bytes", e);
+      throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse string bytes", e);
     }
     return s;
   }
@@ -314,8 +309,7 @@ public class ByteBufferSupport {
       try {
         return Protos.SlaveID.parseFrom(getBytes(bb, size));
       } catch (Exception e) {
-        throw new RuntimeException("ByteBuffer not in expected format," +
-          " failed to parse SlaveId bytes", e);
+        throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse SlaveId bytes", e);
       }
     } else {
       return null;
@@ -328,8 +322,7 @@ public class ByteBufferSupport {
       try {
         return Protos.TaskStatus.parseFrom(getBytes(bb, size));
       } catch (Exception e) {
-        throw new RuntimeException("ByteBuffer not in expected format," +
-          " failed to parse TaskStatus bytes", e);
+        throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse TaskStatus bytes", e);
       }
     } else {
       return null;
@@ -342,8 +335,7 @@ public class ByteBufferSupport {
       try {
         return Protos.ExecutorInfo.parseFrom(getBytes(bb, size));
       } catch (Exception e) {
-        throw new RuntimeException("ByteBuffer not in expected format," +
-          " failed to parse ExecutorInfo bytes", e);
+        throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse ExecutorInfo bytes", e);
       }
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
index 364a4c3..2ffcaa7 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,10 +37,10 @@ import org.apache.mesos.Protos.TaskID;
 import com.ebay.myriad.state.NodeTask;
 
 /**
-* The purpose of this container/utility is to create a mechanism to serialize the SchedulerState
-* to RMStateStore and back. Json did not seem to handle the Protos fields very well so this was an
-* alternative approach.
-*/
+ * The purpose of this container/utility is to create a mechanism to serialize the SchedulerState
+ * to RMStateStore and back. Json did not seem to handle the Protos fields very well so this was an
+ * alternative approach.
+ */
 public final class StoreContext {
   private static Pattern taskIdPattern = Pattern.compile("\\.");
   private ByteBuffer frameworkId;
@@ -57,6 +57,7 @@ public final class StoreContext {
 
   /**
    * Accept all the SchedulerState maps and flatten them into lists of ByteBuffers
+   *
    * @param tasks
    * @param pendingTasks
    * @param stagingTasks
@@ -64,11 +65,8 @@ public final class StoreContext {
    * @param lostTasks
    * @param killableTasks
    */
-  public StoreContext(Protos.FrameworkID frameworkId,
-    Map<Protos.TaskID, NodeTask> tasks,
-    Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks,
-    Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks,
-    Set<Protos.TaskID> killableTasks) {
+  public StoreContext(Protos.FrameworkID frameworkId, Map<Protos.TaskID, NodeTask> tasks, Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks, Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks, Set<Protos.TaskID>
+      killableTasks) {
     setFrameworkId(frameworkId);
     setTasks(tasks);
     setPendingTasks(pendingTasks);
@@ -80,6 +78,7 @@ public final class StoreContext {
 
   /**
    * Accept list of ByteBuffers and re-create the SchedulerState maps.
+   *
    * @param framwrorkId
    * @param taskIds
    * @param taskNodes
@@ -89,11 +88,8 @@ public final class StoreContext {
    * @param lostTasks
    * @param killableTasks
    */
-  public StoreContext(ByteBuffer frameworkId,
-    List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes,
-    List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks,
-    List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks,
-    List<ByteBuffer> killableTasks) {
+  public StoreContext(ByteBuffer frameworkId, List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes, List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks, List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks, List<ByteBuffer>
+      killableTasks) {
     this.frameworkId = frameworkId;
     this.taskIds = taskIds;
     this.taskNodes = taskNodes;
@@ -106,6 +102,7 @@ public final class StoreContext {
 
   /**
    * Use this to gather bytes to push to the state store
+   *
    * @return byte stream of the state store context.
    * @throws IOException
    */
@@ -131,7 +128,7 @@ public final class StoreContext {
   @SuppressWarnings("unchecked")
   public static StoreContext fromSerializedBytes(byte bytes[]) {
     StoreContext ctx;
-    if (bytes != null && bytes.length > 0){
+    if (bytes != null && bytes.length > 0) {
       ByteBuffer bb = ByteBufferSupport.fillBuffer(bytes);
       ByteBuffer frameworkId = ByteBufferSupport.createBuffer(bb);
       List<ByteBuffer> taskIds = ByteBufferSupport.createBufferList(bb, bb.getInt());
@@ -141,8 +138,7 @@ public final class StoreContext {
       List<ByteBuffer> activeTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
       List<ByteBuffer> lostTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
       List<ByteBuffer> killableTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
-      ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, stagingTasks, activeTasks,
-        lostTasks, killableTasks);
+      ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, stagingTasks, activeTasks, lostTasks, killableTasks);
     } else {
       ctx = new StoreContext();
     }
@@ -173,7 +169,7 @@ public final class StoreContext {
   }
 
   /**
-   * Serialize the Protos.FrameworkID into a ByteBuffer.  
+   * Serialize the Protos.FrameworkID into a ByteBuffer.
    */
   public void setFrameworkId(Protos.FrameworkID frameworkId) {
     if (frameworkId != null) {
@@ -213,7 +209,7 @@ public final class StoreContext {
     }
   }
 
-  public Set<Protos.TaskID> getPendingTasks () {
+  public Set<Protos.TaskID> getPendingTasks() {
     return toTaskSet(pendingTasks);
   }
 
@@ -263,7 +259,7 @@ public final class StoreContext {
 
   private void toTaskBuffer(Set<Protos.TaskID> src, List<ByteBuffer> tgt) {
     for (Protos.TaskID id : src) {
-       tgt.add(ByteBufferSupport.toByteBuffer(id));
+      tgt.add(ByteBufferSupport.toByteBuffer(id));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
index e618aea..a52b310 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,20 +30,20 @@ import javax.inject.Inject;
  */
 public class HttpConnectorProvider implements Provider<Connector> {
 
-    private MyriadConfiguration myriadConf;
+  private MyriadConfiguration myriadConf;
 
-    @Inject
-    public HttpConnectorProvider(MyriadConfiguration myriadConf) {
-        this.myriadConf = myriadConf;
-    }
+  @Inject
+  public HttpConnectorProvider(MyriadConfiguration myriadConf) {
+    this.myriadConf = myriadConf;
+  }
 
-    @Override
-    public Connector get() {
-        SelectChannelConnector ret = new SelectChannelConnector();
-        ret.setName("Myriad");
-        ret.setHost("0.0.0.0");
-        ret.setPort(myriadConf.getRestApiPort());
+  @Override
+  public Connector get() {
+    SelectChannelConnector ret = new SelectChannelConnector();
+    ret.setName("Myriad");
+    ret.setHost("0.0.0.0");
+    ret.setPort(myriadConf.getRestApiPort());
 
-        return ret;
-    }
+    return ret;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
index 263860a..cea22d1 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,15 +31,15 @@ import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
  */
 public class MyriadServletModule extends ServletModule {
 
-    @Override
-    protected void configureServlets() {
-        bind(ClustersResource.class);
-        bind(ConfigurationResource.class);
-        bind(SchedulerStateResource.class);
+  @Override
+  protected void configureServlets() {
+    bind(ClustersResource.class);
+    bind(ConfigurationResource.class);
+    bind(SchedulerStateResource.class);
 
-        bind(GuiceContainer.class);
-        bind(JacksonJaxbJsonProvider.class).in(Scopes.SINGLETON);
+    bind(GuiceContainer.class);
+    bind(JacksonJaxbJsonProvider.class).in(Scopes.SINGLETON);
 
-        serve("/api/*").with(GuiceContainer.class);
-    }
+    serve("/api/*").with(GuiceContainer.class);
+  }
 }


Mime
View raw message