myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject [10/20] incubator-myriad git commit: com.ebay => org.apache
Date Wed, 28 Oct 2015 16:07:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/state/Cluster.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/Cluster.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/Cluster.java
new file mode 100644
index 0000000..07604bc
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/Cluster.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.state;
+
+import com.google.gson.Gson;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.UUID;
+
+/**
+ * Model which represents the configuration of a cluster.
+ * <p/>
+ * Each instance is assigned a random UUID as its cluster id at construction time and
+ * starts with an empty set of nodes.
+ */
+public class Cluster {
+  // Gson instances are documented as thread-safe, so one shared instance serves every
+  // toString() call. Gson skips static fields, so this constant is not part of the JSON.
+  private static final Gson GSON = new Gson();
+
+  private String clusterId;
+  private String clusterName;
+  private Collection<NodeTask> nodes;
+  private String resourceManagerHost;
+  private String resourceManagerPort;
+  private double minQuota;
+
+  /**
+   * Creates a cluster with a freshly generated random id and no nodes.
+   */
+  public Cluster() {
+    this.clusterId = UUID.randomUUID().toString();
+    this.nodes = new HashSet<>();
+  }
+
+  /**
+   * @return the id generated when this cluster was constructed
+   */
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * @return the cluster name, or {@code null} if none has been set
+   */
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  /**
+   * @return the live internal collection of nodes (not a copy); changes made through it
+   *         are visible to this cluster
+   */
+  public Collection<NodeTask> getNodes() {
+    return nodes;
+  }
+
+  /**
+   * Adds a single node to this cluster.
+   *
+   * @param node the node to add
+   */
+  public void addNode(NodeTask node) {
+    this.nodes.add(node);
+  }
+
+  /**
+   * Adds every node of the given collection to this cluster.
+   *
+   * @param nodes the nodes to add
+   */
+  public void addNodes(Collection<NodeTask> nodes) {
+    this.nodes.addAll(nodes);
+  }
+
+  /**
+   * Removes the given node from this cluster if it is present.
+   *
+   * @param task the node to remove
+   */
+  public void removeNode(NodeTask task) {
+    this.nodes.remove(task);
+  }
+
+  public String getResourceManagerHost() {
+    return resourceManagerHost;
+  }
+
+  public void setResourceManagerHost(String resourceManagerHost) {
+    this.resourceManagerHost = resourceManagerHost;
+  }
+
+  public String getResourceManagerPort() {
+    return resourceManagerPort;
+  }
+
+  public void setResourceManagerPort(String resourceManagerPort) {
+    this.resourceManagerPort = resourceManagerPort;
+  }
+
+  public double getMinQuota() {
+    return minQuota;
+  }
+
+  public void setMinQuota(double minQuota) {
+    this.minQuota = minQuota;
+  }
+
+  /**
+   * @return this cluster serialized to JSON by Gson
+   */
+  @Override
+  public String toString() {
+    return GSON.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadState.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadState.java
new file mode 100644
index 0000000..cc1a10f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadState.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.state;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.state.State;
+import org.apache.mesos.state.Variable;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Model that represents the state of Myriad.
+ * <p/>
+ * Reads and writes the framework id under the key {@link #KEY_FRAMEWORK_ID} of the
+ * {@link State} store passed to the constructor.
+ */
+public class MyriadState {
+  public static final String KEY_FRAMEWORK_ID = "frameworkId";
+
+  private State stateStore;
+
+  public MyriadState(State stateStore) {
+    this.stateStore = stateStore;
+  }
+
+  /**
+   * Fetches the stored framework id.
+   *
+   * @return the parsed framework id, or {@code null} when the stored value is zero-length
+   * @throws InterruptedException           if waiting for the fetch is interrupted
+   * @throws ExecutionException             if the fetch fails
+   * @throws InvalidProtocolBufferException if the stored bytes cannot be parsed
+   */
+  public Protos.FrameworkID getFrameworkID() throws InterruptedException, ExecutionException, InvalidProtocolBufferException {
+    Variable stored = stateStore.fetch(KEY_FRAMEWORK_ID).get();
+    byte[] rawId = stored.value();
+
+    // A zero-length value is reported to the caller as "no framework id".
+    if (rawId.length == 0) {
+      return null;
+    }
+    return Protos.FrameworkID.parseFrom(rawId);
+  }
+
+  /**
+   * Replaces the stored framework id with the given one and waits for the store to complete.
+   *
+   * @param newFrameworkId the framework id to persist
+   * @throws InterruptedException if waiting for the fetch or the store is interrupted
+   * @throws ExecutionException   if the fetch or the store fails
+   */
+  public void setFrameworkId(Protos.FrameworkID newFrameworkId) throws InterruptedException, ExecutionException {
+    Variable current = stateStore.fetch(KEY_FRAMEWORK_ID).get();
+    Variable updated = current.mutate(newFrameworkId.toByteArray());
+    stateStore.store(updated).get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadStateStore.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadStateStore.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadStateStore.java
new file mode 100644
index 0000000..69b0238
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/MyriadStateStore.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.state;
+
+import org.apache.myriad.state.utils.StoreContext;
+
+/**
+ * Interface implemented by all Myriad State Store implementations.
+ * <p/>
+ * {@code loadMyriadState} returns a {@link StoreContext} and {@code storeMyriadState}
+ * accepts one; both are declared to throw {@link Exception}, so callers must be prepared
+ * to handle failures from either operation.
+ * NOTE(review): whether {@code loadMyriadState} may return {@code null} when nothing has
+ * been stored yet is not specified here -- confirm against the implementations.
+ */
+public interface MyriadStateStore {
+
+  StoreContext loadMyriadState() throws Exception;
+
+  void storeMyriadState(StoreContext storeContext) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java
new file mode 100644
index 0000000..f5822e9
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.state;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import com.google.inject.Inject;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Attribute;
+
+/**
+ * Represents a task to be launched by the executor.
+ * <p/>
+ * Holds the resource profile and placement constraint given at construction, plus the
+ * host, slave, status, executor and slave-attribute details that are filled in through
+ * the setters.
+ */
+public class NodeTask {
+  @JsonProperty
+  private String hostname;
+  @JsonProperty
+  private Protos.SlaveID slaveId;
+  @JsonProperty
+  private Protos.TaskStatus taskStatus;
+  @JsonProperty
+  private String taskPrefix;
+  @JsonProperty
+  private ServiceResourceProfile serviceresourceProfile;
+
+  @Inject
+  org.apache.myriad.scheduler.TaskUtils taskUtils;
+  /**
+   * Mesos executor for this node.
+   */
+  private Protos.ExecutorInfo executorInfo;
+
+  private org.apache.myriad.scheduler.constraints.Constraint constraint;
+  private List<Attribute> slaveAttributes;
+
+  /**
+   * Creates a task with the given profile and constraint; the hostname starts out as an
+   * empty string.
+   *
+   * @param profile    the resource profile of the task
+   * @param constraint the constraint associated with the task
+   */
+  public NodeTask(ServiceResourceProfile profile, org.apache.myriad.scheduler.constraints.Constraint constraint) {
+    this.hostname = "";
+    this.serviceresourceProfile = profile;
+    this.constraint = constraint;
+  }
+
+  /**
+   * @return the slave id, or {@code null} if none has been set
+   */
+  public Protos.SlaveID getSlaveId() {
+    return this.slaveId;
+  }
+
+  public void setSlaveId(Protos.SlaveID slaveId) {
+    this.slaveId = slaveId;
+  }
+
+  /**
+   * @return the constraint passed to the constructor
+   */
+  public org.apache.myriad.scheduler.constraints.Constraint getConstraint() {
+    return this.constraint;
+  }
+
+  /**
+   * @return the hostname; an empty string until {@link #setHostname(String)} is called
+   */
+  public String getHostname() {
+    return this.hostname;
+  }
+
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  /**
+   * @return the last task status set on this task, or {@code null} if none has been set
+   */
+  public Protos.TaskStatus getTaskStatus() {
+    return this.taskStatus;
+  }
+
+  public void setTaskStatus(Protos.TaskStatus taskStatus) {
+    this.taskStatus = taskStatus;
+  }
+
+  /**
+   * @return the Mesos executor info, or {@code null} if none has been set
+   */
+  public Protos.ExecutorInfo getExecutorInfo() {
+    return this.executorInfo;
+  }
+
+  public void setExecutorInfo(Protos.ExecutorInfo executorInfo) {
+    this.executorInfo = executorInfo;
+  }
+
+  public void setSlaveAttributes(List<Attribute> slaveAttributes) {
+    this.slaveAttributes = slaveAttributes;
+  }
+
+  /**
+   * @return the slave attributes, or {@code null} if none have been set
+   */
+  public List<Attribute> getSlaveAttributes() {
+    return this.slaveAttributes;
+  }
+
+  /**
+   * @return the task prefix, or {@code null} if none has been set
+   */
+  public String getTaskPrefix() {
+    return this.taskPrefix;
+  }
+
+  public void setTaskPrefix(String taskPrefix) {
+    this.taskPrefix = taskPrefix;
+  }
+
+  /**
+   * @return the resource profile of this task
+   */
+  public ServiceResourceProfile getProfile() {
+    return this.serviceresourceProfile;
+  }
+
+  public void setProfile(ServiceResourceProfile serviceresourceProfile) {
+    this.serviceresourceProfile = serviceresourceProfile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
new file mode 100644
index 0000000..fe3fd67
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.state;
+
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.state.utils.StoreContext;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.SlaveID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the state of the Myriad scheduler.
+ * <p/>
+ * Tracks every known {@link NodeTask} by its Mesos task id and groups task ids by lifecycle
+ * state (pending, staging, active, lost, killable) per task prefix. When a
+ * {@link MyriadStateStore} is supplied, the state is loaded from it at construction time
+ * and written back to it after every mutation.
+ */
+public class SchedulerState {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class);
+
+  /**
+   * Splits a task id on its dots; the first segment is the task prefix
+   * (ids are built as {@code prefix.profileName.uuid} in {@link #addNodes(Collection)}).
+   */
+  private static final Pattern TASK_ID_SEPARATOR = Pattern.compile("\\.");
+
+  private final Map<Protos.TaskID, NodeTask> tasks;
+  private Protos.FrameworkID frameworkId;
+  private final MyriadStateStore stateStore;
+  private final Map<String, SchedulerStateForType> statesForTaskType;
+
+  /**
+   * Creates the scheduler state and immediately loads any previously stored state.
+   *
+   * @param stateStore the store to load from and persist to; when {@code null}, loading and
+   *                   persisting are skipped
+   */
+  public SchedulerState(MyriadStateStore stateStore) {
+    this.tasks = new ConcurrentHashMap<>();
+    this.stateStore = stateStore;
+    this.statesForTaskType = new ConcurrentHashMap<>();
+    loadStateStore();
+  }
+
+  /**
+   * Registers each node under a newly generated task id and marks it pending.
+   * <p/>
+   * Making method synchronized, so if someone tries flexup/down at the same time
+   * addNodes and removeTask will not put data into an inconsistent state
+   *
+   * @param nodes the nodes to add; a {@code null} or empty collection is a no-op
+   */
+  public synchronized void addNodes(Collection<NodeTask> nodes) {
+    if (CollectionUtils.isEmpty(nodes)) {
+      LOGGER.info("No nodes to add");
+      return;
+    }
+    for (NodeTask node : nodes) {
+      String taskIdValue = String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID());
+      Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(taskIdValue).build();
+      addTask(taskId, node);
+      makeTaskPending(taskId);
+      // Log only after the transition, so that the message is true and the reported
+      // queue size includes the task that was just made pending.
+      SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix());
+      LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), (taskState == null ? 0 : taskState.getPendingTaskIds().size()));
+    }
+
+  }
+
+  /**
+   * Registers the node under the given task id and persists the state.
+   *
+   * @param taskId the id to register the node under
+   * @param node   the node to register
+   */
+  // TODO (sdaingade) Clone NodeTask
+  public synchronized void addTask(Protos.TaskID taskId, NodeTask node) {
+    this.tasks.put(taskId, node);
+    updateStateStore();
+  }
+
+  /**
+   * Records the given status on the task it refers to, if that task is known, and persists
+   * the state.
+   *
+   * @param taskStatus the new status; must not be {@code null}
+   */
+  public synchronized void updateTask(Protos.TaskStatus taskStatus) {
+    Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null");
+    Protos.TaskID taskId = taskStatus.getTaskId();
+    if (this.tasks.containsKey(taskId)) {
+      this.tasks.get(taskId).setTaskStatus(taskStatus);
+    }
+    updateStateStore();
+  }
+
+  /**
+   * Moves the task into the pending set of its task type and persists the state.
+   *
+   * @param taskId the task to move; must not be {@code null}
+   */
+  public synchronized void makeTaskPending(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    getOrCreateStateForType(taskId).makeTaskPending(taskId);
+    updateStateStore();
+  }
+
+  /**
+   * Moves the task into the staging set of its task type and persists the state.
+   *
+   * @param taskId the task to move; must not be {@code null}
+   */
+  public synchronized void makeTaskStaging(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    getOrCreateStateForType(taskId).makeTaskStaging(taskId);
+    updateStateStore();
+  }
+
+  /**
+   * Moves the task into the active set of its task type and persists the state.
+   *
+   * @param taskId the task to move; must not be {@code null}
+   */
+  public synchronized void makeTaskActive(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    getOrCreateStateForType(taskId).makeTaskActive(taskId);
+    updateStateStore();
+  }
+
+  /**
+   * Moves the task into the lost set of its task type and persists the state.
+   *
+   * @param taskId the task to move; must not be {@code null}
+   */
+  public synchronized void makeTaskLost(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    getOrCreateStateForType(taskId).makeTaskLost(taskId);
+    updateStateStore();
+  }
+
+  /**
+   * Moves the task into the killable set of its task type and persists the state.
+   *
+   * @param taskId the task to move; must not be {@code null}
+   */
+  public synchronized void makeTaskKillable(Protos.TaskID taskId) {
+    Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+    getOrCreateStateForType(taskId).makeTaskKillable(taskId);
+    updateStateStore();
+  }
+
+  /**
+   * @return the node registered under the given task id, or {@code null} if there is none
+   */
+  // TODO (sdaingade) Clone NodeTask
+  public synchronized NodeTask getTask(Protos.TaskID taskId) {
+    return this.tasks.get(taskId);
+  }
+
+  /**
+   * @return a new set holding the killable task ids of every task type
+   */
+  public synchronized Set<Protos.TaskID> getKillableTasks() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (SchedulerStateForType stateForType : statesForTaskType.values()) {
+      returnSet.addAll(stateForType.getKillableTasks());
+    }
+    return returnSet;
+  }
+
+  /**
+   * @return the killable task ids of the given task type; empty if the type is unknown
+   */
+  public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks());
+  }
+
+  /**
+   * Removes the task from every lifecycle set and from the task map, then persists the state.
+   *
+   * @param taskId the task to remove
+   */
+  public synchronized void removeTask(Protos.TaskID taskId) {
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefixOf(taskId));
+    if (taskTypeState != null) {
+      taskTypeState.removeTask(taskId);
+    }
+    this.tasks.remove(taskId);
+    updateStateStore();
+  }
+
+  /**
+   * @return a new set holding the pending task ids of every task type
+   */
+  public synchronized Set<Protos.TaskID> getPendingTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (SchedulerStateForType stateForType : statesForTaskType.values()) {
+      returnSet.addAll(stateForType.getPendingTaskIds());
+    }
+    return returnSet;
+  }
+
+  /**
+   * @return an unmodifiable collection of the pending task ids whose node profile has the
+   *         same name as the given profile
+   */
+  public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
+    List<Protos.TaskID> pendingTaskIds = new ArrayList<>();
+    Set<Protos.TaskID> pendingTasks = getPendingTaskIds();
+    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+      NodeTask nodeTask = entry.getValue();
+      if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
+        pendingTaskIds.add(entry.getKey());
+      }
+    }
+    return Collections.unmodifiableCollection(pendingTaskIds);
+  }
+
+  /**
+   * @return the pending task ids of the given task type; empty if the type is unknown
+   */
+  public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds());
+  }
+
+  /**
+   * @return a new set holding the active task ids of every task type
+   */
+  public synchronized Set<Protos.TaskID> getActiveTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (SchedulerStateForType stateForType : statesForTaskType.values()) {
+      returnSet.addAll(stateForType.getActiveTaskIds());
+    }
+    return returnSet;
+  }
+
+  /**
+   * @return the active task ids of the given task type; empty if the type is unknown
+   */
+  public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds());
+  }
+
+  public synchronized Set<NodeTask> getActiveTasks() {
+    return getTasks(getActiveTaskIds());
+  }
+
+  public Set<NodeTask> getActiveTasksByType(String taskPrefix) {
+    return getTasks(getActiveTaskIds(taskPrefix));
+  }
+
+  public Set<NodeTask> getStagingTasks() {
+    return getTasks(getStagingTaskIds());
+  }
+
+  public Set<NodeTask> getStagingTasksByType(String taskPrefix) {
+    return getTasks(getStagingTaskIds(taskPrefix));
+  }
+
+  public Set<NodeTask> getPendingTasksByType(String taskPrefix) {
+    return getTasks(getPendingTaskIds(taskPrefix));
+  }
+
+  /**
+   * @param taskIds the ids to look up; may be {@code null} or empty
+   * @return an unmodifiable set of the registered nodes whose task id is in {@code taskIds}
+   */
+  public synchronized Set<NodeTask> getTasks(Set<Protos.TaskID> taskIds) {
+    Set<NodeTask> nodeTasks = new HashSet<>();
+    if (CollectionUtils.isNotEmpty(taskIds) && CollectionUtils.isNotEmpty(tasks.values())) {
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        if (taskIds.contains(entry.getKey())) {
+          nodeTasks.add(entry.getValue());
+        }
+      }
+    }
+    return Collections.unmodifiableSet(nodeTasks);
+  }
+
+  /**
+   * @return an unmodifiable collection of the active task ids whose node profile has the
+   *         same name as the given profile
+   */
+  public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
+    List<Protos.TaskID> activeTaskIDs = new ArrayList<>();
+    Set<Protos.TaskID> activeTaskIds = getActiveTaskIds();
+    if (CollectionUtils.isNotEmpty(activeTaskIds) && CollectionUtils.isNotEmpty(tasks.values())) {
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        NodeTask nodeTask = entry.getValue();
+        if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
+          activeTaskIDs.add(entry.getKey());
+        }
+      }
+    }
+    return Collections.unmodifiableCollection(activeTaskIDs);
+  }
+
+  /**
+   * @return the first registered node found with the given slave id and task prefix, or
+   *         {@code null} if there is none or {@code taskPrefix} is {@code null}
+   */
+  // TODO (sdaingade) Clone NodeTask
+  public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) {
+    if (taskPrefix == null) {
+      return null;
+    }
+    for (NodeTask task : tasks.values()) {
+      if (task.getSlaveId() != null &&
+          task.getSlaveId().equals(slaveId) &&
+          taskPrefix.equals(task.getTaskPrefix())) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return a new set of every registered node whose slave id equals the given one
+   */
+  public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) {
+    Set<NodeTask> nodeTasks = Sets.newHashSet();
+    for (NodeTask task : tasks.values()) {
+      if (task.getSlaveId() != null && task.getSlaveId().equals(slaveId)) {
+        nodeTasks.add(task);
+      }
+    }
+    return nodeTasks;
+  }
+
+  /**
+   * @return a new set holding the staging task ids of every task type
+   */
+  public Set<Protos.TaskID> getStagingTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (SchedulerStateForType stateForType : statesForTaskType.values()) {
+      returnSet.addAll(stateForType.getStagingTaskIds());
+    }
+    return returnSet;
+  }
+
+  /**
+   * @return an unmodifiable collection of the staging task ids whose node profile has the
+   *         same name as the given profile
+   */
+  public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) {
+    List<Protos.TaskID> stagingTaskIDs = new ArrayList<>();
+
+    Set<Protos.TaskID> stagingTasks = getStagingTaskIds();
+    for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+      NodeTask nodeTask = entry.getValue();
+      if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) {
+        stagingTaskIDs.add(entry.getKey());
+      }
+    }
+    return Collections.unmodifiableCollection(stagingTaskIDs);
+  }
+
+  /**
+   * @return the staging task ids of the given task type; empty if the type is unknown
+   */
+  public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds());
+  }
+
+  /**
+   * @return a new set holding the lost task ids of every task type
+   */
+  public Set<Protos.TaskID> getLostTaskIds() {
+    Set<Protos.TaskID> returnSet = new HashSet<>();
+    for (SchedulerStateForType stateForType : statesForTaskType.values()) {
+      returnSet.addAll(stateForType.getLostTaskIds());
+    }
+    return returnSet;
+  }
+
+  /**
+   * @return the lost task ids of the given task type; empty if the type is unknown
+   */
+  public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) {
+    SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds());
+  }
+
+  /**
+   * @return a new, modifiable collection of the non-null statuses of all registered tasks
+   */
+  // TODO (sdaingade) Currently cannot return unmodifiableCollection
+  // as this will break ReconcileService code
+  public synchronized Collection<Protos.TaskStatus> getTaskStatuses() {
+    Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size());
+    for (NodeTask task : this.tasks.values()) {
+      Protos.TaskStatus taskStatus = task.getTaskStatus();
+      if (taskStatus != null) {
+        taskStatuses.add(taskStatus);
+      }
+    }
+
+    return taskStatuses;
+  }
+
+  public synchronized boolean hasTask(Protos.TaskID taskID) {
+    return this.tasks.containsKey(taskID);
+  }
+
+  /**
+   * @return the framework id, or {@code null} if none has been set or loaded
+   */
+  public synchronized Protos.FrameworkID getFrameworkID() {
+    return this.frameworkId;
+  }
+
+  /**
+   * Sets the framework id and persists the state.
+   *
+   * @param newFrameworkId the framework id to record
+   */
+  public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) {
+    this.frameworkId = newFrameworkId;
+    updateStateStore();
+  }
+
+  /**
+   * Returns the task prefix of a task id, i.e. the text before its first dot.
+   */
+  private static String taskPrefixOf(Protos.TaskID taskId) {
+    return TASK_ID_SEPARATOR.split(taskId.getValue())[0];
+  }
+
+  /**
+   * Returns the per-type state for the prefix of the given task id, creating and
+   * registering it first when the prefix has not been seen before.
+   */
+  private SchedulerStateForType getOrCreateStateForType(Protos.TaskID taskId) {
+    String taskPrefix = taskPrefixOf(taskId);
+    SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix);
+    if (taskTypeState == null) {
+      taskTypeState = new SchedulerStateForType(taskPrefix);
+      statesForTaskType.put(taskPrefix, taskTypeState);
+    }
+    return taskTypeState;
+  }
+
+  /**
+   * Writes the complete current state to the state store. Does nothing when no store was
+   * configured; a failure to store is logged and not propagated.
+   */
+  private synchronized void updateStateStore() {
+    if (this.stateStore == null) {
+      LOGGER.debug("Could not update state to state store as HA is disabled");
+      return;
+    }
+
+    try {
+      StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks());
+      stateStore.storeMyriadState(sc);
+    } catch (Exception e) {
+      LOGGER.error("Failed to update scheduler state to state store", e);
+    }
+  }
+
+  /**
+   * Populates this object from the state store. Does nothing when no store was configured
+   * or the store returns {@code null}; a failure to load is logged and not propagated.
+   */
+  private synchronized void loadStateStore() {
+    if (this.stateStore == null) {
+      LOGGER.debug("Could not load state from state store as HA is disabled");
+      return;
+    }
+
+    try {
+      StoreContext sc = stateStore.loadMyriadState();
+      if (sc != null) {
+        this.frameworkId = sc.getFrameworkId();
+        this.tasks.putAll(sc.getTasks());
+        convertToThis(TaskState.PENDING, sc.getPendingTasks());
+        convertToThis(TaskState.STAGING, sc.getStagingTasks());
+        convertToThis(TaskState.ACTIVE, sc.getActiveTasks());
+        convertToThis(TaskState.LOST, sc.getLostTasks());
+        convertToThis(TaskState.KILLABLE, sc.getKillableTasks());
+        LOGGER.info("Loaded Myriad state from state store successfully.");
+        // Log arguments are evaluated even when debug logging is off, so guard against a
+        // missing framework id; otherwise a successful load would be reported as a failure.
+        String loadedFrameworkId = (frameworkId == null ? null : frameworkId.getValue());
+        LOGGER.debug("State Store state includes " +
+            "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " +
+            "active tasks count: {}, lost tasks count: {}, " +
+            "and killable tasks count: {}", loadedFrameworkId, this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), this.getActiveTaskIds().size(), this.getLostTaskIds().size(), this.getKillableTasks().size());
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to read scheduler state from state store", e);
+    }
+  }
+
+  /**
+   * Places every given task id into the lifecycle set named by {@code taskType}, creating
+   * the per-type state for its prefix when needed. Does not persist the state.
+   */
+  private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) {
+    for (Protos.TaskID taskId : taskIds) {
+      SchedulerStateForType taskTypeState = getOrCreateStateForType(taskId);
+      switch (taskType) {
+        case PENDING:
+          taskTypeState.makeTaskPending(taskId);
+          break;
+        case STAGING:
+          taskTypeState.makeTaskStaging(taskId);
+          break;
+        case ACTIVE:
+          taskTypeState.makeTaskActive(taskId);
+          break;
+        case KILLABLE:
+          taskTypeState.makeTaskKillable(taskId);
+          break;
+        case LOST:
+          taskTypeState.makeTaskLost(taskId);
+          break;
+        default:
+          // Every TaskState constant is handled above.
+          break;
+      }
+    }
+  }
+
+  /**
+   * Class to keep all the tasks states for a particular taskPrefix together.
+   * Each transition method adds the task id to one set and removes it from the other four.
+   */
+  private static class SchedulerStateForType {
+
+    private final String taskPrefix;
+    private final Set<Protos.TaskID> pendingTasks;
+    private final Set<Protos.TaskID> stagingTasks;
+    private final Set<Protos.TaskID> activeTasks;
+    private final Set<Protos.TaskID> lostTasks;
+    private final Set<Protos.TaskID> killableTasks;
+
+    public SchedulerStateForType(String taskPrefix) {
+      this.taskPrefix = taskPrefix;
+      // Since Sets.newConcurrentHashSet is available only starting from Guava version 15
+      // and so far (Hadoop 2.7) uses guava 13 we can not easily use it
+      this.pendingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.stagingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.activeTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.lostTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.killableTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>());
+
+    }
+
+    @SuppressWarnings("unused")
+    public String getTaskPrefix() {
+      return taskPrefix;
+    }
+
+    public synchronized void makeTaskPending(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+
+      pendingTasks.add(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskStaging(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.add(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskActive(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.add(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskLost(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.add(taskId);
+      killableTasks.remove(taskId);
+    }
+
+    public synchronized void makeTaskKillable(Protos.TaskID taskId) {
+      Objects.requireNonNull(taskId, "taskId cannot be empty or null");
+      pendingTasks.remove(taskId);
+      stagingTasks.remove(taskId);
+      activeTasks.remove(taskId);
+      lostTasks.remove(taskId);
+      killableTasks.add(taskId);
+    }
+
+    public synchronized void removeTask(Protos.TaskID taskId) {
+      this.pendingTasks.remove(taskId);
+      this.stagingTasks.remove(taskId);
+      this.activeTasks.remove(taskId);
+      this.lostTasks.remove(taskId);
+      this.killableTasks.remove(taskId);
+    }
+
+    // The getters below return unmodifiable views of the live sets, not copies.
+
+    public synchronized Set<Protos.TaskID> getPendingTaskIds() {
+      return Collections.unmodifiableSet(this.pendingTasks);
+    }
+
+    public Set<Protos.TaskID> getActiveTaskIds() {
+      return Collections.unmodifiableSet(this.activeTasks);
+    }
+
+    public synchronized Set<Protos.TaskID> getStagingTaskIds() {
+      return Collections.unmodifiableSet(this.stagingTasks);
+    }
+
+    public synchronized Set<Protos.TaskID> getLostTaskIds() {
+      return Collections.unmodifiableSet(this.lostTasks);
+    }
+
+    public synchronized Set<Protos.TaskID> getKillableTasks() {
+      return Collections.unmodifiableSet(this.killableTasks);
+    }
+
+  }
+
+  /**
+   * TaskState type
+   */
+  public enum TaskState {
+    PENDING,
+    STAGING,
+    ACTIVE,
+    KILLABLE,
+    LOST
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/ByteBufferSupport.java
new file mode 100644
index 0000000..9fb2490
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/ByteBufferSupport.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.state.utils;
+
+import org.apache.myriad.scheduler.constraints.Constraint;
+import org.apache.myriad.scheduler.constraints.Constraint.Type;
+import org.apache.myriad.scheduler.constraints.LikeConstraint;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.mesos.Protos;
+
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.state.NodeTask;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * ByteBuffer support for the Serialization of the StoreContext
+ */
+public class ByteBufferSupport {
+
+  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
+  public static final String UTF8 = "UTF-8";
+  public static final byte[] ZERO_BYTES = new byte[0];
+  private static Gson gson = new Gson();
+  private static Gson gsonCustom = new GsonBuilder().registerTypeAdapter(ServiceResourceProfile.class, new ServiceResourceProfile.CustomDeserializer()).create();
+
+  public static void addByteBuffers(List<ByteBuffer> list, ByteArrayOutputStream bytes) throws IOException {
+    // If list, add the list size, then the size of each buffer followed by the buffer.
+    if (list != null) {
+      bytes.write(toIntBytes(list.size()));
+      for (ByteBuffer bb : list) {
+        addByteBuffer(bb, bytes);
+      }
+    } else {
+      bytes.write(toIntBytes(0));
+    }
+  }
+
+  /**
+   * Appends one length-prefixed buffer to the stream: the length of the buffer's backing
+   * array as an int, followed by the backing array itself.
+   * <p/>
+   * A null buffer is written as a zero length with no payload. The reading side
+   * ({@link #createBuffer(ByteBuffer)} and {@link #createBufferList(ByteBuffer, int)})
+   * always consumes a length prefix, so writing nothing for a null buffer would shift every
+   * field that follows it.
+   *
+   * @param bb    buffer to append, may be null
+   * @param bytes stream to append to; nothing is written when it is null
+   * @throws IOException if writing to the stream fails
+   */
+  public static void addByteBuffer(ByteBuffer bb, ByteArrayOutputStream bytes) throws IOException {
+    if (bytes == null) {
+      return;
+    }
+    if (bb == null) {
+      // Keep the stream aligned: an absent buffer is an explicit zero length.
+      bytes.write(toIntBytes(0));
+      return;
+    }
+    byte[] content = bb.array();
+    bytes.write(toIntBytes(content.length));
+    bytes.write(content);
+  }
+
+  public static ByteBuffer toByteBuffer(Protos.TaskID taskId) {
+    return toBuffer(taskId);
+  }
+
+  public static ByteBuffer toByteBuffer(Protos.FrameworkID frameworkId) {
+    return toBuffer(frameworkId);
+  }
+
+  /*
+   * Common method to convert Protobuf object to ByteBuffer 
+   */
+  public static ByteBuffer toBuffer(GeneratedMessage message) {
+    byte dst[];
+    int size;
+    if (message != null) {
+      size = message.getSerializedSize() + INT_SIZE;
+      dst = message.toByteArray();
+    } else {
+      size = INT_SIZE;
+      dst = ZERO_BYTES;
+    }
+    ByteBuffer bb = createBuffer(size);
+    putBytes(bb, dst);
+    bb.rewind();
+    return bb;
+  }
+
+  public static byte[] toIntBytes(int src) {
+    ByteBuffer bb = createBuffer(INT_SIZE);
+    bb.putInt(src);
+    return bb.array();
+  }
+
+
+  /**
+   * Serializes a NodeTask into a single ByteBuffer.
+   * <p/>
+   * Layout, in order: profile (Json), constraint type ordinal (int), constraint, hostname,
+   * slave id, task status, executor info, task prefix. Every field except the ordinal is
+   * written by {@link #putBytes(ByteBuffer, byte[])} as an int length followed by the payload;
+   * an absent field is written as a zero length.
+   *
+   * @param nt task to serialize; its profile must not be null
+   * @return buffer, rewound to its beginning, holding the serialized task
+   */
+  public static ByteBuffer toByteBuffer(NodeTask nt) {
+    // The ServiceResourceProfile toString() returns Json, if this ever changes then this
+    // will fail. Json is expected.
+    byte[] profile = toBytes(nt.getProfile().toString());
+
+    Constraint constraint = nt.getConstraint();
+    Constraint.Type type = constraint == null ? Type.NULL : constraint.getType();
+    byte[] constraintBytes = constraint == null ? ZERO_BYTES : toBytes(constraint.toString());
+
+    byte[] hostname = toBytes(nt.getHostname());
+    byte[] slaveBytes = getSlaveBytes(nt);
+    byte[] taskBytes = getTaskBytes(nt);
+    byte[] executorInfoBytes = getExecutorInfoBytes(nt);
+
+    String taskPrefix = nt.getTaskPrefix();
+    byte[] taskPrefixBytes = taskPrefix == null ? ZERO_BYTES : toBytes(taskPrefix);
+
+    // putBytes emits an int length prefix even for an empty payload, so each of the seven
+    // byte fields costs INT_SIZE plus its payload; the constraint type ordinal is one more
+    // int. Sizing from the arrays that are actually written keeps the allocation and the
+    // writes in lock-step (a null task prefix previously reserved no room for its prefix).
+    int size = INT_SIZE + 7 * INT_SIZE
+        + profile.length + constraintBytes.length + hostname.length + slaveBytes.length
+        + taskBytes.length + executorInfoBytes.length + taskPrefixBytes.length;
+
+    // Allocate and populate the buffer.
+    ByteBuffer bb = createBuffer(size);
+    putBytes(bb, profile);
+    bb.putInt(type.ordinal());
+    putBytes(bb, constraintBytes);
+    putBytes(bb, hostname);
+    putBytes(bb, slaveBytes);
+    putBytes(bb, taskBytes);
+    putBytes(bb, executorInfoBytes);
+    putBytes(bb, taskPrefixBytes);
+    // Make sure the buffer is at the beginning
+    bb.rewind();
+    return bb;
+  }
+
+  /**
+   * Assumes the entire ByteBuffer is a TaskID.
+   *
+   * @param bb
+   * @return Protos.TaskID
+   */
+  public static Protos.TaskID toTaskId(ByteBuffer bb) {
+    try {
+      return Protos.TaskID.parseFrom(getBytes(bb, bb.getInt()));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse Task ID", e);
+    }
+  }
+
+  /**
+   * Assumes the entire ByteBuffer is a FrameworkID.
+   *
+   * @param bb
+   * @return Protos.FrameworkID
+   */
+  public static Protos.FrameworkID toFrameworkID(ByteBuffer bb) {
+    try {
+      return Protos.FrameworkID.parseFrom(getBytes(bb, bb.getInt()));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse Framework ID", e);
+    }
+  }
+
+  /**
+   * ByteBuffer is expected to have a NodeTask at its next position.
+   * <p/>
+   * Reads, in order: the service resource profile, the constraint, the hostname, the slave id,
+   * the task status and the executor info. The task prefix that
+   * {@link #toByteBuffer(NodeTask)} appends after the executor info is not read here, so this
+   * method never calls a task-prefix setter on the returned task.
+   *
+   * @param bb buffer to read from, may be null
+   * @return NodeTask or null if the buffer is null or its backing array is empty. Can throw a
+   * RuntimeException if the buffer is not formatted correctly.
+   */
+  public static NodeTask toNodeTask(ByteBuffer bb) {
+    NodeTask nt = null;
+    if (bb != null && bb.array().length > 0) {
+      // Field order must mirror the write order used when the task was serialized.
+      nt = new NodeTask(getServiceResourceProfile(bb), getConstraint(bb));
+      nt.setHostname(toString(bb));
+      nt.setSlaveId(toSlaveId(bb));
+      nt.setTaskStatus(toTaskStatus(bb));
+      nt.setExecutorInfo(toExecutorInfo(bb));
+    }
+    return nt;
+  }
+
+  public static byte[] getTaskBytes(NodeTask nt) {
+    if (nt.getTaskStatus() != null) {
+      return nt.getTaskStatus().toByteArray();
+    } else {
+      return ZERO_BYTES;
+    }
+  }
+
+  public static byte[] getExecutorInfoBytes(NodeTask nt) {
+    if (nt.getExecutorInfo() != null) {
+      return nt.getExecutorInfo().toByteArray();
+    } else {
+      return ZERO_BYTES;
+    }
+  }
+
+  public static byte[] getSlaveBytes(NodeTask nt) {
+    if (nt.getSlaveId() != null) {
+      return nt.getSlaveId().toByteArray();
+    } else {
+      return ZERO_BYTES;
+    }
+  }
+
+  public static void putBytes(ByteBuffer bb, byte bytes[]) {
+    if (bytes != null && bytes.length > 0) {
+      bb.putInt(bytes.length);
+      bb.put(bytes);
+    } else {
+      bb.putInt(0);
+    }
+  }
+
+  public static byte[] getBytes(ByteBuffer bb, int size) {
+    byte bytes[] = new byte[size];
+    bb.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * This assumes the next position is the size as an int, and the following is a string
+   * iff the size is not zero.
+   *
+   * @param bb ByteBuffer to extract string from
+   * @return string from the next position, or "" if the size is zero
+   */
+  public static String toString(ByteBuffer bb) {
+    byte[] bytes = new byte[bb.getInt()];
+    String s = "";
+    try {
+      if (bytes.length > 0) {
+        bb.get(bytes);
+        s = new String(bytes, UTF8);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse string bytes", e);
+    }
+    return s;
+  }
+
+  /**
+   * Encodes a string as UTF-8 bytes.
+   *
+   * @param s string to encode, may be null
+   * @return the UTF-8 encoding of {@code s}, or {@link #ZERO_BYTES} when {@code s} is null
+   */
+  public static byte[] toBytes(String s) {
+    // Explicit null check rather than catching the NullPointerException a null would raise.
+    if (s == null) {
+      return ZERO_BYTES;
+    }
+    // The Charset overload cannot fail with an unsupported-encoding error, so no catch-all
+    // is needed.
+    return s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  public static ServiceResourceProfile getServiceResourceProfile(ByteBuffer bb) {
+    String p = toString(bb);
+    if (!StringUtils.isEmpty(p)) {
+      return gsonCustom.fromJson(p, ServiceResourceProfile.class);
+    } else {
+      return null;
+    }
+  }
+
+  public static Constraint getConstraint(ByteBuffer bb) {
+    Constraint.Type type = Constraint.Type.values()[bb.getInt()];
+    String p = toString(bb);
+    switch (type) {
+      case NULL:
+        return null;
+
+      case LIKE:
+
+        if (!StringUtils.isEmpty(p)) {
+          return gson.fromJson(p, LikeConstraint.class);
+        }
+    }
+    return null;
+  }
+
+  public static Protos.SlaveID toSlaveId(ByteBuffer bb) {
+    int size = bb.getInt();
+    if (size > 0) {
+      try {
+        return Protos.SlaveID.parseFrom(getBytes(bb, size));
+      } catch (Exception e) {
+        throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse SlaveId bytes", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  public static Protos.TaskStatus toTaskStatus(ByteBuffer bb) {
+    int size = bb.getInt();
+    if (size > 0) {
+      try {
+        return Protos.TaskStatus.parseFrom(getBytes(bb, size));
+      } catch (Exception e) {
+        throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse TaskStatus bytes", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  public static Protos.ExecutorInfo toExecutorInfo(ByteBuffer bb) {
+    int size = bb.getInt();
+    if (size > 0) {
+      try {
+        return Protos.ExecutorInfo.parseFrom(getBytes(bb, size));
+      } catch (Exception e) {
+        throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse ExecutorInfo bytes", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  public static ByteBuffer fillBuffer(byte src[]) {
+    ByteBuffer bb = createBuffer(src.length);
+    bb.put(src);
+    bb.rewind();
+    return bb;
+  }
+
+  public static List<ByteBuffer> createBufferList(ByteBuffer bb, int size) {
+    List<ByteBuffer> list = new ArrayList<ByteBuffer>(size);
+    for (int i = 0; i < size; i++) {
+      list.add(fillBuffer(getBytes(bb, bb.getInt())));
+    }
+    return list;
+  }
+
+  private static ByteBuffer createBuffer(int size) {
+    return ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
+  }
+
+  public static ByteBuffer createBuffer(ByteBuffer bb) {
+    return fillBuffer(getBytes(bb, bb.getInt()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/StoreContext.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/StoreContext.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/StoreContext.java
new file mode 100644
index 0000000..feebff9
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/utils/StoreContext.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.state.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.TaskID;
+
+/**
+ * The purpose of this container/utility is to create a mechanism to serialize the SchedulerState
+ * to RMStateStore and back. Json did not seem to handle the Protos fields very well so this was an
+ * alternative approach.
+ */
+public final class StoreContext {
+  private static Pattern taskIdPattern = Pattern.compile("\\.");
+  private ByteBuffer frameworkId;
+  private List<ByteBuffer> taskIds;
+  private List<ByteBuffer> taskNodes;
+  private List<ByteBuffer> pendingTasks;
+  private List<ByteBuffer> stagingTasks;
+  private List<ByteBuffer> activeTasks;
+  private List<ByteBuffer> lostTasks;
+  private List<ByteBuffer> killableTasks;
+
+  public StoreContext() {
+  }
+
+  /**
+   * Accept all the SchedulerState maps and flatten them into lists of ByteBuffers
+   *
+   * @param tasks
+   * @param pendingTasks
+   * @param stagingTasks
+   * @param activeTasks
+   * @param lostTasks
+   * @param killableTasks
+   */
+  public StoreContext(Protos.FrameworkID frameworkId, Map<Protos.TaskID, org.apache.myriad.state.NodeTask> tasks, Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks, Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks,
+      Set<Protos.TaskID> killableTasks) {
+    setFrameworkId(frameworkId);
+    setTasks(tasks);
+    setPendingTasks(pendingTasks);
+    setStagingTasks(stagingTasks);
+    setActiveTasks(activeTasks);
+    setLostTasks(lostTasks);
+    setKillableTasks(killableTasks);
+  }
+
+  /**
+   * Accept list of ByteBuffers and re-create the SchedulerState maps.
+   *
+   * @param frameworkId
+   * @param taskIds
+   * @param taskNodes
+   * @param pendingTasks
+   * @param stagingTasks
+   * @param activeTasks
+   * @param lostTasks
+   * @param killableTasks
+   */
+  public StoreContext(ByteBuffer frameworkId, List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes, List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks, List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks, List<ByteBuffer>
+      killableTasks) {
+    this.frameworkId = frameworkId;
+    this.taskIds = taskIds;
+    this.taskNodes = taskNodes;
+    this.pendingTasks = pendingTasks;
+    this.stagingTasks = stagingTasks;
+    this.activeTasks = activeTasks;
+    this.lostTasks = lostTasks;
+    this.killableTasks = killableTasks;
+  }
+
+  /**
+   * Use this to gather bytes to push to the state store
+   *
+   * @return byte stream of the state store context.
+   * @throws IOException
+   */
+  public ByteArrayOutputStream toSerializedContext() throws IOException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    ByteBufferSupport.addByteBuffer(frameworkId, bytes);
+    ByteBufferSupport.addByteBuffers(taskIds, bytes);
+    ByteBufferSupport.addByteBuffers(taskNodes, bytes);
+    ByteBufferSupport.addByteBuffers(pendingTasks, bytes);
+    ByteBufferSupport.addByteBuffers(stagingTasks, bytes);
+    ByteBufferSupport.addByteBuffers(activeTasks, bytes);
+    ByteBufferSupport.addByteBuffers(lostTasks, bytes);
+    ByteBufferSupport.addByteBuffers(killableTasks, bytes);
+    return bytes;
+  }
+
+  /**
+   * When the bytes come back from the store, use this method to create a new context.
+   *
+   * @param bytes from state store
+   * @return initialized StoreContext to use to initialize a SchedulerState
+   */
+  @SuppressWarnings("unchecked")
+  public static StoreContext fromSerializedBytes(byte bytes[]) {
+    StoreContext ctx;
+    if (bytes != null && bytes.length > 0) {
+      ByteBuffer bb = ByteBufferSupport.fillBuffer(bytes);
+      ByteBuffer frameworkId = ByteBufferSupport.createBuffer(bb);
+      List<ByteBuffer> taskIds = ByteBufferSupport.createBufferList(bb, bb.getInt());
+      List<ByteBuffer> taskNodes = ByteBufferSupport.createBufferList(bb, bb.getInt());
+      List<ByteBuffer> pendingTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
+      List<ByteBuffer> stagingTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
+      List<ByteBuffer> activeTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
+      List<ByteBuffer> lostTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
+      List<ByteBuffer> killableTasks = ByteBufferSupport.createBufferList(bb, bb.getInt());
+      ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, stagingTasks, activeTasks, lostTasks, killableTasks);
+    } else {
+      ctx = new StoreContext();
+    }
+    return ctx;
+  }
+
+  /**
+   * Serialize tasks into internal ByteBuffers, removing the map.
+   * <p/>
+   * A null map is ignored and leaves any previously stored task buffers untouched, which is
+   * how the pending/staging/active/lost/killable setters of this class treat a null set.
+   *
+   * @param tasks map of task id to node task, may be null
+   */
+  public void setTasks(Map<Protos.TaskID, org.apache.myriad.state.NodeTask> tasks) {
+    if (tasks == null) {
+      return;
+    }
+    taskIds = new ArrayList<ByteBuffer>(tasks.size());
+    taskNodes = new ArrayList<ByteBuffer>(tasks.size());
+    // Ids and nodes are kept in two parallel lists; the same index addresses one entry.
+    for (Entry<TaskID, org.apache.myriad.state.NodeTask> entry : tasks.entrySet()) {
+      taskIds.add(ByteBufferSupport.toByteBuffer(entry.getKey()));
+      taskNodes.add(ByteBufferSupport.toByteBuffer(entry.getValue()));
+    }
+  }
+
+  /**
+   * De-serialize the internal ByteBuffer back into a Protos.FrameworkID.
+   *
+   * @return
+   */
+  public Protos.FrameworkID getFrameworkId() {
+    return ByteBufferSupport.toFrameworkID(frameworkId);
+  }
+
+  /**
+   * Serialize the Protos.FrameworkID into a ByteBuffer.
+   */
+  public void setFrameworkId(Protos.FrameworkID frameworkId) {
+    if (frameworkId != null) {
+      this.frameworkId = ByteBufferSupport.toByteBuffer(frameworkId);
+    }
+  }
+
+  /**
+   * De-serialize the internal ByteBuffers back into a Task map.
+   *
+   * @return
+   */
+  public Map<Protos.TaskID, org.apache.myriad.state.NodeTask> getTasks() {
+    Map<Protos.TaskID, org.apache.myriad.state.NodeTask> map = null;
+    if (taskIds != null) {
+      map = new HashMap<Protos.TaskID, org.apache.myriad.state.NodeTask>(taskIds.size());
+      int idx = 0;
+      for (ByteBuffer bb : taskIds) {
+        final Protos.TaskID taskId = ByteBufferSupport.toTaskId(bb);
+        final org.apache.myriad.state.NodeTask task = ByteBufferSupport.toNodeTask(taskNodes.get(idx++));
+        if (task.getTaskPrefix() == null && taskId != null) {
+          String taskPrefix = taskIdPattern.split(taskId.getValue())[0];
+          task.setTaskPrefix(taskPrefix);
+        }
+        map.put(taskId, task);
+      }
+    } else {
+      map = new HashMap<Protos.TaskID, org.apache.myriad.state.NodeTask>(0);
+    }
+    return map;
+  }
+
+  public void setPendingTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      pendingTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, pendingTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getPendingTasks() {
+    return toTaskSet(pendingTasks);
+  }
+
+  public void setStagingTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      stagingTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, stagingTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getStagingTasks() {
+    return toTaskSet(stagingTasks);
+  }
+
+  public void setActiveTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      activeTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, activeTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getActiveTasks() {
+    return toTaskSet(activeTasks);
+  }
+
+  public void setLostTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      lostTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, lostTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getLostTasks() {
+    return toTaskSet(lostTasks);
+  }
+
+  public void setKillableTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      killableTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, killableTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getKillableTasks() {
+    return toTaskSet(killableTasks);
+  }
+
+  private void toTaskBuffer(Set<Protos.TaskID> src, List<ByteBuffer> tgt) {
+    for (Protos.TaskID id : src) {
+      tgt.add(ByteBufferSupport.toByteBuffer(id));
+    }
+  }
+
+  private Set<Protos.TaskID> toTaskSet(List<ByteBuffer> src) {
+    Set<Protos.TaskID> tasks;
+    if (src != null) {
+      tasks = new HashSet<Protos.TaskID>(src.size());
+      for (int i = 0; i < src.size(); i++) {
+        tasks.add(ByteBufferSupport.toTaskId(src.get(i)));
+      }
+    } else {
+      tasks = new HashSet<Protos.TaskID>(0);
+    }
+    return tasks;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/webapp/HttpConnectorProvider.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/webapp/HttpConnectorProvider.java b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/HttpConnectorProvider.java
new file mode 100644
index 0000000..fc97c75
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/HttpConnectorProvider.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.webapp;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import com.google.inject.Provider;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+
+import javax.inject.Inject;
+
+/**
+ * The factory for creating the http connector for the myriad scheduler
+ */
+public class HttpConnectorProvider implements Provider<Connector> {
+
+  private MyriadConfiguration myriadConf;
+
+  @Inject
+  public HttpConnectorProvider(MyriadConfiguration myriadConf) {
+    this.myriadConf = myriadConf;
+  }
+
+  @Override
+  public Connector get() {
+    SelectChannelConnector ret = new SelectChannelConnector();
+    ret.setName("Myriad");
+    ret.setHost("0.0.0.0");
+    ret.setPort(myriadConf.getRestApiPort());
+
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadServletModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadServletModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadServletModule.java
new file mode 100644
index 0000000..e1f09c8
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadServletModule.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.webapp;
+
+import org.apache.myriad.api.ConfigurationResource;
+import org.apache.myriad.api.SchedulerStateResource;
+import com.google.inject.Scopes;
+import com.google.inject.servlet.ServletModule;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+
+/**
+ * The guice module for configuring the myriad dashboard
+ */
+public class MyriadServletModule extends ServletModule {
+
+  @Override
+  protected void configureServlets() {
+    bind(org.apache.myriad.api.ClustersResource.class);
+    bind(ConfigurationResource.class);
+    bind(SchedulerStateResource.class);
+
+    bind(GuiceContainer.class);
+    bind(JacksonJaxbJsonProvider.class).in(Scopes.SINGLETON);
+
+    serve("/api/*").with(GuiceContainer.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadWebServer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadWebServer.java b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadWebServer.java
new file mode 100644
index 0000000..ac36683
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/MyriadWebServer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.webapp;
+
+import com.google.inject.servlet.GuiceFilter;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.*;
+
+import javax.inject.Inject;
+
+/**
+ * The myriad web server configuration for jetty
+ */
+public class MyriadWebServer {
+  private final Server jetty;
+  private final Connector connector;
+  private final GuiceFilter filter;
+
+  /**
+   * @param jetty     jetty server to configure and start
+   * @param connector connector added to the server on start
+   * @param filter    Guice filter mapped to every request path
+   */
+  @Inject
+  public MyriadWebServer(Server jetty, Connector connector, GuiceFilter filter) {
+    this.jetty = jetty;
+    this.connector = connector;
+    this.filter = filter;
+  }
+
+  /**
+   * Adds the connector, maps the Guice filter to all paths, registers the default servlet
+   * with the static content directory as resource base, and starts jetty.
+   *
+   * @throws IllegalStateException if the {@code webapp/public} directory is not on the classpath
+   * @throws Exception             if jetty fails to start
+   */
+  public void start() throws Exception {
+    // getResource returns null when the resource is missing. Resolve it before touching the
+    // server so a packaging problem fails with a descriptive message and leaves jetty
+    // unmodified, instead of a bare NullPointerException after the connector was added.
+    java.net.URL staticDirUrl = this.getClass().getClassLoader().getResource("webapp/public");
+    if (staticDirUrl == null) {
+      throw new IllegalStateException("Static web content directory 'webapp/public' was not found on the classpath");
+    }
+    String staticDir = staticDirUrl.toExternalForm();
+
+    this.jetty.addConnector(connector);
+
+    ServletHandler servletHandler = new ServletHandler();
+
+    String filterName = "MyriadGuiceFilter";
+    FilterHolder holder = new FilterHolder(filter);
+    holder.setName(filterName);
+
+    FilterMapping filterMapping = new FilterMapping();
+    filterMapping.setPathSpec("/*");
+    filterMapping.setDispatches(Handler.ALL);
+    filterMapping.setFilterName(filterName);
+
+    servletHandler.addFilter(holder, filterMapping);
+
+    Context context = new Context();
+    context.setServletHandler(servletHandler);
+    context.addServlet(DefaultServlet.class, "/");
+    context.setResourceBase(staticDir);
+
+    this.jetty.addHandler(context);
+    this.jetty.start();
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/webapp/WebAppGuiceModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/webapp/WebAppGuiceModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/WebAppGuiceModule.java
new file mode 100644
index 0000000..a1e8e8f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/webapp/WebAppGuiceModule.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.webapp;
+
+import com.google.inject.AbstractModule;
+import org.mortbay.jetty.Connector;
+
+/**
+ * The guice web application configuration
+ */
+public class WebAppGuiceModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(Connector.class).toProvider(HttpConnectorProvider.class);
+    install(new MyriadServletModule());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/resources/yarn-site-default.xml
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/resources/yarn-site-default.xml b/myriad-scheduler/src/main/resources/yarn-site-default.xml
index 311723e..649cd3f 100644
--- a/myriad-scheduler/src/main/resources/yarn-site-default.xml
+++ b/myriad-scheduler/src/main/resources/yarn-site-default.xml
@@ -27,7 +27,7 @@
 <!-- Configure Myriad Scheduler here -->
 <property>
     <name>yarn.resourcemanager.scheduler.class</name>
-    <value>com.ebay.myriad.scheduler.yarn.MyriadFairScheduler</value>
-    <description>One can configure other schedulers as well from following list: com.ebay.myriad.scheduler.yarn.MyriadCapacityScheduler, com.ebay.myriad.scheduler.yarn.MyriadFifoScheduler</description>
+    <value>org.apache.myriad.scheduler.yarn.MyriadFairScheduler</value>
+    <description>One can configure other schedulers as well from following list: org.apache.myriad.scheduler.yarn.MyriadCapacityScheduler, org.apache.myriad.scheduler.yarn.MyriadFifoScheduler</description>
 </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/MesosModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/MesosModule.java b/myriad-scheduler/src/test/java/com/ebay/myriad/MesosModule.java
deleted file mode 100644
index 5f4dd8e..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/MesosModule.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad;
-
-import java.util.concurrent.FutureTask;
-
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.SchedulerDriver;
-import org.apache.mesos.state.State;
-import org.apache.mesos.state.Variable;
-import org.mockito.Mockito;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.scheduler.MyriadDriver;
-import com.ebay.myriad.scheduler.MyriadScheduler;
-import com.ebay.myriad.state.MyriadState;
-import com.ebay.myriad.state.SchedulerState;
-import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.Scopes;
-import com.google.inject.Singleton;
-
-/**
- * Guice Module for Mesos objects.
- */
-public class MesosModule extends AbstractModule {
-  public MesosModule() {
-  }
-
-  @Override
-  protected void configure() {
-    bind(MyriadDriver.class).in(Scopes.SINGLETON);
-  }
-
-  @Provides
-  @Singleton
-  SchedulerDriver providesSchedulerDriver(MyriadScheduler scheduler, MyriadConfiguration cfg, SchedulerState schedulerState) {
-
-    SchedulerDriver driver = Mockito.mock(SchedulerDriver.class);
-    Mockito.when(driver.start()).thenReturn(Status.DRIVER_RUNNING);
-    Mockito.when(driver.abort()).thenReturn(Status.DRIVER_ABORTED);
-
-    return driver;
-  }
-
-  @Provides
-  @Singleton
-  State providesStateStore(MyriadConfiguration cfg) {
-    State stateStore = Mockito.mock(State.class);
-
-    Runnable dummyTask = new Runnable() {
-      public void run() {
-      }
-    };
-
-    Variable var = Mockito.mock(Variable.class);
-    Protos.FrameworkID id = Protos.FrameworkID.newBuilder().setValue("1").build();
-
-    Mockito.when(var.value()).thenReturn(id.toByteArray());
-    FutureTask<Variable> futureTask = new FutureTask<Variable>(dummyTask, var);
-    futureTask.run();
-    Mockito.when(stateStore.fetch(MyriadState.KEY_FRAMEWORK_ID)).thenReturn(futureTask);
-
-    return stateStore;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java b/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java
deleted file mode 100644
index e282f21..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad;
-
-import static org.junit.Assert.*;
-
-import java.util.Map;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.ebay.myriad.scheduler.TaskFactory;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-/**
- * Test for Multibindings
- */
-public class MultiBindingsTest {
-
-  private static Injector injector;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    MyriadTestModule myriadModule = new MyriadTestModule();
-    injector = Guice.createInjector(myriadModule);
-
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-  }
-
-  @Test
-  public void multiBindingsTest() {
-
-
-    MultiBindingsUsage myinstance = injector.getInstance(MultiBindingsUsage.class);
-
-    Map<String, TaskFactory> taskMap = myinstance.getMap();
-    assertNotNull(taskMap);
-    assertEquals(3, taskMap.size());
-
-    taskMap = myinstance.getMap();
-    for (Map.Entry<String, TaskFactory> entry : taskMap.entrySet()) {
-      String keyName = entry.getKey();
-      TaskFactory taskFactory = entry.getValue();
-      System.out.println(taskFactory);
-    }
-
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java b/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java
deleted file mode 100644
index d38f5a3..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/MultiBindingsUsage.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad;
-
-import java.util.Map;
-
-import javax.inject.Inject;
-
-import com.ebay.myriad.scheduler.TaskFactory;
-
-/**
- * Helper class to test multibindings
- */
-public class MultiBindingsUsage {
-
-  @Inject
-  private Map<String, TaskFactory> taskFactoryMap;
-
-  public Map<String, TaskFactory> getMap() {
-    return taskFactoryMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java b/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java
deleted file mode 100644
index a9dd0dc..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/MyriadTestModule.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.ebay.myriad.configuration.ServiceConfiguration;
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
-import com.ebay.myriad.configuration.NodeManagerConfiguration;
-import com.ebay.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
-import com.ebay.myriad.scheduler.DownloadNMExecutorCLGenImpl;
-import com.ebay.myriad.scheduler.ExecutorCommandLineGenerator;
-import com.ebay.myriad.scheduler.NMExecutorCLGenImpl;
-import com.ebay.myriad.scheduler.ServiceTaskFactoryImpl;
-import com.ebay.myriad.scheduler.TaskFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.Scopes;
-import com.google.inject.Singleton;
-import com.google.inject.multibindings.MapBinder;
-
-/**
- * AbstractModule extension for UnitTests
- */
-public class MyriadTestModule extends AbstractModule {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadTestModule.class);
-
-  private MyriadConfiguration cfg;
-
-  @SuppressWarnings("unchecked")
-  @Override
-  protected void configure() {
-
-    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-    try {
-      cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default.yml"), MyriadConfiguration.class);
-    } catch (IOException e1) {
-      LOGGER.error("IOException", e1);
-      return;
-    }
-
-    if (cfg == null) {
-      return;
-    }
-
-    bind(MyriadConfiguration.class).toInstance(cfg);
-
-    MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
-    mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
-    Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
-    for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
-      String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
-      if (taskFactoryClass != null) {
-        try {
-          Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass);
-          mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON);
-        } catch (ClassNotFoundException e) {
-          e.printStackTrace();
-        }
-      } else {
-        mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
-      }
-    }
-  }
-
-  @Provides
-  @Singleton
-  ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
-    ExecutorCommandLineGenerator cliGenerator = null;
-    MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
-    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
-      cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get());
-    } else {
-      cliGenerator = new NMExecutorCLGenImpl(cfg);
-    }
-    return cliGenerator;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java b/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java
deleted file mode 100644
index 6c75b38..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadBadConfigurationExceptionTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad.configuration;
-
-import static org.junit.Assert.*;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Class to test MyriadBadConfigurationException
- */
-public class MyriadBadConfigurationExceptionTest {
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-  }
-
-  @Test
-  public void myriadExceptionTest() {
-    final String testStr = "com.ebay.myriad.configuration.MyriadBadConfigurationException: Bad configuration exception";
-    MyriadBadConfigurationException exp = new MyriadBadConfigurationException("Bad configuration exception");
-
-    assertEquals(testStr, exp.toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java b/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java
deleted file mode 100644
index 85d4dfa..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/configuration/MyriadConfigurationTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad.configuration;
-
-import static org.junit.Assert.*;
-
-import java.util.Map;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-
-/**
- * AuxServices/tasks test
- */
-public class MyriadConfigurationTest {
-
-  static MyriadConfiguration cfg;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-    cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default.yml"), MyriadConfiguration.class);
-
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-  }
-
-  @Test
-  public void additionalPropertiestest() throws Exception {
-
-    Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations();
-
-    assertNotNull(auxConfigs);
-    assertEquals(auxConfigs.size(), 2);
-
-    for (Map.Entry<String, ServiceConfiguration> entry : auxConfigs.entrySet()) {
-      String taskName = entry.getKey();
-      ServiceConfiguration config = entry.getValue();
-      String outTaskname = config.getTaskName();
-      assertEquals(taskName, outTaskname);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
deleted file mode 100644
index 4846c21..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.ebay.myriad.scheduler
-
-import com.ebay.myriad.configuration.NodeManagerConfiguration
-import com.ebay.myriad.state.NodeTask
-import com.ebay.myriad.state.SchedulerState
-import org.apache.mesos.Protos
-import spock.lang.Specification
-
-/**
- *
- * @author kensipe
- */
-class SchedulerUtilsSpec extends Specification {
-
-    def "is unique host name"() {
-        given:
-        def offer = Mock(Protos.OfferOrBuilder)
-        offer.getHostname() >> "hostname"
-
-        expect:
-        returnValue == SchedulerUtils.isUniqueHostname(offer, launchTask, tasks)
-
-        where:
-        tasks                                              | launchTask                     | returnValue
-        []                                                 | null                           | true
-        null                                               | null                           | true
-        createNodeTaskList("hostname")                     | createNodeTask("hostname")     | false
-        createNodeTaskList("missinghost")                  | createNodeTask("hostname")     | true
-        createNodeTaskList("missinghost1", "missinghost2") | createNodeTask("missinghost3") | true
-        createNodeTaskList("missinghost1", "hostname")     | createNodeTask("hostname")     | false
-
-    }
-
-    def "is eligible for Fine Grained Scaling"() {
-        given:
-        def state = Mock(SchedulerState)
-        def tasks = []
-        def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0), null)
-        def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0), null)
-        fgsNMTask.setHostname("test_fgs_hostname")
-        cgsNMTask.setHostname("test_cgs_hostname")
-        tasks << fgsNMTask << cgsNMTask
-        state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX) >> tasks
-
-        expect:
-        returnValue == SchedulerUtils.isEligibleForFineGrainedScaling(hostName, state)
-
-        where:
-        hostName            | returnValue
-        "test_fgs_hostname" | true
-        "test_cgs_hostname" | false
-        "blah"              | false
-        ""                  | false
-        null                | false
-    }
-
-    ArrayList<NodeTask> createNodeTaskList(String... hostnames) {
-        def list = []
-        hostnames.each { hostname ->
-            list << createNodeTask(hostname)
-        }
-        return list
-    }
-
-
-    NodeTask createNodeTask(String hostname) {
-        def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0, 1.0), null)
-        node.hostname = hostname
-        node.taskPrefix = "nm"
-        node
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java
deleted file mode 100644
index 8a0a0b4..0000000
--- a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/TMSTaskFactoryImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ebay.myriad.scheduler;
-
-import javax.inject.Inject;
-
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-
-import com.ebay.myriad.configuration.MyriadConfiguration;
-import com.ebay.myriad.state.NodeTask;
-
-/**
- * Test implementation of TaskFactory
- */
-public class TMSTaskFactoryImpl implements TaskFactory {
-
-  private MyriadConfiguration cfg;
-  private TaskUtils taskUtils;
-
-  @Inject
-  public TMSTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
-    this.setCfg(cfg);
-    this.setTaskUtils(taskUtils);
-  }
-
-  @Override
-  public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) {
-    return null;
-  }
-
-  public MyriadConfiguration getCfg() {
-    return cfg;
-  }
-
-  public void setCfg(MyriadConfiguration cfg) {
-    this.cfg = cfg;
-  }
-
-  public TaskUtils getTaskUtils() {
-    return taskUtils;
-  }
-
-  public void setTaskUtils(TaskUtils taskUtils) {
-    this.taskUtils = taskUtils;
-  }
-
-  @Override
-  public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) {
-    return null;
-  }
-}



Mime
View raw message