aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [34/51] [partial] Rename twitter* and com.twitter to apache and org.apache directories to preserve all file history before the refactor.
Date Tue, 31 Dec 2013 21:20:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/SchedulerzHome.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzHome.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzHome.java
new file mode 100644
index 0000000..07a3259
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzHome.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.base.Function;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import org.antlr.stringtemplate.StringTemplate;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.state.CronJobManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.base.Closure;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * HTTP interface serving the landing page (HUD) of the aurora scheduler.
+ *
+ * <p>The page lists every role that owns at least one task or cron job, together with the
+ * number of distinct job names and cron job names attributed to that role.
+ */
+@Path("/scheduler")
+public class SchedulerzHome extends JerseyTemplateServlet {
+
+  /** Builds an empty {@link Role} template object for the given role name. */
+  private static final Function<String, Role> CREATE_ROLE = new Function<String, Role>() {
+    @Override public Role apply(String ownerRole) {
+      Role created = new Role();
+      created.role = ownerRole;
+      return created;
+    }
+  };
+
+  private final Storage storage;
+  private final CronJobManager cronScheduler;
+  private final String clusterName;
+
+  /**
+   * Creates a new scheduler home servlet.
+   *
+   * @param storage Backing store to fetch tasks from.
+   * @param cronScheduler Cron scheduler.
+   * @param clusterName Name of the serving cluster.
+   */
+  @Inject
+  public SchedulerzHome(
+      Storage storage,
+      CronJobManager cronScheduler,
+      @ClusterName String clusterName) {
+
+    super("schedulerzhome");
+    this.storage = checkNotNull(storage);
+    this.cronScheduler = checkNotNull(cronScheduler);
+    this.clusterName = checkNotBlank(clusterName);
+  }
+
+  /**
+   * Fetches the scheduler landing page.
+   *
+   * @return HTTP response.
+   */
+  @GET
+  @Produces(MediaType.TEXT_HTML)
+  public Response get() {
+    Closure<StringTemplate> populateTemplate = new Closure<StringTemplate>() {
+      @Override public void execute(StringTemplate template) {
+        template.setAttribute("cluster_name", clusterName);
+
+        // Self-populating map: looking up an unseen role name creates its Role entry.
+        LoadingCache<String, Role> rolesByName =
+            CacheBuilder.newBuilder().build(CacheLoader.from(CREATE_ROLE));
+
+        // TODO(William Farner): Render this page without an expensive query.
+        Set<IScheduledTask> allTasks =
+            Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped());
+        for (IScheduledTask scheduledTask : allTasks) {
+          ITaskConfig config = Tasks.SCHEDULED_TO_INFO.apply(scheduledTask);
+          rolesByName.getUnchecked(config.getOwner().getRole()).accumulate(config);
+        }
+
+        // Add cron job counts for each role.
+        for (IJobConfiguration cronJob : cronScheduler.getJobs()) {
+          rolesByName.getUnchecked(cronJob.getOwner().getRole()).accumulate(cronJob);
+        }
+
+        template.setAttribute(
+            "owners",
+            DisplayUtils.ROLE_ORDERING.sortedCopy(rolesByName.asMap().values()));
+      }
+    };
+
+    return fillTemplate(populateTemplate);
+  }
+
+  /**
+   * Template object to represent a role: its name plus the sets of job names and cron job
+   * names that have been accumulated for it.
+   */
+  static class Role {
+    private String role;
+    private final Set<String> jobs = Sets.newHashSet();
+    private final Set<String> cronJobs = Sets.newHashSet();
+
+    /** Records the job name of a task owned by this role. */
+    private void accumulate(ITaskConfig task) {
+      jobs.add(task.getJobName());
+    }
+
+    /** Records the name of a cron job owned by this role. */
+    private void accumulate(IJobConfiguration job) {
+      cronJobs.add(job.getKey().getName());
+    }
+
+    public String getRole() {
+      return role;
+    }
+
+    /** Returns the number of distinct job names accumulated from tasks. */
+    public int getJobCount() {
+      return jobs.size();
+    }
+
+    /** Returns the number of distinct cron job names accumulated. */
+    public int getCronJobCount() {
+      return cronJobs.size();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
new file mode 100644
index 0000000..12b0bec
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzJob.java
@@ -0,0 +1,432 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+
+import org.antlr.stringtemplate.StringTemplate;
+
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.apiConstants;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.filter.SchedulingFilter.Veto;
+import com.twitter.aurora.scheduler.metadata.NearestFit;
+import com.twitter.aurora.scheduler.state.CronJobManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IAssignedTask;
+import com.twitter.aurora.scheduler.storage.entities.IConstraint;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConstraint;
+import com.twitter.aurora.scheduler.storage.entities.ITaskEvent;
+import com.twitter.common.base.Closure;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.gen.ScheduleStatus.ASSIGNED;
+import static com.twitter.aurora.gen.ScheduleStatus.FAILED;
+import static com.twitter.aurora.gen.ScheduleStatus.FINISHED;
+import static com.twitter.aurora.gen.ScheduleStatus.KILLED;
+import static com.twitter.aurora.gen.ScheduleStatus.KILLING;
+import static com.twitter.aurora.gen.ScheduleStatus.LOST;
+import static com.twitter.aurora.gen.ScheduleStatus.PENDING;
+import static com.twitter.aurora.gen.ScheduleStatus.RUNNING;
+import static com.twitter.aurora.gen.ScheduleStatus.STARTING;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * HTTP interface to view information about a job in the aurora scheduler.
+ */
+@Path("/scheduler/{role}/{environment}/{job}")
+public class SchedulerzJob extends JerseyTemplateServlet {
+  // Name of the query parameter (and template attribute) carrying the status filter.
+  private static final String STATUS_FILTER_PARAM = "status";
+  // Name of the query parameter (and template attribute) that switches on the admin view.
+  private static final String ADMIN_VIEW_PARAM = "admin";
+
+  // Pagination controls.
+  private static final String OFFSET_PARAM = "o";
+  private static final int PAGE_SIZE = 50;
+
+  // Orders tasks by the natural ordering of Tasks.SCHEDULED_TO_INSTANCE_ID's result.
+  private static final Ordering<IScheduledTask> INSTANCE_ID_COMPARATOR =
+    Ordering.natural().onResultOf(Tasks.SCHEDULED_TO_INSTANCE_ID);
+
+  // Maps a coarse status filter to the set of concrete statuses that are queried for it.
+  // Only these four keys have entries; a lookup with any other status returns null.
+  private static final Map<ScheduleStatus, Set<ScheduleStatus>> FILTER_MAP =
+      ImmutableMap.<ScheduleStatus, Set<ScheduleStatus>>builder()
+        .put(PENDING, EnumSet.of(PENDING))
+        .put(RUNNING, EnumSet.of(ASSIGNED, STARTING, RUNNING, KILLING))
+        .put(FINISHED, EnumSet.of(KILLED, FINISHED))
+        .put(FAILED, EnumSet.of(LOST, FAILED))
+      .build();
+
+  /**
+   * Orders tasks newest-first by the timestamp of their last task event.
+   *
+   * <p>Tasks without any recorded events sort after every task that has events and compare
+   * as equal to each other. This keeps the ordering transitive, as {@link Comparator} requires.
+   */
+  private static final Comparator<IScheduledTask> REVERSE_CHRON_COMPARATOR =
+      new Comparator<IScheduledTask>() {
+        @Override public int compare(IScheduledTask taskA, IScheduledTask taskB) {
+          Iterable<ITaskEvent> taskAEvents = taskA.getTaskEvents();
+          Iterable<ITaskEvent> taskBEvents = taskB.getTaskEvents();
+
+          boolean taskAHasEvents = taskAEvents != null && !Iterables.isEmpty(taskAEvents);
+          boolean taskBHasEvents = taskBEvents != null && !Iterables.isEmpty(taskBEvents);
+          if (!taskAHasEvents || !taskBHasEvents) {
+            // Treating an event-less task as equal to everything would make the comparator
+            // intransitive (A == B and B == C while A != C), which sort implementations may
+            // reject. Give event-less tasks a fixed position at the end instead.
+            if (taskAHasEvents == taskBHasEvents) {
+              return 0;
+            }
+            return taskAHasEvents ? -1 : 1;
+          }
+
+          long timestampA = Iterables.getLast(taskAEvents).getTimestamp();
+          long timestampB = Iterables.getLast(taskBEvents).getTimestamp();
+          // Compare instead of subtracting: the difference of two longs can overflow.
+          if (timestampA == timestampB) {
+            return 0;
+          }
+          // Reverse chronological: the later timestamp sorts first.
+          return (timestampA > timestampB) ? -1 : 1;
+        }
+      };
+
+  private static final Function<Veto, String> GET_REASON = new Function<Veto, String>() {
+    @Override public String apply(Veto veto) {
+      return veto.getReason();
+    }
+  };
+
+  // Double percents to escape formatting sequence.
+  private static final String PORT_FORMAT = "%%port:%s%%";
+  // TODO(William Farner): Search for usage of this, figure out a deprecation strategy to switch
+  //                       to %instance_id%.
+  // NOTE: despite the *_REGEXP names, these placeholders are matched as literal text.
+  private static final String INSTANCE_ID_REGEXP = "%shard_id%";
+  private static final String TASK_ID_REGEXP = "%task_id%";
+  private static final String HOST_REGEXP = "%host%";
+
+  /**
+   * Substitutes task-specific values for the placeholders in {@code value}.
+   *
+   * <p>Replaces {@code %shard_id%} with the instance ID and {@code %task_id%} with the task ID.
+   * {@code %host%} is replaced only when the slave host is set, and each
+   * {@code %port:NAME%} only for ports present in the task's assigned ports.
+   *
+   * <p>Substitution is literal: neither the placeholders nor the substituted values are
+   * interpreted as regular expressions. A port name containing regex metacharacters, or a
+   * value containing {@code $} or {@code \}, is therefore handled verbatim instead of being
+   * misread as a pattern or group reference.
+   *
+   * @param value Text containing zero or more placeholders.
+   * @param task Task whose attributes supply the replacement values.
+   * @return {@code value} with all applicable placeholders replaced.
+   */
+  private static String expandText(String value, IAssignedTask task) {
+    String expanded = value
+        .replace(INSTANCE_ID_REGEXP, String.valueOf(task.getInstanceId()))
+        .replace(TASK_ID_REGEXP, task.getTaskId());
+
+    if (task.isSetSlaveHost()) {
+      expanded = expanded.replace(HOST_REGEXP, task.getSlaveHost());
+    }
+
+    // Expand ports.
+    if (task.isSetAssignedPorts()) {
+      for (Map.Entry<String, Integer> portEntry : task.getAssignedPorts().entrySet()) {
+        expanded = expanded.replace(
+            String.format(PORT_FORMAT, portEntry.getKey()),
+            String.valueOf(portEntry.getValue()));
+      }
+    }
+
+    return expanded;
+  }
+
+  /**
+   * Converts a scheduled task into the attribute map handed to the job page template.
+   *
+   * <p>Always present: taskId, instanceId, slaveHost (empty string when unset), status,
+   * statusTimestamp (timestamp of the last task event), taskEvents, links and executorPort.
+   * pendingReason is added for PENDING tasks and executorUri when a slave host is set.
+   */
+  private final Function<IScheduledTask, Map<String, Object>> taskToStringMap =
+      new Function<IScheduledTask, Map<String, Object>>() {
+        @Override public Map<String, Object> apply(IScheduledTask scheduledTask) {
+          final IAssignedTask assigned = scheduledTask.getAssignedTask();
+
+          ImmutableMap.Builder<String, Object> attributes = ImmutableMap.builder();
+          attributes.put("taskId", assigned.getTaskId());
+          attributes.put("instanceId", assigned.getInstanceId());
+          if (assigned.isSetSlaveHost()) {
+            attributes.put("slaveHost", assigned.getSlaveHost());
+          } else {
+            attributes.put("slaveHost", "");
+          }
+          attributes.put("status", scheduledTask.getStatus());
+          attributes.put(
+              "statusTimestamp",
+              Iterables.getLast(scheduledTask.getTaskEvents()).getTimestamp());
+          attributes.put("taskEvents", scheduledTask.getTaskEvents());
+
+          if (scheduledTask.getStatus() == ScheduleStatus.PENDING) {
+            // Explain why the task is still pending using the closest scheduling attempt.
+            Set<Veto> vetoes = nearestFit.getNearestFit(assigned.getTaskId());
+            String pendingReason = vetoes.isEmpty()
+                ? "No matching hosts."
+                : Joiner.on(",").join(Iterables.transform(vetoes, GET_REASON));
+            attributes.put("pendingReason", pendingReason);
+          }
+
+          // Links are only rendered for live tasks; placeholders in them are expanded
+          // against this task.
+          Map<String, String> links;
+          if (apiConstants.LIVE_STATES.contains(scheduledTask.getStatus())) {
+            Function<String, String> expandForTask = new Function<String, String>() {
+              @Override public String apply(String input) {
+                return expandText(input, assigned);
+              }
+            };
+            links = ImmutableMap.copyOf(
+                Maps.transformValues(assigned.getTask().getTaskLinks(), expandForTask));
+          } else {
+            links = ImmutableMap.of();
+          }
+          attributes.put("links", links);
+
+          attributes.put("executorPort", 1338);
+          if (assigned.isSetSlaveHost()) {
+            String executorUri =
+                "http://" + assigned.getSlaveHost() + ":1338/task/" + assigned.getTaskId();
+            attributes.put("executorUri", executorUri);
+          }
+
+          return attributes.build();
+        }
+      };
+
+  private final Storage storage;
+  private final String clusterName;
+  private final NearestFit nearestFit;
+  private final CronJobManager cronJobManager;
+
+  /**
+   * Creates a new job servlet.
+   *
+   * @param storage Backing store to fetch tasks from.
+   * @param cronJobManager Cron job manager, consulted to tell whether the viewed job is a
+   *     cron job.
+   * @param clusterName Name of the serving cluster.
+   * @param nearestFit Source of the vetoes used to explain why a task is still pending.
+   */
+  @Inject
+  public SchedulerzJob(
+      Storage storage,
+      CronJobManager cronJobManager,
+      @ClusterName String clusterName,
+      NearestFit nearestFit) {
+
+    super("schedulerzjob");
+    this.storage = checkNotNull(storage);
+    this.clusterName = checkNotBlank(clusterName);
+    this.nearestFit = checkNotNull(nearestFit);
+    this.cronJobManager = checkNotNull(cronJobManager);
+  }
+
+  /**
+   * Returns an immutable copy of one page of {@code iterable}: up to {@code PAGE_SIZE}
+   * elements, starting after the first {@code offset} elements.
+   */
+  private static <T> Iterable<T> offsetAndLimit(Iterable<T> iterable, int offset) {
+    Iterable<T> afterOffset = Iterables.skip(iterable, offset);
+    Iterable<T> page = Iterables.limit(afterOffset, PAGE_SIZE);
+    return ImmutableList.copyOf(page);
+  }
+
+  /**
+   * Formats a size given in MiB for display. Values of 1024 and above are shown in whole GiB
+   * (integer division, so any fractional part is dropped); smaller values are shown in MiB.
+   */
+  private static String scaleMb(long mb) {
+    if (mb < 1024) {
+      return mb + " MiB";
+    }
+    return (mb / 1024) + " GiB";
+  }
+
+  /**
+   * Renders a constraint as {@code "<name>: <description>"}, where the description depends on
+   * whether the constraint is a value constraint or a limit constraint.
+   */
+  private static final Function<IConstraint, String> DISPLAY_CONSTRAINT =
+      new Function<IConstraint, String>() {
+        @Override public String apply(IConstraint constraint) {
+          String prefix = constraint.getName() + ": ";
+          ITaskConstraint taskConstraint = constraint.getConstraint();
+
+          String description;
+          switch (taskConstraint.getSetField()) {
+            case VALUE:
+              String negation = taskConstraint.getValue().isNegated() ? "not " : "";
+              description =
+                  negation + Joiner.on(", ").join(taskConstraint.getValue().getValues());
+              break;
+
+            case LIMIT:
+              description = "limit " + taskConstraint.getLimit().getLimit();
+              break;
+
+            default:
+              description = "Unhandled constraint type " + taskConstraint.getSetField();
+              break;
+          }
+
+          return prefix + description;
+        }
+      };
+
+  /**
+   * Summarizes a task configuration as display rows: resources and contact are always
+   * present; constraints, service, production, ports and packages appear only when they
+   * apply to the task.
+   */
+  private static final Function<ITaskConfig, SchedulingDetails> CONFIG_TO_DETAILS =
+      new Function<ITaskConfig, SchedulingDetails>() {
+        @Override public SchedulingDetails apply(ITaskConfig task) {
+          Joiner commaSpace = Joiner.on(", ");
+          ImmutableMap.Builder<String, Object> rows = ImmutableMap.builder();
+
+          rows.put("resources", commaSpace.join(
+              "cpu: " + task.getNumCpus(),
+              "ram: " + scaleMb(task.getRamMb()),
+              "disk: " + scaleMb(task.getDiskMb())));
+
+          if (!task.getConstraints().isEmpty()) {
+            List<String> sortedConstraints = FluentIterable.from(task.getConstraints())
+                .transform(DISPLAY_CONSTRAINT)
+                .toSortedList(Ordering.<String>natural());
+            rows.put("constraints", commaSpace.join(sortedConstraints));
+          }
+
+          if (task.isIsService()) {
+            rows.put("service", "true");
+          }
+
+          if (task.isProduction()) {
+            rows.put("production", "true");
+          }
+
+          if (!task.getRequestedPorts().isEmpty()) {
+            rows.put(
+                "ports",
+                commaSpace.join(ImmutableSortedSet.copyOf(task.getRequestedPorts())));
+          }
+
+          if (!task.getPackages().isEmpty()) {
+            List<String> sortedPackages = Ordering.natural().sortedCopy(
+                Iterables.transform(task.getPackages(), TransformationUtils.PACKAGE_TOSTRING));
+            rows.put("packages", Joiner.on(',').join(sortedPackages));
+          }
+
+          String contact;
+          if (task.isSetContactEmail()) {
+            contact = task.getContactEmail();
+          } else {
+            contact = "none";
+          }
+          rows.put("contact", contact);
+
+          return new SchedulingDetails(rows.build());
+        }
+      };
+
+  /**
+   * Value object wrapping the display rows for one scheduling configuration. Equality and
+   * hashing are delegated to the wrapped map, so two instances with equal rows are equal.
+   */
+  static class SchedulingDetails {
+    private final Map<String, Object> details;
+
+    SchedulingDetails(ImmutableMap<String, Object> details) {
+      this.details = details;
+    }
+
+    public Map<String, Object> getDetails() {
+      return details;
+    }
+
+    @Override
+    public int hashCode() {
+      return details.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof SchedulingDetails) {
+        return ((SchedulingDetails) o).details.equals(details);
+      }
+      return false;
+    }
+  }
+
+
+
+  /**
+   * Groups tasks by their scheduling details and returns a map from a string describing a
+   * group of instance IDs to the details shared by that group.
+   *
+   * @param tasks Tasks to summarize.
+   * @return Scheduling details keyed by the string rendering of the instances that use them.
+   */
+  private static Map<String, SchedulingDetails> buildSchedulingTable(
+      Iterable<IAssignedTask> tasks) {
+
+    // Index the task configs by instance ID.
+    // NOTE(review): Maps.uniqueIndex presumably rejects duplicate keys, so this appears to
+    // assume at most one task per instance ID — confirm against callers.
+    Map<Integer, ITaskConfig> byInstance = Maps.transformValues(
+        Maps.uniqueIndex(tasks, Tasks.ASSIGNED_TO_INSTANCE_ID),
+        Tasks.ASSIGNED_TO_INFO);
+    // View of the same map with each config replaced by its display details.
+    Map<Integer, SchedulingDetails> detailsByInstance =
+        Maps.transformValues(byInstance, CONFIG_TO_DETAILS);
+    // Invert: details -> all instance IDs that share those details.
+    Multimap<SchedulingDetails, Integer> instancesByDetails = Multimaps.invertFrom(
+        Multimaps.forMap(detailsByInstance), HashMultimap.<SchedulingDetails, Integer>create());
+    // Collapse each group of instance IDs into a single string.
+    Map<SchedulingDetails, String> instanceStringsByDetails =
+        Maps.transformValues(instancesByDetails.asMap(), TransformationUtils.INSTANCES_TOSTRING);
+    // Flip keys and values so the instance string becomes the key.
+    return HashBiMap.create(instanceStringsByDetails).inverse();
+  }
+
+  /**
+   * Fetches the landing page for a job within a role.
+   *
+   * @param role Role that owns the job.
+   * @param environment Environment of the job.
+   * @param job Name of the job.
+   * @param offset Pagination offset into the task lists; negative values are treated as 0.
+   * @param filterArg Optional name of a status to filter tasks by (case-insensitive). An
+   *     unrecognized name renders an error message instead of the task lists.
+   * @param adminView When non-null, the admin view flag is set on the template.
+   * @return HTTP response.
+   */
+  @GET
+  @Produces(MediaType.TEXT_HTML)
+  public Response get(
+      @PathParam("role") final String role,
+      @PathParam("environment") final String environment,
+      @PathParam("job") final String job,
+      @QueryParam(OFFSET_PARAM) final int offset,
+      @QueryParam(STATUS_FILTER_PARAM) final String filterArg,
+      @QueryParam(ADMIN_VIEW_PARAM) final String adminView) {
+
+    // Skipping a negative number of elements is rejected when paging, so a negative offset
+    // is served as the first page rather than failing the request.
+    final int pageOffset = Math.max(0, offset);
+
+    return fillTemplate(new Closure<StringTemplate>() {
+      @Override public void execute(StringTemplate template) {
+        template.setAttribute("cluster_name", clusterName);
+        template.setAttribute(ADMIN_VIEW_PARAM, adminView != null);
+        IJobKey jobKey = JobKeys.from(role, environment, job);
+
+        boolean isCron = cronJobManager.hasJob(jobKey);
+        template.setAttribute("is_cron", isCron);
+
+        ScheduleStatus statusFilter = null;
+        if (filterArg != null) {
+          template.setAttribute(STATUS_FILTER_PARAM, filterArg);
+
+          try {
+            statusFilter = ScheduleStatus.valueOf(filterArg.toUpperCase());
+          } catch (IllegalArgumentException e) {
+            template.setAttribute("exception", "Invalid status type: " + filterArg);
+            return;
+          }
+        }
+
+        template.setAttribute("role", role);
+        template.setAttribute("environment", environment);
+        template.setAttribute("job", job);
+        template.setAttribute("statsUrl", DisplayUtils.getJobDashboardUrl(jobKey));
+        boolean hasMore = false;
+
+        Query.Builder builder = Query.jobScoped(jobKey);
+
+        Optional<Query.Builder> activeQuery = Optional.absent();
+        Optional<Query.Builder> completedQuery = Optional.absent();
+        if (statusFilter != null) {
+          Collection<ScheduleStatus> queryStatuses = FILTER_MAP.get(statusFilter);
+          if (queryStatuses == null) {
+            // FILTER_MAP only has entries for the coarse filters. Any other valid status
+            // (e.g. KILLED or LOST) is matched exactly instead of passing null to the query.
+            queryStatuses = EnumSet.of(statusFilter);
+          }
+          if (Tasks.isActive(statusFilter)) {
+            activeQuery = Optional.of(builder.byStatus(queryStatuses));
+          } else {
+            completedQuery = Optional.of(builder.byStatus(queryStatuses));
+          }
+        } else {
+          activeQuery = Optional.of(builder.active());
+          completedQuery = Optional.of(builder.terminal());
+        }
+
+        if (activeQuery.isPresent()) {
+          Set<IScheduledTask> activeTasks =
+              Storage.Util.weaklyConsistentFetchTasks(storage, activeQuery.get());
+          List<IScheduledTask> liveTasks = INSTANCE_ID_COMPARATOR.sortedCopy(activeTasks);
+          template.setAttribute("activeTasks",
+              ImmutableList.copyOf(
+                  Iterables.transform(offsetAndLimit(liveTasks, pageOffset), taskToStringMap)));
+          // Written as a subtraction so a very large offset cannot overflow the comparison.
+          hasMore = hasMore || ((liveTasks.size() - pageOffset) > PAGE_SIZE);
+          template.setAttribute("schedulingDetails",
+              buildSchedulingTable(Iterables.transform(liveTasks, Tasks.SCHEDULED_TO_ASSIGNED)));
+        }
+        if (completedQuery.isPresent()) {
+          List<IScheduledTask> completedTasks = Lists.newArrayList(
+              Storage.Util.weaklyConsistentFetchTasks(storage, completedQuery.get()));
+          Collections.sort(completedTasks, REVERSE_CHRON_COMPARATOR);
+          template.setAttribute("completedTasks",
+              ImmutableList.copyOf(
+                  Iterables.transform(
+                      offsetAndLimit(completedTasks, pageOffset), taskToStringMap)));
+          hasMore = hasMore || ((completedTasks.size() - pageOffset) > PAGE_SIZE);
+        }
+
+        template.setAttribute("offset", pageOffset);
+        if (pageOffset > 0) {
+          template.setAttribute("prevOffset", Math.max(0, pageOffset - PAGE_SIZE));
+        }
+        if (hasMore) {
+          template.setAttribute("nextOffset", pageOffset + PAGE_SIZE);
+        }
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
new file mode 100644
index 0000000..756c672
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/SchedulerzRole.java
@@ -0,0 +1,393 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.antlr.stringtemplate.StringTemplate;
+
+import com.twitter.aurora.gen.CronCollisionPolicy;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.apiConstants;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.cron.CronPredictor;
+import com.twitter.aurora.scheduler.quota.QuotaManager;
+import com.twitter.aurora.scheduler.quota.Quotas;
+import com.twitter.aurora.scheduler.state.CronJobManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.base.Closure;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.scheduler.base.Tasks.GET_STATUS;
+import static com.twitter.aurora.scheduler.base.Tasks.LATEST_ACTIVITY;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * HTTP interface to provide information about jobs for a specific role.
+ */
+@Path("/scheduler/{role}")
+public class SchedulerzRole extends JerseyTemplateServlet {
+
+  // Statuses in ascending order of "freshness": every terminal state is listed before every
+  // active state, so active states rank higher in FRESH_TASK_ORDER below.
+  private static final List<ScheduleStatus> STATUSES = ImmutableList.<ScheduleStatus>builder()
+      .addAll(apiConstants.TERMINAL_STATES)
+      .addAll(apiConstants.ACTIVE_STATES)
+      .build();
+
+  // The freshest task is the latest active task
+  // or the latest inactive task if no active task exists.
+  // Ties on status position are broken by LATEST_ACTIVITY.
+  private static final Ordering<IScheduledTask> FRESH_TASK_ORDER =
+      Ordering.explicit(STATUSES).onResultOf(GET_STATUS).compound(LATEST_ACTIVITY);
+
+  /**
+   * Returns the greatest task according to {@code FRESH_TASK_ORDER}.
+   *
+   * <p>NOTE(review): presumably callers never pass an empty iterable, since taking the
+   * maximum of nothing is expected to throw — confirm against callers.
+   *
+   * @param tasks Tasks to choose from.
+   * @return The freshest of {@code tasks}.
+   */
+  @VisibleForTesting
+  static IScheduledTask getFreshestTask(Iterable<IScheduledTask> tasks) {
+    return FRESH_TASK_ORDER.max(tasks);
+  }
+
+  private final Storage storage;
+  private final CronJobManager cronJobManager;
+  private final CronPredictor cronPredictor;
+  private final String clusterName;
+  private final QuotaManager quotaManager;
+
+  /**
+   * Creates a new role servlet.
+   *
+   * @param storage Backing store to fetch tasks and quotas from.
+   * @param cronJobManager Source of the cron jobs listed on the page.
+   * @param cronPredictor Used to predict the next run of each cron job.
+   * @param clusterName Name of the serving cluster; must not be blank.
+   * @param quotaManager Used to look up the role's resource consumption.
+   */
+  @Inject
+  SchedulerzRole(
+      Storage storage,
+      CronJobManager cronJobManager,
+      CronPredictor cronPredictor,
+      @ClusterName String clusterName,
+      QuotaManager quotaManager) {
+
+    super("schedulerzrole");
+    this.storage = checkNotNull(storage);
+    this.cronJobManager = checkNotNull(cronJobManager);
+    this.cronPredictor = checkNotNull(cronPredictor);
+    this.clusterName = checkNotBlank(clusterName);
+    this.quotaManager = checkNotNull(quotaManager);
+  }
+
+  /**
+   * Fetches the landing page for a role, covering all of its environments.
+   *
+   * @param role Role whose jobs should be displayed.
+   * @return HTTP response.
+   */
+  @GET
+  @Produces(MediaType.TEXT_HTML)
+  public Response get(@PathParam("role") final String role) {
+    // No environment filter: pass an absent environment.
+    return processRequest(Optional.of(role), Optional.<String>absent());
+  }
+
+  private Response processRequest(final Optional<String> role, final Optional<String> environment) {
+    return fillTemplate(new Closure<StringTemplate>() {
+      @Override public void execute(StringTemplate template) {
+
+        if (!role.isPresent()) {
+          template.setAttribute("exception", "Please specify a user.");
+          return;
+        }
+
+        Map<IJobKey, Map<?, ?>> cronJobs = fetchCronJobsBy(role.get(), environment);
+        List<Job> jobs = fetchJobsBy(role.get(), environment, cronJobs);
+        if (jobs.isEmpty() && cronJobs.isEmpty()) {
+          String msg = "No jobs found for role " + role.get()
+              + (environment.isPresent() ? (" and environment " + environment.get()) : "");
+          throw new WebApplicationException(Response.status(Status.NOT_FOUND).entity(msg).build());
+        }
+
+        template.setAttribute("cluster_name", clusterName);
+        template.setAttribute("role", role.get());
+        template.setAttribute("environment", environment.orNull());
+        template.setAttribute("jobs", jobs);
+        template.setAttribute("cronJobs", cronJobs.values());
+
+        // TODO(Suman Karumuri): In future compute consumption for role and environment.
+        template.setAttribute("prodResourcesUsed", quotaManager.getConsumption(role.get()));
+        template.setAttribute("nonProdResourcesUsed", getNonProdConsumption(role.get()));
+        template.setAttribute("resourceQuota", getQuota(role.get()));
+      }
+    });
+  }
+
+  /**
+   * Looks up the quota stored for a role, falling back to {@code Quotas.noQuota()} when the
+   * lookup yields nothing.
+   *
+   * @param role Role to look up.
+   * @return The role's quota, or the no-quota value.
+   */
+  private IQuota getQuota(final String role) {
+    return Storage.Util.consistentFetchQuota(storage, role).or(Quotas.noQuota());
+  }
+
+  /**
+   * Computes the resources consumed by a role's active tasks that are not production tasks.
+   *
+   * @param role Role whose active tasks are examined.
+   * @return Aggregate of the matching task configs, as produced by {@code Quotas.fromTasks}.
+   */
+  private IQuota getNonProdConsumption(String role) {
+    Set<IScheduledTask> activeTasks =
+        Storage.Util.weaklyConsistentFetchTasks(storage, Query.roleScoped(role).active());
+
+    FluentIterable<ITaskConfig> nonProdConfigs = FluentIterable
+        .from(activeTasks)
+        .transform(Tasks.SCHEDULED_TO_INFO)
+        .filter(Predicates.not(Tasks.IS_PRODUCTION));
+
+    return Quotas.fromTasks(nonProdConfigs);
+  }
+
+  /**
+   * Display jobs for a role and environment.
+   *
+   * @param role Role whose jobs should be displayed.
+   * @param environment Environment to restrict the listing to. A null or empty value is
+   *     treated as "no environment filter".
+   * @return HTTP response.
+   */
+  @Path("/{environment}")
+  @GET
+  @Produces(MediaType.TEXT_HTML)
+  public Response get(
+      @PathParam("role") final String role,
+      @PathParam("environment") final String environment) {
+
+    // Optional.of rejects null and always yields a present value, which made the previous
+    // isPresent() guard dead code. Check the raw string directly instead.
+    Optional<String> env = (environment == null || environment.isEmpty())
+        ? Optional.<String>absent()
+        : Optional.of(environment);
+
+    return processRequest(Optional.of(role), env);
+  }
+
+  private Map<IJobKey, Map<?, ?>> fetchCronJobsBy(
+      final String role,
+      final Optional<String> environment) {
+
+    Predicate<IJobConfiguration> byRoleEnv = new Predicate<IJobConfiguration>() {
+      @Override public boolean apply(IJobConfiguration job) {
+        boolean roleMatch = job.getOwner().getRole().equals(role);
+        boolean envMatch = !environment.isPresent()
+            || job.getKey().getEnvironment().equals(environment.get());
+        return roleMatch && envMatch;
+      }
+    };
+
+    Iterable<IJobConfiguration> jobs = FluentIterable
+        .from(cronJobManager.getJobs())
+        .filter(byRoleEnv);
+
+    return Maps.transformValues(Maps.uniqueIndex(jobs, JobKeys.FROM_CONFIG),
+        new Function<IJobConfiguration, Map<?, ?>>() {
+          @Override public Map<?, ?> apply(IJobConfiguration job) {
+            return ImmutableMap.<Object, Object>builder()
+                .put("jobKey", job.getKey())
+                .put("name", job.getKey().getName())
+                .put("environment", job.getKey().getEnvironment())
+                .put("pendingTaskCount", job.getInstanceCount())
+                .put("cronSchedule", job.getCronSchedule())
+                .put("nextRun", cronPredictor.predictNextRun(job.getCronSchedule()).getTime())
+                .put("cronCollisionPolicy", cronCollisionPolicy(job))
+                .put("packages", getPackages(job))
+                .build();
+          }
+        });
+  }
+
+  /**
+   * Resolves the collision policy of a cron job via {@code CronJobManager.orDefault}.
+   *
+   * @param jobConfiguration Cron job whose configured policy is read.
+   * @return The policy returned by {@code CronJobManager.orDefault} for the configured value.
+   */
+  private static CronCollisionPolicy cronCollisionPolicy(IJobConfiguration jobConfiguration) {
+    CronCollisionPolicy configured = jobConfiguration.getCronCollisionPolicy();
+    return CronJobManager.orDefault(configured);
+  }
+
+  /**
+   * Renders the packages of a job's task config as a comma-separated string.
+   *
+   * @param job Job whose task config packages are listed.
+   * @return Distinct package descriptions joined with ','; empty when the task has no packages.
+   */
+  private static String getPackages(IJobConfiguration job) {
+    // Collect through a set so identical package descriptions appear only once.
+    Set<String> distinct = Sets.newHashSet();
+    Iterables.addAll(
+        distinct,
+        Iterables.transform(
+            job.getTaskConfig().getPackages(),
+            TransformationUtils.PACKAGE_TOSTRING));
+
+    return Joiner.on(',').join(distinct);
+  }
+
+  /**
+   * Builds the job listing for a role, optionally restricted to one environment.
+   *
+   * <p>Tasks are grouped by job key; each group becomes one {@link Job} carrying per-status task
+   * counts. The job type is SERVICE when the freshest task config is a service, CRON when the
+   * job key appears in {@code cronJobs}, and ADHOC otherwise.
+   *
+   * @param role Role whose tasks are fetched.
+   * @param environment When present, only tasks in this environment are fetched.
+   * @param cronJobs Known cron jobs, used to classify non-service jobs.
+   * @return Jobs sorted with {@code DisplayUtils.JOB_ORDERING}.
+   * @throws IllegalArgumentException if a task reports a status not handled by the tally.
+   */
+  private List<Job> fetchJobsBy(
+      final String role,
+      final Optional<String> environment,
+      final Map<IJobKey, Map<?, ?>> cronJobs) {
+
+    final Function<Map.Entry<IJobKey, Collection<IScheduledTask>>, Job> toJob =
+        new Function<Map.Entry<IJobKey, Collection<IScheduledTask>>, Job>() {
+          @Override public Job apply(Map.Entry<IJobKey, Collection<IScheduledTask>> tasksByJobKey) {
+            IJobKey jobKey = tasksByJobKey.getKey();
+            Collection<IScheduledTask> tasks = tasksByJobKey.getValue();
+
+            Job job = new Job();
+            job.environment = jobKey.getEnvironment();
+            job.name = jobKey.getName();
+
+            // Pick the freshest task's config and associate it with the job.
+            ITaskConfig freshestConfig = getFreshestTask(tasks).getAssignedTask().getTask();
+            job.production = freshestConfig.isProduction();
+
+            // TODO(Suman Karumuri): Add a source/job type to TaskConfig and replace logic below
+            if (freshestConfig.isIsService()) {
+              job.type = JobType.SERVICE;
+            } else if (cronJobs.containsKey(jobKey)) {
+              job.type = JobType.CRON;
+            } else {
+              job.type = JobType.ADHOC;
+            }
+
+            // Read the clock once per job so every task of the job is judged against the same
+            // instant, rather than allocating a new Date for each failed task.
+            long nowMillis = System.currentTimeMillis();
+            for (IScheduledTask task : tasks) {
+              tallyTask(job, task, nowMillis);
+            }
+
+            return job;
+          }
+        };
+
+    Query.Builder query = environment.isPresent()
+        ? Query.envScoped(role, environment.get())
+        : Query.roleScoped(role);
+
+    Multimap<IJobKey, IScheduledTask> tasks =
+        Tasks.byJobKey(Storage.Util.weaklyConsistentFetchTasks(storage, query));
+
+    Iterable<Job> jobs = FluentIterable
+        .from(tasks.asMap().entrySet())
+        .transform(toJob);
+
+    return DisplayUtils.JOB_ORDERING.sortedCopy(jobs);
+  }
+
+  /**
+   * Adds one task to the per-status counters of a job.
+   *
+   * @param job Job whose counters are incremented.
+   * @param task Task to classify by its current status.
+   * @param nowMillis Current time in epoch milliseconds, used for the "recently failed" check.
+   * @throws IllegalArgumentException if the task status is not handled below.
+   */
+  private static void tallyTask(Job job, IScheduledTask task, long nowMillis) {
+    // A failed task counts as "recent" when its last event is less than this many hours old.
+    final int recentFailureWindowHours = 6;
+
+    switch (task.getStatus()) {
+      case INIT:
+      case PENDING:
+        job.pendingTaskCount++;
+        break;
+
+      case ASSIGNED:
+      case STARTING:
+      case RESTARTING:
+      case RUNNING:
+      case KILLING:
+      case PREEMPTING:
+        job.activeTaskCount++;
+        break;
+
+      case KILLED:
+      case FINISHED:
+        job.finishedTaskCount++;
+        break;
+
+      case LOST:
+      case FAILED:
+      case UNKNOWN:
+        job.failedTaskCount++;
+        long elapsedMillis =
+            nowMillis - Iterables.getLast(task.getTaskEvents()).getTimestamp();
+        if (Amount.of(elapsedMillis, Time.MILLISECONDS).as(Time.HOURS)
+            < recentFailureWindowHours) {
+          job.recentlyFailedTaskCount++;
+        }
+        break;
+
+      default:
+        throw new IllegalArgumentException("Unsupported status: " + task.getStatus());
+    }
+  }
+
+  /**
+   * Template object to represent a job.
+   *
+   * <p>Fields are filled in directly by the enclosing class; the getters expose them for
+   * rendering. All counters start at zero and {@code production} starts as false.
+   */
+  static class Job {
+    private String name;
+    private String environment;
+    private JobType type;
+    private boolean production;
+    private int pendingTaskCount;
+    private int activeTaskCount;
+    private int finishedTaskCount;
+    private int failedTaskCount;
+    private int recentlyFailedTaskCount;
+
+    /** Returns the job name. */
+    public String getName() {
+      return this.name;
+    }
+
+    /** Returns the environment the job belongs to. */
+    public String getEnvironment() {
+      return this.environment;
+    }
+
+    /** Returns the string form of the job type. */
+    public String getType() {
+      return this.type.toString();
+    }
+
+    /** Returns whether the job is flagged as production. */
+    public boolean getProduction() {
+      return this.production;
+    }
+
+    /** Returns the number of tasks counted as pending. */
+    public int getPendingTaskCount() {
+      return this.pendingTaskCount;
+    }
+
+    /** Returns the number of tasks counted as active. */
+    public int getActiveTaskCount() {
+      return this.activeTaskCount;
+    }
+
+    /** Returns the number of tasks counted as finished. */
+    public int getFinishedTaskCount() {
+      return this.finishedTaskCount;
+    }
+
+    /** Returns the number of tasks counted as failed. */
+    public int getFailedTaskCount() {
+      return this.failedTaskCount;
+    }
+
+    /** Returns the number of failed tasks that were also counted as recently failed. */
+    public int getRecentlyFailedTaskCount() {
+      return this.recentlyFailedTaskCount;
+    }
+  }
+
+  /**
+   * Kind of job shown in the listing; {@link #toString()} yields the lowercase display label.
+   */
+  static enum JobType {
+    ADHOC("adhoc"), CRON("cron"), SERVICE("service");
+
+    // Immutable display label; never reassigned after construction.
+    private final String jobType;
+
+    private JobType(String jobType) {
+      this.jobType = jobType;
+    }
+
+    /**
+     * Returns the lowercase label of this job type.
+     *
+     * @return "adhoc", "cron" or "service".
+     */
+    @Override
+    public String toString() {
+      return jobType;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
new file mode 100644
index 0000000..825e2e1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/ServletModule.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Resources;
+import com.google.common.net.MediaType;
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import com.google.inject.servlet.GuiceFilter;
+import com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;
+import com.sun.jersey.guice.JerseyServletModule;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+
+import com.twitter.aurora.scheduler.quota.QuotaManager;
+import com.twitter.aurora.scheduler.state.CronJobManager;
+import com.twitter.aurora.scheduler.state.SchedulerCore;
+import com.twitter.common.application.http.Registration;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.application.modules.LocalServiceRegistry;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
+import com.twitter.thrift.ServiceInstance;
+
+import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
+import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_RESPONSE_FILTERS;
+import static com.sun.jersey.api.json.JSONConfiguration.FEATURE_POJO_MAPPING;
+
+/**
+ * Binding module for scheduler HTTP servlets.
+ *
+ * <p>Registers the Jersey endpoints behind {@code LeaderRedirectFilter} and binds the static
+ * assets (jQuery, Bootstrap, DataTables and the project's own scripts and images).
+ */
+public class ServletModule extends AbstractModule {
+
+  private static final Map<String, String> CONTAINER_PARAMS = ImmutableMap.of(
+      FEATURE_POJO_MAPPING, Boolean.TRUE.toString(),
+      PROPERTY_CONTAINER_REQUEST_FILTERS, GZIPContentEncodingFilter.class.getName(),
+      PROPERTY_CONTAINER_RESPONSE_FILTERS, GZIPContentEncodingFilter.class.getName());
+
+  // Classpath prefix of the Bootstrap distribution.
+  private static final String BOOTSTRAP_PATH = "bower_components/bootstrap.css/";
+
+  // DataTables image files, served from /images/ under the same file name.
+  private static final String[] DATATABLES_IMAGES = {
+      "back_disabled.png",
+      "back_enabled_hover.png",
+      "back_enabled.png",
+      "forward_disabled.png",
+      "forward_enabled_hover.png",
+      "forward_enabled.png",
+      "sort_asc_disabled.png",
+      "sort_asc.png",
+      "sort_both.png",
+      "sort_desc_disabled.png",
+      "sort_desc.png",
+  };
+
+  // DataTables script files, served from /js/ under the same file name.
+  private static final String[] DATATABLES_SCRIPTS = {
+      "jquery.dataTables.min.js",
+      "dataTables.bootstrap.js",
+      "dataTables.localstorage.js",
+      "dataTables.htmlNumberType.js",
+  };
+
+  @Override
+  protected void configure() {
+    requireBinding(SchedulerCore.class);
+    requireBinding(CronJobManager.class);
+    requireBinding(Key.get(String.class, ClusterName.class));
+    requireBinding(QuotaManager.class);
+
+    // Bindings required for the leader redirector.
+    requireBinding(LocalServiceRegistry.class);
+    requireBinding(Key.get(new TypeLiteral<DynamicHostSet<ServiceInstance>>() { }));
+    Registration.registerServletFilter(binder(), GuiceFilter.class, "/*");
+    install(new JerseyServletModule() {
+      private void registerJerseyEndpoint(String indexPath, Class<?>... servlets) {
+        filter(indexPath + "*").through(LeaderRedirectFilter.class);
+        filter(indexPath + "*").through(GuiceContainer.class, CONTAINER_PARAMS);
+        Registration.registerEndpoint(binder(), indexPath);
+        for (Class<?> servlet : servlets) {
+          bind(servlet);
+        }
+      }
+
+      @Override protected void configureServlets() {
+        bind(HttpStatsFilter.class).in(Singleton.class);
+        filter("/scheduler*").through(HttpStatsFilter.class);
+        bind(LeaderRedirectFilter.class).in(Singleton.class);
+        registerJerseyEndpoint("/cron", Cron.class);
+        registerJerseyEndpoint("/maintenance", Maintenance.class);
+        registerJerseyEndpoint("/mname", Mname.class);
+        registerJerseyEndpoint("/offers", Offers.class);
+        registerJerseyEndpoint("/pendingtasks", PendingTasks.class);
+        registerJerseyEndpoint("/quotas", Quotas.class);
+        registerJerseyEndpoint(
+            "/scheduler",
+            SchedulerzHome.class,
+            SchedulerzRole.class,
+            SchedulerzJob.class);
+        registerJerseyEndpoint("/slaves", Slaves.class);
+        registerJerseyEndpoint("/structdump", StructDump.class);
+        registerJerseyEndpoint("/utilization", Utilization.class);
+      }
+    });
+
+    // Static assets.
+    registerJQueryAssets();
+    registerBootstrapAssets();
+
+    registerAsset("assets/util.js", "/js/util.js");
+    registerAsset("assets/dictionary.js", "/js/dictionary.js");
+    registerAsset("assets/images/viz.png", "/images/viz.png");
+    registerAsset("assets/images/aurora.png", "/images/aurora.png");
+
+    registerDataTablesAssets();
+
+    bind(LeaderRedirect.class).in(Singleton.class);
+    LifecycleModule.bindStartupAction(binder(), RedirectMonitor.class);
+  }
+
+  /** Registers the jQuery script; note that it is served under the {@code .min.js} name. */
+  private void registerJQueryAssets() {
+    registerAsset("bower_components/jquery/jquery.js", "/js/jquery.min.js", false);
+  }
+
+  /** Registers the Bootstrap scripts, stylesheets and glyph images. */
+  private void registerBootstrapAssets() {
+    registerAsset(BOOTSTRAP_PATH + "js/bootstrap.min.js", "/js/bootstrap.min.js", false);
+    registerAsset(BOOTSTRAP_PATH + "css/bootstrap.min.css", "/css/bootstrap.min.css", false);
+    registerAsset(BOOTSTRAP_PATH + "css/bootstrap-responsive.min.css",
+        "/css/bootstrap-responsive.min.css",
+        false);
+    registerAsset(BOOTSTRAP_PATH + "img/glyphicons-halflings-white.png",
+        "/img/glyphicons-halflings-white.png",
+        false);
+    registerAsset(BOOTSTRAP_PATH + "img/glyphicons-halflings.png",
+        "/img/glyphicons-halflings.png",
+        false);
+  }
+
+  /** Registers the DataTables stylesheet, then its images, then its scripts. */
+  private void registerDataTablesAssets() {
+    registerAsset("assets/datatables/css/jquery.dataTables.css", "/css/jquery.dataTables.css");
+    for (String image : DATATABLES_IMAGES) {
+      registerAsset("assets/datatables/images/" + image, "/images/" + image);
+    }
+    for (String script : DATATABLES_SCRIPTS) {
+      registerAsset("assets/datatables/js/" + script, "/js/" + script);
+    }
+  }
+
+  /**
+   * Registers an asset whose resource location is resolved relative to this class.
+   *
+   * @param resourceLocation Location of the resource, relative to {@code ServletModule}.
+   * @param registerLocation HTTP path under which the asset is served.
+   */
+  private void registerAsset(String resourceLocation, String registerLocation) {
+    registerAsset(resourceLocation, registerLocation, true);
+  }
+
+  /**
+   * Registers a static HTTP asset, deriving its media type from the served path.
+   *
+   * @param resourceLocation Location of the resource to serve.
+   * @param registerLocation HTTP path under which the asset is served.
+   * @param isRelative When true the resource is looked up relative to {@code ServletModule};
+   *     otherwise it is resolved through {@code Resources.getResource}.
+   * @throws IllegalArgumentException if the served path has an unrecognized extension.
+   */
+  private void registerAsset(String resourceLocation, String registerLocation, boolean isRelative) {
+    String mediaType = getMediaType(registerLocation).toString();
+
+    if (isRelative) {
+      Registration.registerHttpAsset(
+          binder(),
+          registerLocation,
+          ServletModule.class,
+          resourceLocation,
+          mediaType,
+          true);
+    } else {
+      Registration.registerHttpAsset(
+          binder(),
+          registerLocation,
+          Resources.getResource(resourceLocation),
+          mediaType,
+          true);
+    }
+  }
+
+  /**
+   * Maps a file path to a media type based on its extension.
+   *
+   * @param filePath Path ending in .png, .js, .html or .css.
+   * @return The matching media type.
+   * @throws IllegalArgumentException if the extension is none of the above.
+   */
+  private static MediaType getMediaType(String filePath) {
+    if (filePath.endsWith(".png")) {
+      return MediaType.PNG;
+    } else if (filePath.endsWith(".js")) {
+      return MediaType.JAVASCRIPT_UTF_8;
+    } else if (filePath.endsWith(".html")) {
+      return MediaType.HTML_UTF_8;
+    } else if (filePath.endsWith(".css")) {
+      return MediaType.CSS_UTF_8;
+    } else {
+      throw new IllegalArgumentException("Could not determine media type for " + filePath);
+    }
+  }
+
+  /**
+   * Startup action that invokes {@code LeaderRedirect.monitor()}.
+   */
+  static class RedirectMonitor implements ExceptionalCommand<MonitorException> {
+
+    private final LeaderRedirect redirector;
+
+    @Inject
+    RedirectMonitor(LeaderRedirect redirector) {
+      this.redirector = Preconditions.checkNotNull(redirector);
+    }
+
+    @Override public void execute() throws MonitorException {
+      redirector.monitor();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Slaves.java b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
new file mode 100644
index 0000000..401ee30
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.antlr.stringtemplate.StringTemplate;
+
+import com.twitter.aurora.gen.Attribute;
+import com.twitter.aurora.gen.HostAttributes;
+import com.twitter.aurora.gen.MaintenanceMode;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.common.base.Closure;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import static com.twitter.aurora.scheduler.storage.Storage.Work;
+import static com.twitter.common.base.MorePreconditions.checkNotBlank;
+
+/**
+ * HTTP interface to serve as a HUD for the mesos slaves tracked in the scheduler.
+ */
+@Path("/slaves")
+public class Slaves extends JerseyTemplateServlet {
+  private final String clusterName;
+  // Assigned once in the constructor and never rebound.
+  private final Storage storage;
+
+  /**
+   * Injected constructor.
+   *
+   * @param clusterName cluster name
+   * @param storage store to fetch the host attributes from
+   */
+  @Inject
+  public Slaves(@ClusterName String clusterName, Storage storage) {
+    super("slaves");
+    this.clusterName = checkNotBlank(clusterName);
+    this.storage = checkNotNull(storage);
+  }
+
+  /**
+   * Reads all host attributes from the attribute store in a weakly consistent read.
+   *
+   * @return Host attributes currently held by the attribute store.
+   */
+  private Iterable<HostAttributes> getHostAttributes() {
+    return storage.weaklyConsistentRead(new Work.Quiet<Iterable<HostAttributes>>() {
+      @Override public Iterable<HostAttributes> apply(StoreProvider storeProvider) {
+        return storeProvider.getAttributeStore().getHostAttributes();
+      }
+    });
+  }
+
+  // Wraps host attributes in the template-facing Slave view.
+  private static final Function<HostAttributes, Slave> TO_SLAVE =
+      new Function<HostAttributes, Slave>() {
+        @Override public Slave apply(HostAttributes attributes) {
+          return new Slave(attributes);
+        }
+      };
+
+  /**
+   * Fetches the listing of known slaves.
+   *
+   * @return HTTP response.
+   */
+  @GET
+  @Produces(MediaType.TEXT_HTML)
+  public Response get() {
+    return fillTemplate(new Closure<StringTemplate>() {
+      @Override public void execute(StringTemplate template) {
+        template.setAttribute("cluster_name", clusterName);
+
+        template.setAttribute("slaves",
+            FluentIterable.from(getHostAttributes()).transform(TO_SLAVE).toList());
+      }
+    });
+  }
+
+  // Orders attributes by name so each slave row lists them in a stable order.
+  private static final Ordering<Attribute> ATTR_ORDER = Ordering.natural().onResultOf(
+      new Function<Attribute, String>() {
+        @Override public String apply(Attribute attr) {
+          return attr.getName();
+        }
+      });
+
+  /**
+   * Template object to represent a slave.
+   */
+  private static class Slave {
+    private final HostAttributes attributes;
+
+    Slave(HostAttributes attributes) {
+      this.attributes = attributes;
+    }
+
+    public String getHost() {
+      return attributes.getHost();
+    }
+
+    public String getId() {
+      return attributes.getSlaveId();
+    }
+
+    public MaintenanceMode getMode() {
+      return attributes.getMode();
+    }
+
+    // Renders one attribute as "name=[v1,v2,...]".
+    private static final Function<Attribute, String> ATTR_TO_STRING =
+        new Function<Attribute, String>() {
+          @Override public String apply(Attribute attr) {
+            return attr.getName() + "=[" + Joiner.on(",").join(attr.getValues()) + "]";
+          }
+        };
+
+    /**
+     * Renders all attributes of the host, sorted by attribute name.
+     *
+     * @return Attributes formatted as "name=[values]" and joined with ", ".
+     */
+    public String getAttributes() {
+      return Joiner.on(", ").join(
+          Iterables.transform(ATTR_ORDER.sortedCopy(attributes.getAttributes()), ATTR_TO_STRING));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
new file mode 100644
index 0000000..8da8b80
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import org.antlr.stringtemplate.StringTemplate;
+import org.apache.thrift.TBase;
+
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.state.CronJobManager;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.Storage.Work.Quiet;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IJobKey;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.base.Closure;
+import com.twitter.common.thrift.Util;
+
+/**
+ * Servlet that prints out the raw configuration for a specified struct.
+ */
+@Path("/structdump")
+public class StructDump extends JerseyTemplateServlet {
+
+  private final Storage storage;
+
+  @Inject
+  public StructDump(Storage storage) {
+    super("structdump");
+    this.storage = Preconditions.checkNotNull(storage);
+  }
+
+  @GET
+  @Produces(MediaType.TEXT_HTML)
+  public Response getUsage() {
+    return Response
+        .status(Status.BAD_REQUEST)
+        .entity("<html>Usage: /structdump/{task_id} or /structdump/job/{role}/{env}/{job}</html>")
+        .build();
+  }
+
+  /**
+   * Dumps a task struct.
+   *
+   * @return HTTP response.
+   */
+  @GET
+  @Path("/task/{task}")
+  @Produces(MediaType.TEXT_HTML)
+  public Response dumpJob(
+      @PathParam("task") final String taskId) {
+
+    return dumpEntity("Task " + taskId, new Work.Quiet<Optional<? extends TBase<?, ?>>>() {
+      @Override public Optional<? extends TBase<?, ?>> apply(StoreProvider storeProvider) {
+        // Deep copy the struct to sidestep any subclass trickery inside the storage system.
+        return Optional.fromNullable(Iterables.getOnlyElement(
+                storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)),
+                null)
+            .newBuilder());
+      }
+    });
+  }
+
+  /**
+   * Dumps a cron job struct.
+   *
+   * @return HTTP response.
+   */
+  @GET
+  @Path("/cron/{role}/{environment}/{job}")
+  @Produces(MediaType.TEXT_HTML)
+  public Response dump(
+      @PathParam("role") final String role,
+      @PathParam("environment") final String environment,
+      @PathParam("job") final String job) {
+
+    final IJobKey jobKey = JobKeys.from(role, environment, job);
+    return dumpEntity("Cron job " + JobKeys.toPath(jobKey),
+        new Work.Quiet<Optional<? extends TBase<?, ?>>>() {
+          @Override public Optional<JobConfiguration> apply(StoreProvider storeProvider) {
+            return storeProvider.getJobStore().fetchJob(CronJobManager.MANAGER_KEY, jobKey)
+                .transform(IJobConfiguration.TO_BUILDER);
+          }
+        });
+  }
+
+  private Response dumpEntity(final String id, final Quiet<Optional<? extends TBase<?, ?>>> work) {
+    return fillTemplate(new Closure<StringTemplate>() {
+      @Override public void execute(StringTemplate template) {
+        template.setAttribute("id", id);
+        Optional<? extends TBase<?, ?>> struct = storage.weaklyConsistentRead(work);
+        if (!struct.isPresent()) {
+          template.setAttribute("exception", "Entity not found");
+        } else {
+          template.setAttribute("structPretty", Util.prettyPrint(struct.get()));
+        }
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/TransformationUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/TransformationUtils.java b/src/main/java/org/apache/aurora/scheduler/http/TransformationUtils.java
new file mode 100644
index 0000000..2f531a7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/TransformationUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.util.Collection;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+
+import com.twitter.aurora.scheduler.base.Numbers;
+import com.twitter.aurora.scheduler.storage.entities.IPackage;
+
+/**
+ * Utility class to hold common object to string transformation helper functions.
+ */
+final class TransformationUtils {
+  public static final Function<IPackage, String> PACKAGE_TOSTRING =
+      new Function<IPackage, String>() {
+        @Override public String apply(IPackage pkg) {
+          return pkg.getRole() + "/" + pkg.getName() + " v" + pkg.getVersion();
+        }
+      };
+
+  public static final Function<Range<Integer>, String> RANGE_TOSTRING =
+      new Function<Range<Integer>, String>() {
+        @Override public String apply(Range<Integer> range) {
+          int lower = range.lowerEndpoint();
+          int upper = range.upperEndpoint();
+          return (lower == upper) ? String.valueOf(lower) : (lower + " - " + upper);
+        }
+      };
+
+  public static final Function<Collection<Integer>, String> INSTANCES_TOSTRING =
+      new Function<Collection<Integer>, String>() {
+        @Override public String apply(Collection<Integer> instances) {
+          return Joiner.on(", ")
+              .join(Iterables.transform(Numbers.toRanges(instances), RANGE_TOSTRING));
+        }
+      };
+
+  private TransformationUtils() {
+    // Utility class
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/http/Utilization.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Utilization.java b/src/main/java/org/apache/aurora/scheduler/http/Utilization.java
new file mode 100644
index 0000000..60a7d95
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/http/Utilization.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.http;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+
+import org.antlr.stringtemplate.StringTemplate;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.stats.ResourceCounter;
+import com.twitter.aurora.scheduler.stats.ResourceCounter.GlobalMetric;
+import com.twitter.aurora.scheduler.stats.ResourceCounter.Metric;
+import com.twitter.aurora.scheduler.stats.ResourceCounter.MetricType;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.MorePreconditions;
+import com.twitter.common.util.templating.StringTemplateHelper;
+import com.twitter.common.util.templating.StringTemplateHelper.TemplateException;
+
+/**
+ * A servlet to give an aggregate view of cluster resources consumed, grouped by category.
+ * <p>
+ * Three views are served, each rendered through the {@code utilization} string template:
+ * cluster-wide totals, per-role totals within a metric type, and per-job totals within a
+ * metric type and role.
+ */
+@Path("/utilization")
+public class Utilization {
+
+  private final String clusterName;
+  private final ResourceCounter counter;
+  private final StringTemplateHelper templateHelper;
+
+  /**
+   * Creates the servlet.
+   *
+   * @param counter Source of the resource aggregates to display, must not be null.
+   * @param clusterName Cluster name exposed to the template, must not be blank.
+   */
+  @Inject
+  Utilization(ResourceCounter counter, @ClusterName String clusterName) {
+    templateHelper = new StringTemplateHelper(getClass(), "utilization", true);
+    this.counter = Preconditions.checkNotNull(counter);
+    this.clusterName = MorePreconditions.checkNotBlank(clusterName);
+  }
+
+  /**
+   * Pairs every metric with its display key and renders the resulting rows.
+   *
+   * @param metrics Metrics keyed by the title/link to show for them.
+   * @return The rendered page.
+   */
+  private String fillTemplate(Map<Display, Metric> metrics) {
+    Function<Entry<Display, Metric>, DisplayMetric> transform =
+        new Function<Entry<Display, Metric>, DisplayMetric>() {
+          @Override public DisplayMetric apply(Entry<Display, Metric> entry) {
+            return new DisplayMetric(entry.getKey(), entry.getValue());
+          }
+        };
+    return fillTemplate(FluentIterable.from(metrics.entrySet()).transform(transform).toList());
+  }
+
+  /**
+   * Renders the template with the {@code cluster_name} and {@code metrics} attributes set.
+   *
+   * @param metrics Rows to expose to the template.
+   * @return The rendered page.
+   * @throws WebApplicationException If the template could not be written.
+   */
+  private String fillTemplate(final Iterable<DisplayMetric> metrics) {
+    StringWriter output = new StringWriter();
+    try {
+      templateHelper.writeTemplate(output, new Closure<StringTemplate>() {
+        @Override public void execute(StringTemplate template) {
+          template.setAttribute("cluster_name", clusterName);
+          template.setAttribute("metrics", metrics);
+        }
+      });
+    } catch (TemplateException e) {
+      throw new WebApplicationException(e);
+    }
+    return output.toString();
+  }
+
+  /**
+   * Title and optional link for one row. Implements value equality so that it can be used as
+   * an aggregation key.
+   */
+  private static class Display {
+    private final String title;
+    @Nullable
+    private final String link;
+
+    Display(String title, @Nullable String link) {
+      this.title = title;
+      this.link = link;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(title, link);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof  Display)) {
+        return false;
+      }
+
+      Display other = (Display) o;
+      return Objects.equal(title, other.title) && Objects.equal(link, other.link);
+    }
+  }
+
+  /**
+   * A metric built from an existing one, with the title and link of its row attached.
+   */
+  private static class DisplayMetric extends Metric {
+    private final Display display;
+
+    DisplayMetric(Display display, Metric wrapped) {
+      super(wrapped);
+      this.display = display;
+    }
+
+    // NOTE(review): the public getters are presumably read by the template; confirm against
+    // the utilization template before changing their names.
+    public String getTitle() {
+      return display.title;
+    }
+
+    @Nullable
+    public String getLink() {
+      return display.link;
+    }
+  }
+
+  /**
+   * Converts a global metric into a row: the title is the lower-cased type name with
+   * underscores replaced by spaces, and the link is the lower-cased type name.
+   */
+  private static final Function<GlobalMetric, DisplayMetric> TO_DISPLAY =
+      new Function<GlobalMetric, DisplayMetric>() {
+        @Override public DisplayMetric apply(GlobalMetric count) {
+          return new DisplayMetric(
+              new Display(
+                  count.type.name().replace('_', ' ').toLowerCase(),
+                  count.type.name().toLowerCase()),
+              count);
+        }
+      };
+
+  /**
+   * Displays the aggregate utilization for the entire cluster.
+   *
+   * @return HTML-formatted cluster utilization.
+   */
+  @GET
+  @Produces(MediaType.TEXT_HTML)
+  public Response aggregateCluster() {
+    Iterable<DisplayMetric> metrics =
+        FluentIterable.from(counter.computeConsumptionTotals()).transform(TO_DISPLAY).toList();
+    return Response.ok(fillTemplate(metrics)).build();
+  }
+
+  /**
+   * Resolves a metric type from the (case-insensitive) name used in request paths.
+   *
+   * @param name Metric type name taken from the request.
+   * @return The matching metric type.
+   * @throws WebApplicationException With a {@code 400 Bad Request} response if no metric type
+   *     has the given name.
+   */
+  private MetricType getTypeByName(String name) throws WebApplicationException {
+    try {
+      return MetricType.valueOf(name.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      // valueOf signals an unknown name by throwing rather than by returning null, so the
+      // client error has to be produced here.
+      throw new WebApplicationException(
+          e,
+          Response.status(Status.BAD_REQUEST).entity("Invalid metric type.").build());
+    }
+  }
+
+  /**
+   * Displays the aggregate utilization for roles within a metric type.
+   *
+   * @param metric Metric id.
+   * @return HTML-formatted utilization within the metric type.
+   */
+  @GET
+  @Path("/{metric}")
+  @Produces(MediaType.TEXT_HTML)
+  public Response aggregateRoles(@PathParam("metric") final String metric) {
+    final MetricType type = getTypeByName(metric);
+
+    // Group active tasks by owner role; each row links to the per-job view of that role.
+    Function<ITaskConfig, Display> toKey = new Function<ITaskConfig, Display>() {
+      @Override public Display apply(ITaskConfig task) {
+        String role = task.getOwner().getRole();
+        return new Display(role, metric + "/" + role);
+      }
+    };
+    Map<Display, Metric> byRole =
+        counter.computeAggregates(Query.unscoped().active(), type.filter, toKey);
+    return Response.ok(fillTemplate(byRole)).build();
+  }
+
+  /**
+   * Displays the aggregate utilization for jobs within a role.
+   *
+   * @param metric Metric id.
+   * @param role Role for jobs to aggregate.
+   * @return HTML-formatted utilization within the metric/role.
+   */
+  @GET
+  @Path("/{metric}/{role}")
+  @Produces(MediaType.TEXT_HTML)
+  public Response aggregateJobs(
+      @PathParam("metric") String metric,
+      @PathParam("role") String role) {
+
+    MetricType type = getTypeByName(metric);
+    // Group the role's active tasks by job name; job rows carry no link.
+    Function<ITaskConfig, Display> toKey = new Function<ITaskConfig, Display>() {
+      @Override public Display apply(ITaskConfig task) {
+        return new Display(task.getJobName(), null);
+      }
+    };
+    Map<Display, Metric> byJob =
+        counter.computeAggregates(Query.roleScoped(role).active(), type.filter, toKey);
+    return Response.ok(fillTemplate(byJob)).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/local/FakeDriverFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/local/FakeDriverFactory.java b/src/main/java/org/apache/aurora/scheduler/local/FakeDriverFactory.java
new file mode 100644
index 0000000..ae5062e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/local/FakeDriverFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.local;
+
+import java.util.Collection;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Provider;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.Filters;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Request;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import com.twitter.aurora.scheduler.DriverFactory;
+import com.twitter.common.application.Lifecycle;
+
+/**
+ * Creates fake scheduler drivers that register the scheduler with made-up framework and
+ * master information instead of talking to a real mesos master.
+ */
+class FakeDriverFactory implements DriverFactory {
+
+  private final Provider<Scheduler> scheduler;
+  private final Lifecycle lifecycle;
+
+  @Inject
+  FakeDriverFactory(Provider<Scheduler> scheduler, Lifecycle lifecycle) {
+    this.scheduler = Preconditions.checkNotNull(scheduler);
+    this.lifecycle = Preconditions.checkNotNull(lifecycle);
+  }
+
+  /**
+   * Builds a driver whose {@code run()} registers the scheduler and then blocks until the
+   * application lifecycle is shut down.
+   *
+   * @param frameworkId Framework ID to register with; {@code "new-framework-id"} is used when
+   *     null.
+   * @return A fake driver.
+   */
+  @Override
+  public SchedulerDriver apply(@Nullable final String frameworkId) {
+    return new FakeSchedulerDriver() {
+      @Override
+      public Status run() {
+        Scheduler target = scheduler.get();
+        String effectiveId = Optional.fromNullable(frameworkId).or("new-framework-id");
+        FrameworkID framework = FrameworkID.newBuilder().setValue(effectiveId).build();
+        MasterInfo master =
+            MasterInfo.newBuilder().setId("master-id").setIp(100).setPort(200).build();
+
+        target.registered(this, framework, master);
+        lifecycle.awaitShutdown();
+        return null;
+      }
+    };
+  }
+
+  /**
+   * A driver that performs no work: every operation returns {@code null}, and {@code join()}
+   * delegates to {@code run()}.
+   */
+  static class FakeSchedulerDriver implements SchedulerDriver {
+    @Override
+    public Status start() {
+      return null;
+    }
+
+    @Override
+    public Status stop(boolean failover) {
+      return null;
+    }
+
+    @Override
+    public Status stop() {
+      return null;
+    }
+
+    @Override
+    public Status abort() {
+      return null;
+    }
+
+    @Override
+    public Status join() {
+      return run();
+    }
+
+    @Override
+    public Status run() {
+      return null;
+    }
+
+    @Override
+    public Status requestResources(Collection<Request> requests) {
+      return null;
+    }
+
+    @Override
+    public Status launchTasks(OfferID offerId, Collection<TaskInfo> tasks, Filters filters) {
+      return null;
+    }
+
+    @Override
+    public Status launchTasks(OfferID offerId, Collection<TaskInfo> tasks) {
+      return null;
+    }
+
+    @Override
+    public Status killTask(TaskID taskId) {
+      return null;
+    }
+
+    @Override
+    public Status declineOffer(OfferID offerId, Filters filters) {
+      return null;
+    }
+
+    @Override
+    public Status declineOffer(OfferID offerId) {
+      return null;
+    }
+
+    @Override
+    public Status reviveOffers() {
+      return null;
+    }
+
+    @Override
+    public Status sendFrameworkMessage(ExecutorID executorId, SlaveID slaveId, byte[] data) {
+      return null;
+    }
+
+    @Override
+    public Status reconcileTasks(Collection<TaskStatus> statuses) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/local/IsolatedSchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/local/IsolatedSchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/local/IsolatedSchedulerModule.java
new file mode 100644
index 0000000..c07359e
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/local/IsolatedSchedulerModule.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.local;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Provider;
+import javax.inject.Singleton;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+
+import org.apache.mesos.Protos.Attribute;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos.Value.Scalar;
+import org.apache.mesos.Protos.Value.Text;
+import org.apache.mesos.Protos.Value.Type;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.thrift.TException;
+
+import com.twitter.aurora.gen.AuroraAdmin;
+import com.twitter.aurora.gen.ExecutorConfig;
+import com.twitter.aurora.gen.Identity;
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.gen.Package;
+import com.twitter.aurora.gen.Quota;
+import com.twitter.aurora.gen.Response;
+import com.twitter.aurora.gen.SessionKey;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.scheduler.DriverFactory;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
+import com.twitter.aurora.scheduler.configuration.Resources;
+import com.twitter.aurora.scheduler.events.PubsubEvent.DriverRegistered;
+import com.twitter.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import com.twitter.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import com.twitter.aurora.scheduler.events.PubsubEventModule;
+import com.twitter.aurora.scheduler.local.FakeDriverFactory.FakeSchedulerDriver;
+import com.twitter.aurora.scheduler.log.testing.FileLogStreamModule;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+
+/**
+ * A module that binds a fake mesos driver factory and a local (non-replicated) storage system.
+ * <p>
+ * The easiest way to run the scheduler in local/isolated mode is by executing:
+ * <pre>
+ * $ ./pants goal bundle aurora:scheduler-local && ./aurora/scripts/scheduler.sh -c local
+ * </pre>
+ */
+public class IsolatedSchedulerModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(IsolatedSchedulerModule.class.getName());
+
+  @Override
+  protected void configure() {
+    bind(DriverFactory.class).to(FakeDriverFactory.class);
+    bind(FakeDriverFactory.class).in(Singleton.class);
+    PubsubEventModule.bindSubscriber(binder(), FakeClusterRunner.class);
+    install(new FileLogStreamModule());
+  }
+
+  static class FakeClusterRunner implements EventSubscriber {
+    private final FrameworkID frameworkId =
+        FrameworkID.newBuilder().setValue("framework-id").build();
+    private final List<FakeSlave> cluster = ImmutableList.of(
+        new FakeSlave(frameworkId, "fake-host1", "rack1", "slave-id1"),
+        new FakeSlave(frameworkId, "fake-host2", "rack2", "slave-id2")
+    );
+
+    private final AtomicLong offerId = new AtomicLong();
+    private final Function<FakeSlave, Offer> slaveToOffer = new Function<FakeSlave, Offer>() {
+      @Override public Offer apply(FakeSlave slave) {
+        return slave.makeOffer(offerId.incrementAndGet());
+      }
+    };
+
+    private final Provider<Scheduler> scheduler;
+    private final AuroraAdmin.Iface thrift;
+    private final ScheduledExecutorService executor;
+    private final SchedulerDriver driver;
+
+    @Inject
+    FakeClusterRunner(
+        Provider<Scheduler> scheduler,
+        AuroraAdmin.Iface thrift,
+        ShutdownRegistry shutdownRegistry) {
+
+      this.scheduler = scheduler;
+      this.thrift = thrift;
+      this.executor = createThreadPool(shutdownRegistry);
+      this.driver = new FakeSchedulerDriver();
+    }
+
+    private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
+      final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+          1,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TaskScheduler-%d").build()) {
+
+        @Override protected void afterExecute(Runnable runnable, @Nullable Throwable throwable) {
+          if (throwable != null) {
+            LOG.log(Level.WARNING, "Error: " + throwable, throwable);
+          } else if (runnable instanceof Future) {
+            Future<?> future = (Future<?>) runnable;
+            try {
+              future.get();
+            } catch (InterruptedException | ExecutionException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+      Stats.exportSize("schedule_queue_size", executor.getQueue());
+      shutdownRegistry.addAction(new Command() {
+        @Override public void execute() {
+          new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
+        }
+      });
+      return executor;
+    }
+
+    private void offerClusterResources() {
+      executor.submit(new Runnable() {
+        @Override public void run() {
+          scheduler.get().resourceOffers(
+              driver,
+              FluentIterable.from(cluster).transform(slaveToOffer).toList());
+        }
+      });
+    }
+
+    private void setQuotas() {
+      executor.submit(new Runnable() {
+        @Override public void run() {
+          try {
+            thrift.setQuota(
+                "mesos",
+                new Quota(2.0 * 1000000, 100000000, 100000000),
+                new SessionKey());
+          } catch (TException e) {
+            throw Throwables.propagate(e);
+          }
+        }
+      });
+    }
+
+    @Subscribe
+    public void registered(DriverRegistered event) {
+      executor.submit(new Runnable() {
+        @Override public void run() {
+          Identity mesosUser = new Identity("mesos", "mesos");
+          for (int i = 0; i < 20; i++) {
+            JobConfiguration service = createJob("serviceJob" + i, mesosUser);
+            service.getTaskConfig().setProduction((i % 2) == 0);
+            service.getTaskConfig().setIsService(true);
+            submitJob(service);
+          }
+
+          for (int i = 0; i < 20; i++) {
+            JobConfiguration adhocJob = createJob("adhocJob" + i, mesosUser);
+            adhocJob.getTaskConfig().setProduction((i % 2) == 0);
+            adhocJob.getTaskConfig();
+            submitJob(adhocJob);
+          }
+
+          for (int i = 0; i < 20; i++) {
+            JobConfiguration cron = createJob("cronJob" + i, mesosUser);
+            cron.getTaskConfig().setProduction((i % 2) == 0);
+            cron.setCronSchedule("* * * * *");
+            submitJob(cron);
+          }
+        }
+      });
+
+      setQuotas();
+      offerClusterResources();
+      // Send the offers again, since the first batch of offers will be consumed by GC executors.
+      offerClusterResources();
+    }
+
+    private void moveTaskToState(final String taskId, final TaskState state, long delaySeconds) {
+      Runnable changeState = new Runnable() {
+        @Override public void run() {
+          scheduler.get().statusUpdate(
+              driver,
+              TaskStatus.newBuilder()
+                  .setTaskId(TaskID.newBuilder().setValue(taskId))
+                  .setState(state)
+                  .build());
+        }
+      };
+      executor.schedule(changeState, delaySeconds, TimeUnit.SECONDS);
+    }
+
+    @Subscribe
+    public void stateChanged(TaskStateChange stateChange) {
+      String taskId = stateChange.getTaskId();
+      switch (stateChange.getNewState()) {
+        case ASSIGNED:
+          moveTaskToState(taskId, TaskState.TASK_STARTING, 1);
+          break;
+
+        case STARTING:
+          moveTaskToState(taskId, TaskState.TASK_RUNNING, 1);
+          break;
+
+        case RUNNING:
+          // Let the task finish some time randomly in the next 5 minutes.
+          moveTaskToState(taskId, TaskState.TASK_FINISHED, (long) (Math.random() * 300));
+          break;
+
+        case FINISHED:
+          offerClusterResources();
+          break;
+
+        default:
+          break;
+      }
+    }
+
+    private JobConfiguration createJob(String jobName, Identity owner) {
+      return new JobConfiguration()
+          .setKey(JobKeys.from(owner.getRole(), "test", jobName).newBuilder())
+          .setOwner(owner)
+          .setInstanceCount(5)
+          .setTaskConfig(new TaskConfig()
+              .setOwner(owner)
+              .setJobName(jobName)
+              .setEnvironment("test")
+              .setNumCpus(1.0)
+              .setDiskMb(1024)
+              .setRamMb(1024)
+              .setPackages(ImmutableSet.of(new Package(owner.getRole(), "package", 15)))
+              .setExecutorConfig(new ExecutorConfig("aurora", "opaque")));
+    }
+
+    private void submitJob(JobConfiguration job) {
+      Response response;
+      try {
+        response = thrift.createJob(job, null, new SessionKey());
+      } catch (TException e) {
+        throw Throwables.propagate(e);
+      }
+      LOG.info("Create job response: " + response);
+    }
+  }
+
+  private static class FakeSlave {
+    private final FrameworkID framework;
+    private final String host;
+    private final String rack;
+    private final String slaveId;
+
+    FakeSlave(FrameworkID framework, String host, String rack, String slaveId) {
+      this.framework = framework;
+      this.host = host;
+      this.rack = rack;
+      this.slaveId = slaveId;
+    }
+
+    private static Resource.Builder scalar(String name, double value) {
+      return Resource.newBuilder()
+          .setName(name)
+          .setType(Type.SCALAR)
+          .setScalar(Scalar.newBuilder().setValue(value));
+    }
+
+    private static Attribute.Builder attribute(String name, String value) {
+      return Attribute.newBuilder()
+          .setName(name)
+          .setType(Type.TEXT)
+          .setText(Text.newBuilder().setValue(value));
+    }
+
+    Offer makeOffer(long offerId) {
+      return Offer.newBuilder()
+          .setId(OfferID.newBuilder().setValue("offer" + offerId))
+          .setFrameworkId(framework)
+          .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+          .setHostname(host)
+          .addResources(scalar(Resources.CPUS, 16))
+          .addResources(scalar(Resources.RAM_MB, 24576))
+          .addResources(scalar(Resources.DISK_MB, 102400))
+          .addAttributes(attribute(ConfigurationManager.RACK_CONSTRAINT, rack))
+          .addAttributes(attribute(ConfigurationManager.HOST_CONSTRAINT, host))
+          .build();
+    }
+  }
+}


Mime
View raw message