aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [3/5] aurora git commit: Use lambdas throughout the project.
Date Tue, 08 Dec 2015 18:27:36 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
index 1b03f47..9ca5381 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java
@@ -20,7 +20,6 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
@@ -35,7 +34,6 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IDockerContainer;
-import org.apache.aurora.scheduler.storage.entities.IDockerParameter;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos;
@@ -156,12 +154,8 @@ public interface MesosTaskFactory {
 
       IDockerContainer config = taskConfig.getContainer().getDocker();
       Iterable<Protos.Parameter> parameters = Iterables.transform(config.getParameters(),
-          new Function<IDockerParameter, Protos.Parameter>() {
-            @Override public Protos.Parameter apply(IDockerParameter item) {
-              return Protos.Parameter.newBuilder().setKey(item.getName())
-                .setValue(item.getValue()).build();
-            }
-          });
+          item -> Protos.Parameter.newBuilder().setKey(item.getName())
+            .setValue(item.getValue()).build());
 
       ContainerInfo.DockerInfo.Builder dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
           .setImage(config.getImage()).addAllParameters(parameters);

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
index 35c30f1..1b1443d 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java
@@ -70,12 +70,7 @@ class SchedulerDriverService extends AbstractIdleService implements Driver {
   @Override
   protected void startUp() {
     Optional<String> frameworkId = storage.read(
-        new Storage.Work.Quiet<Optional<String>>() {
-          @Override
-          public Optional<String> apply(Storage.StoreProvider storeProvider) {
-            return storeProvider.getSchedulerStore().fetchFrameworkId();
-          }
-        });
+        storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());
 
     LOG.info("Connecting to mesos master: " + driverSettings.getMasterUri());
     if (!driverSettings.getCredentials().isPresent()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
index 8788f4d..f783e7f 100644
--- a/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
+++ b/src/main/java/org/apache/aurora/scheduler/metadata/NearestFit.java
@@ -19,7 +19,6 @@ import java.util.Set;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.common.base.Ticker;
 import com.google.common.cache.CacheBuilder;
@@ -39,7 +38,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
 import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
 import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 /**
  * Tracks vetoes against scheduling decisions and maintains the closest fit among all the vetoes
@@ -92,12 +90,7 @@ public class NearestFit implements EventSubscriber {
   @Subscribe
   public synchronized void remove(TasksDeleted deletedEvent) {
     fitByGroupKey.invalidateAll(Iterables.transform(deletedEvent.getTasks(), Functions.compose(
-        new Function<ITaskConfig, TaskGroupKey>() {
-          @Override
-          public TaskGroupKey apply(ITaskConfig task) {
-            return TaskGroupKey.from(task);
-          }
-        },
+        TaskGroupKey::from,
         Tasks::getConfig)));
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 88c9f66..aa22473 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -190,12 +190,7 @@ public interface OfferManager extends EventSubscriber {
       } else {
         hostOffers.add(offer);
         executor.execute(
-            new Runnable() {
-              @Override
-              public void run() {
-                removeAndDecline(offer.getOffer().getId());
-              }
-            },
+            () -> removeAndDecline(offer.getOffer().getId()),
             returnDelay.get());
       }
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
index ffc109e..70390f6 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Inject;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -85,12 +84,7 @@ public class BiCache<K, V> {
 
     statsProvider.makeGauge(
         settings.cacheSizeStatName,
-        new Supplier<Long>() {
-          @Override
-          public Long get() {
-            return cache.size();
-          }
-        });
+        cache::size);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
index 5061767..5687bc5 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
@@ -116,69 +116,66 @@ public class PendingTaskProcessor implements Runnable {
   @Override
   public void run() {
     metrics.recordTaskProcessorRun();
-    storage.read(new Storage.Work.Quiet<Void>() {
-      @Override
-      public Void apply(StoreProvider store) {
-        Multimap<String, PreemptionVictim> slavesToActiveTasks =
-            clusterState.getSlavesToActiveTasks();
+    storage.read(store -> {
+      Multimap<String, PreemptionVictim> slavesToActiveTasks =
+          clusterState.getSlavesToActiveTasks();
 
-        if (slavesToActiveTasks.isEmpty()) {
-          // No preemption victims to consider.
-          return null;
-        }
-
-        // Group the offers by slave id so they can be paired with active tasks from the same slave.
-        Map<String, HostOffer> slavesToOffers =
-            Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
-
-        Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
-            slavesToOffers.keySet(),
-            slavesToActiveTasks.keySet()));
-
-        // The algorithm below attempts to find a reservation for every task group by matching
-        // it against all available slaves until a preemption slot is found. Groups are evaluated
-        // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2).
-        // A slave is removed from further matching once a reservation is made. Similarly, all
-        // identical task group instances are removed from further iteration if none of the
-        // available slaves could yield a preemption proposal. A consuming iterator is used for
-        // task groups to ensure iteration order is preserved after a task group is removed.
-        LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store);
-        List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store);
-        Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator());
-        while (!pendingGroups.isEmpty()) {
-          boolean matched = false;
-          TaskGroupKey group = groups.next();
-          ITaskConfig task = group.getTask();
-
-          metrics.recordPreemptionAttemptFor(task);
-          Iterator<String> slaveIterator = allSlaves.iterator();
-          while (slaveIterator.hasNext()) {
-            String slaveId = slaveIterator.next();
-            Optional<ImmutableSet<PreemptionVictim>> candidates =
-                preemptionVictimFilter.filterPreemptionVictims(
-                    task,
-                    slavesToActiveTasks.get(slaveId),
-                    jobStates.getUnchecked(task.getJob()),
-                    Optional.fromNullable(slavesToOffers.get(slaveId)),
-                    store);
+      if (slavesToActiveTasks.isEmpty()) {
+        // No preemption victims to consider.
+        return null;
+      }
 
-            metrics.recordSlotSearchResult(candidates, task);
-            if (candidates.isPresent()) {
-              // Slot found -> remove slave to avoid multiple task reservations.
-              slaveIterator.remove();
-              slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group);
-              matched = true;
-              break;
-            }
-          }
-          if (!matched) {
-            // No slot found for the group -> remove group and reset group iterator.
-            pendingGroups.removeAll(ImmutableSet.of(group));
-            groups = Iterators.consumingIterator(pendingGroups.iterator());
+      // Group the offers by slave id so they can be paired with active tasks from the same slave.
+      Map<String, HostOffer> slavesToOffers =
+          Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
+
+      Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
+          slavesToOffers.keySet(),
+          slavesToActiveTasks.keySet()));
+
+      // The algorithm below attempts to find a reservation for every task group by matching
+      // it against all available slaves until a preemption slot is found. Groups are evaluated
+      // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2).
+      // A slave is removed from further matching once a reservation is made. Similarly, all
+      // identical task group instances are removed from further iteration if none of the
+      // available slaves could yield a preemption proposal. A consuming iterator is used for
+      // task groups to ensure iteration order is preserved after a task group is removed.
+      LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store);
+      List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store);
+      Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator());
+      while (!pendingGroups.isEmpty()) {
+        boolean matched = false;
+        TaskGroupKey group = groups.next();
+        ITaskConfig task = group.getTask();
+
+        metrics.recordPreemptionAttemptFor(task);
+        Iterator<String> slaveIterator = allSlaves.iterator();
+        while (slaveIterator.hasNext()) {
+          String slaveId = slaveIterator.next();
+          Optional<ImmutableSet<PreemptionVictim>> candidates =
+              preemptionVictimFilter.filterPreemptionVictims(
+                  task,
+                  slavesToActiveTasks.get(slaveId),
+                  jobStates.getUnchecked(task.getJob()),
+                  Optional.fromNullable(slavesToOffers.get(slaveId)),
+                  store);
+
+          metrics.recordSlotSearchResult(candidates, task);
+          if (candidates.isPresent()) {
+            // Slot found -> remove slave to avoid multiple task reservations.
+            slaveIterator.remove();
+            slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group);
+            matched = true;
+            break;
           }
         }
-        return null;
+        if (!matched) {
+          // No slot found for the group -> remove group and reset group iterator.
+          pendingGroups.removeAll(ImmutableSet.of(group));
+          groups = Iterators.consumingIterator(pendingGroups.iterator());
+        }
       }
+      return null;
     });
   }
 
@@ -225,12 +222,7 @@ public class PendingTaskProcessor implements Runnable {
   }
 
   private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY =
-      new Function<IAssignedTask, TaskGroupKey>() {
-        @Override
-        public TaskGroupKey apply(IAssignedTask task) {
-          return TaskGroupKey.from(task.getTask());
-        }
-      };
+      task -> TaskGroupKey.from(task.getTask());
 
   private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() {
     @Override
@@ -248,10 +240,5 @@ public class PendingTaskProcessor implements Runnable {
   };
 
   private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
-      new Function<HostOffer, String>() {
-        @Override
-        public String apply(HostOffer offer) {
-          return offer.getOffer().getSlaveId().getValue();
-        }
-      };
+      offer -> offer.getOffer().getSlaveId().getValue();
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index edfa202..7f84e90 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -97,28 +97,13 @@ public interface PreemptionVictimFilter {
     }
 
     private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
-        new Function<HostOffer, ResourceSlot>() {
-          @Override
-          public ResourceSlot apply(HostOffer offer) {
-            return Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot();
-          }
-        };
+        offer -> Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot();
 
     private static final Function<HostOffer, String> OFFER_TO_HOST =
-        new Function<HostOffer, String>() {
-          @Override
-          public String apply(HostOffer offer) {
-            return offer.getOffer().getHostname();
-          }
-        };
+        offer -> offer.getOffer().getHostname();
 
     private static final Function<PreemptionVictim, String> VICTIM_TO_HOST =
-        new Function<PreemptionVictim, String>() {
-          @Override
-          public String apply(PreemptionVictim victim) {
-            return victim.getSlaveHost();
-          }
-        };
+        PreemptionVictim::getSlaveHost;
 
     private final Function<PreemptionVictim, ResourceSlot> victimToResources =
         new Function<PreemptionVictim, ResourceSlot>() {
@@ -200,24 +185,21 @@ public interface PreemptionVictimFilter {
      *     with {@code preemptableTask}.
      */
     private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) {
-      return new Predicate<PreemptionVictim>() {
-        @Override
-        public boolean apply(PreemptionVictim possibleVictim) {
-          boolean pendingIsProduction = pendingTask.isProduction();
-          boolean victimIsProduction = possibleVictim.isProduction();
-
-          if (pendingIsProduction && !victimIsProduction) {
-            return true;
-          } else if (pendingIsProduction == victimIsProduction) {
-            // If production flags are equal, preemption is based on priority within the same role.
-            if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
-              return pendingTask.getPriority() > possibleVictim.getPriority();
-            } else {
-              return false;
-            }
+      return possibleVictim -> {
+        boolean pendingIsProduction = pendingTask.isProduction();
+        boolean victimIsProduction = possibleVictim.isProduction();
+
+        if (pendingIsProduction && !victimIsProduction) {
+          return true;
+        } else if (pendingIsProduction == victimIsProduction) {
+          // If production flags are equal, preemption is based on priority within the same role.
+          if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) {
+            return pendingTask.getPriority() > possibleVictim.getPriority();
           } else {
             return false;
           }
+        } else {
+          return false;
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
index fc9dac8..d108742 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java
@@ -32,10 +32,7 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 
 import static java.util.Objects.requireNonNull;
 
@@ -154,14 +151,6 @@ public class PreemptorModule extends AbstractModule {
     }
   }
 
-  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
-    @Override
-    public Optional<String> attemptPreemptionFor(
-        IAssignedTask task,
-        AttributeAggregate jobState,
-        Storage.MutableStoreProvider storeProvider) {
-
-      return Optional.absent();
-    }
-  };
+  private static final Preemptor NULL_PREEMPTOR =
+      (task, jobState, storeProvider) -> Optional.absent();
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
index 96393eb..f29b7cf 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
@@ -28,7 +28,7 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 
 import static java.util.Objects.requireNonNull;
@@ -76,23 +76,15 @@ class JobUpdateHistoryPruner extends AbstractIdleService {
   @Override
   protected void startUp() {
     executor.scheduleAtFixedRate(
-        new Runnable() {
-          @Override
-          public void run() {
-            storage.write(new MutateWork.NoResult.Quiet() {
-              @Override
-              public void execute(MutableStoreProvider storeProvider) {
-                Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
-                    settings.maxUpdatesPerJob,
-                    clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
+        () -> storage.write((NoResult.Quiet) (MutableStoreProvider storeProvider) -> {
+          Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
+              settings.maxUpdatesPerJob,
+              clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
 
-                LOG.info(prunedUpdates.isEmpty()
-                    ? "No job update history to prune."
-                    : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
-              }
-            });
-          }
-        },
+          LOG.info(prunedUpdates.isEmpty()
+              ? "No job update history to prune."
+              : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
+        }),
         settings.pruneInterval.as(Time.MILLISECONDS),
         settings.pruneInterval.as(Time.MILLISECONDS),
         TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index bb1fc8b..d1108a3 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -122,12 +123,8 @@ public class TaskHistoryPruner implements EventSubscriber {
 
   private void deleteTasks(final Set<String> taskIds) {
     LOG.info("Pruning inactive tasks " + taskIds);
-    storage.write(new Storage.MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(Storage.MutableStoreProvider storeProvider) {
-        stateManager.deleteTasks(storeProvider, taskIds);
-      }
-    });
+    storage.write(
+        (NoResult.Quiet) storeProvider -> stateManager.deleteTasks(storeProvider, taskIds));
   }
 
   @VisibleForTesting
@@ -142,32 +139,26 @@ public class TaskHistoryPruner implements EventSubscriber {
 
     LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
     executor.execute(
-        new Runnable() {
-          @Override
-          public void run() {
-            LOG.info("Pruning expired inactive task " + taskId);
-            deleteTasks(ImmutableSet.of(taskId));
-          }
+        () -> {
+          LOG.info("Pruning expired inactive task " + taskId);
+          deleteTasks(ImmutableSet.of(taskId));
         },
         Amount.of(timeRemaining, Time.MILLISECONDS));
 
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        Iterable<IScheduledTask> inactiveTasks =
-            Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
-        int numInactiveTasks = Iterables.size(inactiveTasks);
-        int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
-        if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
-          Set<String> toPrune = FluentIterable
-              .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
-              .filter(safeToDelete)
-              .limit(tasksToPrune)
-              .transform(Tasks::id)
-              .toSet();
-          if (!toPrune.isEmpty()) {
-            deleteTasks(toPrune);
-          }
+    executor.execute(() -> {
+      Iterable<IScheduledTask> inactiveTasks =
+          Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
+      int numInactiveTasks = Iterables.size(inactiveTasks);
+      int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
+      if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
+        Set<String> toPrune = FluentIterable
+            .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+            .filter(safeToDelete)
+            .limit(tasksToPrune)
+            .transform(Tasks::id)
+            .toSet();
+        if (!toPrune.isEmpty()) {
+          deleteTasks(toPrune);
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index a12910e..c18836a 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -358,37 +358,29 @@ public interface QuotaManager {
       final Multimap<IJobKey, ITaskConfig> taskConfigsByKey = tasks.index(ITaskConfig::getJob);
       return addAll(Iterables.transform(
           cronTemplates,
-          new Function<IJobConfiguration, IResourceAggregate>() {
-            @Override
-            public IResourceAggregate apply(IJobConfiguration config) {
-              return max(
-                  scale(config.getTaskConfig(), config.getInstanceCount()),
-                  fromTasks(taskConfigsByKey.get(config.getKey())));
-            }
-          }));
+          config -> max(
+              scale(config.getTaskConfig(), config.getInstanceCount()),
+              fromTasks(taskConfigsByKey.get(config.getKey())))));
     }
 
     private static Predicate<IAssignedTask> buildNonUpdatingTasksFilter(
         final Map<IJobKey, IJobUpdateInstructions> roleJobUpdates) {
 
-      return new Predicate<IAssignedTask>() {
-        @Override
-        public boolean apply(IAssignedTask task) {
-          Optional<IJobUpdateInstructions> update = Optional.fromNullable(
-              roleJobUpdates.get(task.getTask().getJob()));
-
-          if (update.isPresent()) {
-            IJobUpdateInstructions instructions = update.get();
-            RangeSet<Integer> initialInstances = getInstanceIds(instructions.getInitialState());
-            RangeSet<Integer> desiredInstances = getInstanceIds(instructions.isSetDesiredState()
-                ? ImmutableSet.of(instructions.getDesiredState())
-                : ImmutableSet.of());
-
-            int instanceId = task.getInstanceId();
-            return !initialInstances.contains(instanceId) && !desiredInstances.contains(instanceId);
-          }
-          return true;
+      return task -> {
+        Optional<IJobUpdateInstructions> update = Optional.fromNullable(
+            roleJobUpdates.get(task.getTask().getJob()));
+
+        if (update.isPresent()) {
+          IJobUpdateInstructions instructions = update.get();
+          RangeSet<Integer> initialInstances = getInstanceIds(instructions.getInitialState());
+          RangeSet<Integer> desiredInstances = getInstanceIds(instructions.isSetDesiredState()
+              ? ImmutableSet.of(instructions.getDesiredState())
+              : ImmutableSet.of());
+
+          int instanceId = task.getInstanceId();
+          return !initialInstances.contains(instanceId) && !desiredInstances.contains(instanceId);
         }
+        return true;
       };
     }
 
@@ -397,12 +389,7 @@ public interface QuotaManager {
         String role) {
 
       Function<IJobUpdateSummary, IJobUpdate> fetchUpdate =
-          new Function<IJobUpdateSummary, IJobUpdate>() {
-            @Override
-            public IJobUpdate apply(IJobUpdateSummary summary) {
-              return jobUpdateStore.fetchJobUpdate(summary.getKey()).get();
-            }
-          };
+          summary -> jobUpdateStore.fetchJobUpdate(summary.getKey()).get();
 
       return Maps.transformValues(
           FluentIterable.from(jobUpdateStore.fetchJobUpdateSummaries(updateQuery(role)))
@@ -419,23 +406,13 @@ public interface QuotaManager {
     }
 
     private static final Function<ITaskConfig, IResourceAggregate> CONFIG_RESOURCES =
-        new Function<ITaskConfig, IResourceAggregate>() {
-          @Override
-          public IResourceAggregate apply(ITaskConfig config) {
-            return IResourceAggregate.build(new ResourceAggregate()
-                .setNumCpus(config.getNumCpus())
-                .setRamMb(config.getRamMb())
-                .setDiskMb(config.getDiskMb()));
-          }
-        };
+        config -> IResourceAggregate.build(new ResourceAggregate()
+            .setNumCpus(config.getNumCpus())
+            .setRamMb(config.getRamMb())
+            .setDiskMb(config.getDiskMb()));
 
     private static final Function<IInstanceTaskConfig, IResourceAggregate> INSTANCE_RESOURCES =
-        new Function<IInstanceTaskConfig, IResourceAggregate>() {
-          @Override
-          public IResourceAggregate apply(IInstanceTaskConfig config) {
-            return scale(config.getTask(), getUpdateInstanceCount(config.getInstances()));
-          }
-        };
+        config -> scale(config.getTask(), getUpdateInstanceCount(config.getInstances()));
 
     private static IResourceAggregate instructionsToResources(
         Iterable<IInstanceTaskConfig> instructions) {
@@ -459,20 +436,17 @@ public interface QuotaManager {
     private static Function<IJobUpdateInstructions, IResourceAggregate> updateResources(
         final Predicate<IInstanceTaskConfig> instanceFilter) {
 
-      return new Function<IJobUpdateInstructions, IResourceAggregate>() {
-        @Override
-        public IResourceAggregate apply(IJobUpdateInstructions instructions) {
-          Iterable<IInstanceTaskConfig> initialState =
-              Iterables.filter(instructions.getInitialState(), instanceFilter);
-          Iterable<IInstanceTaskConfig> desiredState = Iterables.filter(
-              Optional.fromNullable(instructions.getDesiredState()).asSet(),
-              instanceFilter);
-
-          // Calculate result as max(existing, desired) per resource type.
-          return max(
-              instructionsToResources(initialState),
-              instructionsToResources(desiredState));
-        }
+      return instructions -> {
+        Iterable<IInstanceTaskConfig> initialState =
+            Iterables.filter(instructions.getInitialState(), instanceFilter);
+        Iterable<IInstanceTaskConfig> desiredState = Iterables.filter(
+            Optional.fromNullable(instructions.getDesiredState()).asSet(),
+            instanceFilter);
+
+        // Calculate result as max(existing, desired) per resource type.
+        return max(
+            instructionsToResources(initialState),
+            instructionsToResources(desiredState));
       };
     }
 
@@ -518,12 +492,7 @@ public interface QuotaManager {
     }
 
     private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY =
-        new Function<IJobUpdate, IJobKey>() {
-          @Override
-          public IJobKey apply(IJobUpdate input) {
-            return input.getSummary().getKey().getJob();
-          }
-        };
+        input -> input.getSummary().getKey().getJob();
 
     private static int getUpdateInstanceCount(Set<IRange> ranges) {
       int instanceCount = 0;

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
index c797914..cb5c93e 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
@@ -107,19 +107,16 @@ public class TaskReconciler extends AbstractIdleService {
   protected void startUp() {
     // Schedule explicit reconciliation.
     executor.scheduleAtFixedRate(
-        new Runnable() {
-          @Override
-          public void run() {
-            ImmutableSet<Protos.TaskStatus> active = FluentIterable
-                .from(Storage.Util.fetchTasks(
-                    storage,
-                    Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
-                .transform(TASK_TO_PROTO)
-                .toSet();
-
-            driver.reconcileTasks(active);
-            explicitRuns.incrementAndGet();
-          }
+        () -> {
+          ImmutableSet<Protos.TaskStatus> active = FluentIterable
+              .from(Storage.Util.fetchTasks(
+                  storage,
+                  Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES)))
+              .transform(TASK_TO_PROTO)
+              .toSet();
+
+          driver.reconcileTasks(active);
+          explicitRuns.incrementAndGet();
         },
         settings.explicitDelayMinutes,
         settings.explicitInterval.as(MINUTES),
@@ -127,12 +124,9 @@ public class TaskReconciler extends AbstractIdleService {
 
     // Schedule implicit reconciliation.
     executor.scheduleAtFixedRate(
-        new Runnable() {
-          @Override
-          public void run() {
-            driver.reconcileTasks(ImmutableSet.of());
-            implicitRuns.incrementAndGet();
-          }
+        () -> {
+          driver.reconcileTasks(ImmutableSet.of());
+          implicitRuns.incrementAndGet();
         },
         settings.implicitDelayMinutes,
         settings.implicitInterval.as(MINUTES),

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
index bfe094b..7c09f7c 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
@@ -39,8 +39,6 @@ import org.apache.aurora.scheduler.storage.Storage;
 
 import static java.util.Objects.requireNonNull;
 
-import static org.apache.aurora.scheduler.storage.Storage.MutateWork;
-
 /**
  * Observes task transitions and identifies tasks that are 'stuck' in a transient state.  Stuck
  * tasks will be transitioned to the LOST state.
@@ -118,17 +116,12 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
         // canceled, but in the event of a state transition race, including transientState
         // prevents an unintended task timeout.
         // Note: This requires LOST transitions trigger Driver.killTask.
-        StateChangeResult result = storage.write(new MutateWork.Quiet<StateChangeResult>() {
-          @Override
-          public StateChangeResult apply(Storage.MutableStoreProvider storeProvider) {
-            return stateManager.changeState(
-                storeProvider,
-                taskId,
-                Optional.of(newState),
-                ScheduleStatus.LOST,
-                TIMEOUT_MESSAGE);
-          }
-        });
+        StateChangeResult result = storage.write(storeProvider -> stateManager.changeState(
+            storeProvider,
+            taskId,
+            Optional.of(newState),
+            ScheduleStatus.LOST,
+            TIMEOUT_MESSAGE));
 
         if (result == StateChangeResult.SUCCESS) {
           LOG.info("Timeout reached for task " + taskId + ":" + taskId);

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index 66d5a10..c044ebe 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -100,12 +100,9 @@ public class TaskGroups implements EventSubscriber {
     this.backoff = requireNonNull(settings.taskGroupBackoff);
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
 
-    this.taskScheduler = new TaskScheduler() {
-      @Override
-      public boolean schedule(String taskId) {
-        settings.rateLimiter.acquire();
-        return taskScheduler.schedule(taskId);
-      }
+    this.taskScheduler = taskId -> {
+      settings.rateLimiter.acquire();
+      return taskScheduler.schedule(taskId);
     };
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index f1b11d6..7930c6c 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -40,7 +40,6 @@ import org.apache.aurora.scheduler.preemptor.Preemptor;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -111,12 +110,7 @@ public interface TaskScheduler extends EventSubscriber {
     public boolean schedule(final String taskId) {
       attemptsFired.incrementAndGet();
       try {
-        return storage.write(new MutateWork.Quiet<Boolean>() {
-          @Override
-          public Boolean apply(MutableStoreProvider store) {
-            return scheduleTask(store, taskId);
-          }
-        });
+        return storage.write(store -> scheduleTask(store, taskId));
       } catch (RuntimeException e) {
         // We catch the generic unchecked exception here to ensure tasks are not abandoned
         // if there is a transient issue resulting in an unchecked exception.

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
index 837bab7..787309a 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -74,22 +74,17 @@ class TaskThrottler implements EventSubscriber {
       long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
       throttleStats.accumulate(delayMs);
       executor.execute(
-          new Runnable() {
+          () -> storage.write(new Storage.MutateWork.NoResult.Quiet() {
             @Override
-            public void run() {
-              storage.write(new Storage.MutateWork.NoResult.Quiet() {
-                @Override
-                public void execute(Storage.MutableStoreProvider storeProvider) {
-                  stateManager.changeState(
-                      storeProvider,
-                      stateChange.getTaskId(),
-                      Optional.of(THROTTLED),
-                      PENDING,
-                      Optional.absent());
-                }
-              });
+            public void execute(Storage.MutableStoreProvider storeProvider) {
+              stateManager.changeState(
+                  storeProvider,
+                  stateChange.getTaskId(),
+                  Optional.of(THROTTLED),
+                  PENDING,
+                  Optional.absent());
             }
-          },
+          }),
           Amount.of(delayMs, Time.MILLISECONDS));
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
index 54fa45a..3ddac8b 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
@@ -103,12 +103,7 @@ class MetricCalculator implements Runnable {
   }
 
   private static final Predicate<ITaskConfig> IS_SERVICE =
-      new Predicate<ITaskConfig>() {
-        @Override
-        public boolean apply(ITaskConfig task) {
-          return task.isIsService();
-        }
-      };
+      ITaskConfig::isIsService;
 
   private final LoadingCache<String, Counter> metricCache;
   private final Storage storage;

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
index 88b2d10..4f243aa 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
@@ -160,12 +160,7 @@ interface SlaAlgorithm {
             IScheduledTask::getStatus);
 
     private static final Function<IScheduledTask, ITaskEvent> TASK_TO_EVENT =
-        new Function<IScheduledTask, ITaskEvent>() {
-          @Override
-          public ITaskEvent apply(IScheduledTask task) {
-            return Tasks.getLatestEvent(task);
-          }
-        };
+        Tasks::getLatestEvent;
 
     private JobUptime(float percentile) {
       this.percentile = percentile;
@@ -176,12 +171,7 @@ interface SlaAlgorithm {
       List<Long> uptimes = FluentIterable.from(tasks)
           .filter(IS_RUNNING)
           .transform(Functions.compose(
-              new Function<ITaskEvent, Long>() {
-                @Override
-                public Long apply(ITaskEvent event) {
-                  return timeFrame.upperEndpoint() - event.getTimestamp();
-                }
-              },
+              event -> timeFrame.upperEndpoint() - event.getTimestamp(),
               TASK_TO_EVENT)).toList();
 
       return (int) Math.floor((double) SlaUtil.percentile(uptimes, percentile) / 1000);
@@ -278,102 +268,86 @@ interface SlaAlgorithm {
     }
 
     private static final Function<IScheduledTask, InstanceId> TO_ID =
-        new Function<IScheduledTask, InstanceId>() {
-          @Override
-          public InstanceId apply(IScheduledTask task) {
-            return new InstanceId(
-                task.getAssignedTask().getTask().getJob(),
-                task.getAssignedTask().getInstanceId());
-          }
-        };
+        task -> new InstanceId(
+            task.getAssignedTask().getTask().getJob(),
+            task.getAssignedTask().getInstanceId());
 
     private static final Function<ITaskEvent, Long> TASK_EVENT_TO_TIMESTAMP =
-        new Function<ITaskEvent, Long>() {
-          @Override
-          public Long apply(ITaskEvent taskEvent) {
-            return taskEvent.getTimestamp();
-          }
-        };
+        ITaskEvent::getTimestamp;
 
     /**
      * Combine all task events per given instance into the unified sorted instance history view.
      */
     private static final Function<Collection<IScheduledTask>, List<ITaskEvent>> TO_SORTED_EVENTS =
-        new Function<Collection<IScheduledTask>, List<ITaskEvent>>() {
-          @Override
-          public List<ITaskEvent> apply(Collection<IScheduledTask> tasks) {
-            List<ITaskEvent> result = Lists.newLinkedList();
-            for (IScheduledTask task : tasks) {
-              result.addAll(task.getTaskEvents());
-            }
-
-            return Ordering.natural()
-                .onResultOf(TASK_EVENT_TO_TIMESTAMP).immutableSortedCopy(result);
+        tasks -> {
+          List<ITaskEvent> result = Lists.newLinkedList();
+          for (IScheduledTask task : tasks) {
+            result.addAll(task.getTaskEvents());
           }
+
+          return Ordering.natural()
+              .onResultOf(TASK_EVENT_TO_TIMESTAMP).immutableSortedCopy(result);
         };
 
     /**
      * Convert instance history into the {@link SlaState} based {@link Interval} list.
      */
     private static final Function<List<ITaskEvent>, List<Interval>> TASK_EVENTS_TO_INTERVALS =
-        new Function<List<ITaskEvent>, List<Interval>>() {
-          @Override
-          public List<Interval> apply(List<ITaskEvent> events) {
-
-            ImmutableList.Builder<Interval> intervals = ImmutableList.builder();
-            Pair<SlaState, Long> current = Pair.of(SlaState.REMOVED, 0L);
-
-            for (ITaskEvent event : events) {
-              long timestamp = event.getTimestamp();
-
-              // Event status in the instance timeline signifies either of the following:
-              // - termination of the existing SlaState interval AND start of a new one;
-              // - continuation of the existing matching SlaState interval.
-              switch (event.getStatus()) {
-                case LOST:
-                case DRAINING:
-                case PREEMPTING:
-                  current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
-                  break;
-
-                case PENDING:
-                case ASSIGNED:
-                case STARTING:
-                  if (current.getFirst() != SlaState.DOWN) {
-                    current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals);
-                  }
-                  break;
-
-                case THROTTLED:
-                case FINISHED:
-                case RESTARTING:
-                case FAILED:
-                case KILLING:
+        events -> {
+
+          ImmutableList.Builder<Interval> intervals = ImmutableList.builder();
+          Pair<SlaState, Long> current = Pair.of(SlaState.REMOVED, 0L);
+
+          for (ITaskEvent event : events) {
+            long timestamp = event.getTimestamp();
+
+            // Event status in the instance timeline signifies either of the following:
+            // - termination of the existing SlaState interval AND start of a new one;
+            // - continuation of the existing matching SlaState interval.
+            switch (event.getStatus()) {
+              case LOST:
+              case DRAINING:
+              case PREEMPTING:
+                current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
+                break;
+
+              case PENDING:
+              case ASSIGNED:
+              case STARTING:
+                if (current.getFirst() != SlaState.DOWN) {
                   current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals);
-                  break;
-
-                case RUNNING:
-                  current = updateIntervals(timestamp, SlaState.UP, current, intervals);
-                  break;
-
-                case KILLED:
-                  if (current.getFirst() == SlaState.UP) {
-                    current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
-                  }
-                  break;
+                }
+                break;
+
+              case THROTTLED:
+              case FINISHED:
+              case RESTARTING:
+              case FAILED:
+              case KILLING:
+                current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals);
+                break;
+
+              case RUNNING:
+                current = updateIntervals(timestamp, SlaState.UP, current, intervals);
+                break;
+
+              case KILLED:
+                if (current.getFirst() == SlaState.UP) {
+                  current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
+                }
+                break;
 
-                case INIT:
-                  // Ignore.
-                  break;
+              case INIT:
+                // Ignore.
+                break;
 
-                default:
-                  throw new IllegalArgumentException("Unsupported status:" + event.getStatus());
-              }
+              default:
+                throw new IllegalArgumentException("Unsupported status:" + event.getStatus());
             }
-            // Add the last event interval.
-            intervals.add(new Interval(current.getFirst(), current.getSecond(), Long.MAX_VALUE));
-            return intervals.build();
           }
+          // Add the last event interval.
+          intervals.add(new Interval(current.getFirst(), current.getSecond(), Long.MAX_VALUE));
+          return intervals.build();
         };
 
     private static Pair<SlaState, Long> updateIntervals(

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
index 4827a0d..bf7c084 100644
--- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
@@ -17,7 +17,6 @@ import java.util.Map;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -62,12 +61,7 @@ interface SlaGroup {
             "sla_cpu_large_", Range.openClosed(MEDIUM.getNumCpus(), LARGE.getNumCpus()),
             "sla_cpu_xlarge_", Range.openClosed(LARGE.getNumCpus(), XLARGE.getNumCpus()),
             "sla_cpu_xxlarge_", Range.greaterThan(XLARGE.getNumCpus())),
-        new Function<IScheduledTask, Double>() {
-          @Override
-          public Double apply(IScheduledTask task) {
-            return task.getAssignedTask().getTask().getNumCpus();
-          }
-        }
+        task -> task.getAssignedTask().getTask().getNumCpus()
     )),
     RESOURCE_RAM(new Resource<>(
         ImmutableMap.of(
@@ -76,12 +70,7 @@ interface SlaGroup {
             "sla_ram_large_", Range.openClosed(MEDIUM.getRamMb(), LARGE.getRamMb()),
             "sla_ram_xlarge_", Range.openClosed(LARGE.getRamMb(), XLARGE.getRamMb()),
             "sla_ram_xxlarge_", Range.greaterThan(XLARGE.getRamMb())),
-        new Function<IScheduledTask, Long>() {
-          @Override
-          public Long apply(IScheduledTask task) {
-            return task.getAssignedTask().getTask().getRamMb();
-          }
-        }
+        task -> task.getAssignedTask().getTask().getRamMb()
     )),
     RESOURCE_DISK(new Resource<>(
         ImmutableMap.of(
@@ -90,12 +79,7 @@ interface SlaGroup {
             "sla_disk_large_", Range.openClosed(MEDIUM.getDiskMb(), LARGE.getDiskMb()),
             "sla_disk_xlarge_", Range.openClosed(LARGE.getDiskMb(), XLARGE.getDiskMb()),
             "sla_disk_xxlarge_", Range.greaterThan(XLARGE.getDiskMb())),
-        new Function<IScheduledTask, Long>() {
-          @Override
-          public Long apply(IScheduledTask task) {
-            return task.getAssignedTask().getTask().getDiskMb();
-          }
-        }
+        task -> task.getAssignedTask().getTask().getDiskMb()
     ));
 
     private SlaGroup group;
@@ -129,12 +113,7 @@ interface SlaGroup {
   class Cluster implements SlaGroup {
     @Override
     public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) {
-      return Multimaps.index(tasks, new Function<IScheduledTask, String>() {
-        @Override
-        public String apply(IScheduledTask task) {
-          return "sla_cluster_";
-        }
-      });
+      return Multimaps.index(tasks, task -> "sla_cluster_");
     }
   }
 
@@ -159,11 +138,8 @@ interface SlaGroup {
           ImmutableListMultimap.builder();
 
       for (final Map.Entry<String, Range<T>> entry : map.entrySet()) {
-        result.putAll(entry.getKey(), Iterables.filter(tasks, new Predicate<IScheduledTask>() {
-          @Override
-          public boolean apply(IScheduledTask task) {
-            return entry.getValue().contains(function.apply(task));
-          }
+        result.putAll(entry.getKey(), Iterables.filter(tasks, task -> {
+          return entry.getValue().contains(function.apply(task));
         }));
       }
       return result.build();

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
index 7660022..59c9786 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/LockManagerImpl.java
@@ -26,10 +26,7 @@ import org.apache.aurora.gen.LockKey._Fields;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.ILock;
 import org.apache.aurora.scheduler.storage.entities.ILockKey;
 
@@ -54,54 +51,42 @@ public class LockManagerImpl implements LockManager {
 
   @Override
   public ILock acquireLock(final ILockKey lockKey, final String user) throws LockException {
-    return storage.write(new MutateWork<ILock, LockException>() {
-      @Override
-      public ILock apply(Storage.MutableStoreProvider storeProvider)
-          throws LockException {
-
-        LockStore.Mutable lockStore = storeProvider.getLockStore();
-        Optional<ILock> existingLock = lockStore.fetchLock(lockKey);
-
-        if (existingLock.isPresent()) {
-          throw new LockException(String.format(
-              "Operation for: %s is already in progress. Started at: %s. Current owner: %s.",
-              formatLockKey(lockKey),
-              new Date(existingLock.get().getTimestampMs()).toString(),
-              existingLock.get().getUser()));
-        }
-
-        ILock lock = ILock.build(new Lock()
-            .setKey(lockKey.newBuilder())
-            .setToken(tokenGenerator.createNew().toString())
-            .setTimestampMs(clock.nowMillis())
-            .setUser(user));
-
-        lockStore.saveLock(lock);
-        return lock;
+    return storage.write(storeProvider -> {
+
+      LockStore.Mutable lockStore = storeProvider.getLockStore();
+      Optional<ILock> existingLock = lockStore.fetchLock(lockKey);
+
+      if (existingLock.isPresent()) {
+        throw new LockException(String.format(
+            "Operation for: %s is already in progress. Started at: %s. Current owner: %s.",
+            formatLockKey(lockKey),
+            new Date(existingLock.get().getTimestampMs()).toString(),
+            existingLock.get().getUser()));
       }
+
+      ILock lock = ILock.build(new Lock()
+          .setKey(lockKey.newBuilder())
+          .setToken(tokenGenerator.createNew().toString())
+          .setTimestampMs(clock.nowMillis())
+          .setUser(user));
+
+      lockStore.saveLock(lock);
+      return lock;
     });
   }
 
   @Override
   public void releaseLock(final ILock lock) {
-    storage.write(new MutateWork.NoResult.Quiet() {
-      @Override
-      public void execute(MutableStoreProvider storeProvider) {
-        storeProvider.getLockStore().removeLock(lock.getKey());
-      }
-    });
+    storage.write(
+        (NoResult.Quiet) storeProvider -> storeProvider.getLockStore().removeLock(lock.getKey()));
   }
 
   @Override
   public void validateIfLocked(final ILockKey context, Optional<ILock> heldLock)
       throws LockException {
 
-    Optional<ILock> stored = storage.read(new Work.Quiet<Optional<ILock>>() {
-      @Override
-      public Optional<ILock> apply(StoreProvider storeProvider) {
-        return storeProvider.getLockStore().fetchLock(context);
-      }
-    });
+    Optional<ILock> stored = storage.read(
+        storeProvider -> storeProvider.getLockStore().fetchLock(context));
 
     // The implementation below assumes the following use cases:
     // +-----------+-----------------+----------+
@@ -125,12 +110,7 @@ public class LockManagerImpl implements LockManager {
 
   @Override
   public Iterable<ILock> getLocks() {
-    return storage.read(new Work.Quiet<Iterable<ILock>>() {
-      @Override
-      public Iterable<ILock> apply(StoreProvider storeProvider) {
-        return storeProvider.getLockStore().fetchLocks();
-      }
-    });
+    return storage.read(storeProvider -> storeProvider.getLockStore().fetchLocks());
   }
 
   private static String formatLockKey(ILockKey lockKey) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 617ee54..60ebfdf 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -38,9 +38,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
@@ -153,24 +151,21 @@ public interface MaintenanceController {
     public void taskChangedState(final TaskStateChange change) {
       if (Tasks.isTerminated(change.getNewState())) {
         final String host = change.getTask().getAssignedTask().getSlaveHost();
-        storage.write(new MutateWork.NoResult.Quiet() {
-          @Override
-          public void execute(MutableStoreProvider store) {
-            // If the task _was_ associated with a draining host, and it was the last task on the
-            // host.
-            Optional<IHostAttributes> attributes =
-                store.getAttributeStore().getHostAttributes(host);
-            if (attributes.isPresent() && attributes.get().getMode() == DRAINING) {
-              Query.Builder builder = Query.slaveScoped(host).active();
-              Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder);
-              if (Iterables.isEmpty(activeTasks)) {
-                LOG.info(String.format("Moving host %s into DRAINED", host));
-                setMaintenanceMode(store, ImmutableSet.of(host), DRAINED);
-              } else {
-                LOG.info(String.format("Host %s is DRAINING with active tasks: %s",
-                    host,
-                    Tasks.ids(activeTasks)));
-              }
+        storage.write((NoResult.Quiet) (MutableStoreProvider store) -> {
+          // If the task _was_ associated with a draining host, and it was the last task on the
+          // host.
+          Optional<IHostAttributes> attributes =
+              store.getAttributeStore().getHostAttributes(host);
+          if (attributes.isPresent() && attributes.get().getMode() == DRAINING) {
+            Query.Builder builder = Query.slaveScoped(host).active();
+            Iterable<IScheduledTask> activeTasks = store.getTaskStore().fetchTasks(builder);
+            if (Iterables.isEmpty(activeTasks)) {
+              LOG.info(String.format("Moving host %s into DRAINED", host));
+              setMaintenanceMode(store, ImmutableSet.of(host), DRAINED);
+            } else {
+              LOG.info(String.format("Host %s is DRAINING with active tasks: %s",
+                  host,
+                  Tasks.ids(activeTasks)));
             }
           }
         });
@@ -179,12 +174,8 @@ public interface MaintenanceController {
 
     @Override
     public Set<HostStatus> startMaintenance(final Set<String> hosts) {
-      return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
-        @Override
-        public Set<HostStatus> apply(MutableStoreProvider storeProvider) {
-          return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED);
-        }
-      });
+      return storage.write(
+          storeProvider -> setMaintenanceMode(storeProvider, hosts, MaintenanceMode.SCHEDULED));
     }
 
     @VisibleForTesting
@@ -193,73 +184,41 @@ public interface MaintenanceController {
 
     @Override
     public Set<HostStatus> drain(final Set<String> hosts) {
-      return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
-        @Override
-        public Set<HostStatus> apply(MutableStoreProvider store) {
-          return watchDrainingTasks(store, hosts);
-        }
-      });
+      return storage.write(store -> watchDrainingTasks(store, hosts));
     }
 
     private static final Function<IHostAttributes, String> HOST_NAME =
-        new Function<IHostAttributes, String>() {
-          @Override
-          public String apply(IHostAttributes attributes) {
-            return attributes.getHost();
-          }
-        };
+        IHostAttributes::getHost;
 
     private static final Function<IHostAttributes, HostStatus> ATTRS_TO_STATUS =
-        new Function<IHostAttributes, HostStatus>() {
-          @Override
-          public HostStatus apply(IHostAttributes attributes) {
-            return new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode());
-          }
-        };
+        attributes -> new HostStatus().setHost(attributes.getHost()).setMode(attributes.getMode());
 
     private static final Function<HostStatus, MaintenanceMode> GET_MODE =
-        new Function<HostStatus, MaintenanceMode>() {
-          @Override
-          public MaintenanceMode apply(HostStatus status) {
-            return status.getMode();
-          }
-        };
+        HostStatus::getMode;
 
     @Override
     public MaintenanceMode getMode(final String host) {
-      return storage.read(new Work.Quiet<MaintenanceMode>() {
-        @Override
-        public MaintenanceMode apply(StoreProvider storeProvider) {
-          return storeProvider.getAttributeStore().getHostAttributes(host)
-              .transform(ATTRS_TO_STATUS)
-              .transform(GET_MODE)
-              .or(MaintenanceMode.NONE);
-        }
-      });
+      return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes(host)
+          .transform(ATTRS_TO_STATUS)
+          .transform(GET_MODE)
+          .or(MaintenanceMode.NONE));
     }
 
     @Override
     public Set<HostStatus> getStatus(final Set<String> hosts) {
-      return storage.read(new Work.Quiet<Set<HostStatus>>() {
-        @Override
-        public Set<HostStatus> apply(StoreProvider storeProvider) {
-          // Warning - this is filtering _all_ host attributes.  If using this to frequently query
-          // for a small set of hosts, a getHostAttributes variant should be added.
-          return FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
-              .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME))
-              .transform(ATTRS_TO_STATUS).toSet();
-        }
+      return storage.read(storeProvider -> {
+        // Warning - this is filtering _all_ host attributes.  If using this to frequently query
+        // for a small set of hosts, a getHostAttributes variant should be added.
+        return FluentIterable.from(storeProvider.getAttributeStore().getHostAttributes())
+            .filter(Predicates.compose(Predicates.in(hosts), HOST_NAME))
+            .transform(ATTRS_TO_STATUS).toSet();
       });
     }
 
     @Override
     public Set<HostStatus> endMaintenance(final Set<String> hosts) {
-      return storage.write(new MutateWork.Quiet<Set<HostStatus>>() {
-        @Override
-        public Set<HostStatus> apply(MutableStoreProvider storeProvider) {
-          return setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE);
-        }
-      });
+      return storage.write(
+          storeProvider -> setMaintenanceMode(storeProvider, hosts, MaintenanceMode.NONE));
     }
 
     private Set<HostStatus> setMaintenanceMode(

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index f27c93b..6503af2 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -54,7 +54,6 @@ import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
 import org.apache.aurora.scheduler.state.SideEffect.Action;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -63,7 +62,6 @@ import org.apache.mesos.Protos.SlaveID;
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank;
-
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
@@ -121,12 +119,7 @@ public class StateManagerImpl implements StateManager {
 
     // Done outside the write transaction to minimize the work done inside a transaction.
     Set<IScheduledTask> scheduledTasks = FluentIterable.from(instanceIds)
-        .transform(new Function<Integer, IScheduledTask>() {
-          @Override
-          public IScheduledTask apply(Integer instanceId) {
-            return createTask(instanceId, task);
-          }
-        }).toSet();
+        .transform(instanceId -> createTask(instanceId, task)).toSet();
 
     Iterable<IScheduledTask> existingTasks = storeProvider.getTaskStore().fetchTasks(
         Query.jobScoped(task.getJob()).active());
@@ -182,16 +175,13 @@ public class StateManagerImpl implements StateManager {
     Query.Builder query = Query.taskScoped(taskId);
 
     storeProvider.getUnsafeTaskStore().mutateTasks(query,
-        new Function<IScheduledTask, IScheduledTask>() {
-          @Override
-          public IScheduledTask apply(IScheduledTask task) {
-            ScheduledTask builder = task.newBuilder();
-            builder.getAssignedTask()
-                .setAssignedPorts(assignedPorts)
-                .setSlaveHost(slaveHost)
-                .setSlaveId(slaveId.getValue());
-            return IScheduledTask.build(builder);
-          }
+        task -> {
+          ScheduledTask builder = task.newBuilder();
+          builder.getAssignedTask()
+              .setAssignedPorts(assignedPorts)
+              .setSlaveHost(slaveHost)
+              .setSlaveId(slaveId.getValue());
+          return IScheduledTask.build(builder);
         });
 
     StateChangeResult changeResult = updateTaskAndExternalState(
@@ -213,15 +203,12 @@ public class StateManagerImpl implements StateManager {
 
   @VisibleForTesting
   static final Supplier<String> LOCAL_HOST_SUPPLIER = Suppliers.memoize(
-      new Supplier<String>() {
-        @Override
-        public String get() {
-          try {
-            return InetAddress.getLocalHost().getHostName();
-          } catch (UnknownHostException e) {
-            LOG.log(Level.SEVERE, "Failed to get self hostname.");
-            throw Throwables.propagate(e);
-          }
+      () -> {
+        try {
+          return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+          LOG.log(Level.SEVERE, "Failed to get self hostname.");
+          throw Throwables.propagate(e);
         }
       });
 
@@ -252,12 +239,7 @@ public class StateManagerImpl implements StateManager {
   }
 
   private static final Function<SideEffect, Action> GET_ACTION =
-      new Function<SideEffect, Action>() {
-        @Override
-        public Action apply(SideEffect sideEffect) {
-          return sideEffect.getAction();
-        }
-      };
+      SideEffect::getAction;
 
   private static final List<Action> ACTIONS_IN_ORDER = ImmutableList.of(
       Action.INCREMENT_FAILURES,
@@ -309,13 +291,8 @@ public class StateManagerImpl implements StateManager {
 
       switch (sideEffect.getAction()) {
         case INCREMENT_FAILURES:
-          taskStore.mutateTasks(query, new TaskMutation() {
-            @Override
-            public IScheduledTask apply(IScheduledTask task) {
-              return IScheduledTask.build(
-                  task.newBuilder().setFailureCount(task.getFailureCount() + 1));
-            }
-          });
+          taskStore.mutateTasks(query, task1 -> IScheduledTask.build(
+              task1.newBuilder().setFailureCount(task1.getFailureCount() + 1)));
           break;
 
         case SAVE_STATE:
@@ -323,18 +300,15 @@ public class StateManagerImpl implements StateManager {
               upToDateTask.isPresent(),
               "Operation expected task " + taskId + " to be present.");
 
-          taskStore.mutateTasks(query, new TaskMutation() {
-            @Override
-            public IScheduledTask apply(IScheduledTask task) {
-              ScheduledTask mutableTask = task.newBuilder();
-              mutableTask.setStatus(targetState.get());
-              mutableTask.addToTaskEvents(new TaskEvent()
-                  .setTimestamp(clock.nowMillis())
-                  .setStatus(targetState.get())
-                  .setMessage(transitionMessage.orNull())
-                  .setScheduler(LOCAL_HOST_SUPPLIER.get()));
-              return IScheduledTask.build(mutableTask);
-            }
+          taskStore.mutateTasks(query, task1 -> {
+            ScheduledTask mutableTask = task1.newBuilder();
+            mutableTask.setStatus(targetState.get());
+            mutableTask.addToTaskEvents(new TaskEvent()
+                .setTimestamp(clock.nowMillis())
+                .setStatus(targetState.get())
+                .setMessage(transitionMessage.orNull())
+                .setScheduler(LOCAL_HOST_SUPPLIER.get()));
+            return IScheduledTask.build(mutableTask);
           });
           events.add(
               PubsubEvent.TaskStateChange.transition(

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index de7ebb3..50868ea 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -23,14 +23,12 @@ import java.util.logging.Logger;
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
 
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.stats.Stats;
-
 import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.Resources;
 import org.apache.aurora.scheduler.TierInfo;
@@ -117,12 +115,7 @@ public interface TaskAssigner {
 
       final Iterator<String> names = requestedPorts.iterator();
       Map<String, Integer> portsByName = FluentIterable.from(selectedPorts)
-          .uniqueIndex(new Function<Object, String>() {
-            @Override
-            public String apply(Object input) {
-              return names.next();
-            }
-          });
+          .uniqueIndex(input -> names.next());
 
       IAssignedTask assigned = stateManager.assignTask(
           storeProvider,

http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index 9ace5b0..b8d8bf9 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -91,12 +91,7 @@ class TaskStateMachine {
   private final Set<SideEffect> sideEffects = Sets.newHashSet();
 
   private static final Function<ScheduleStatus, TaskState> STATUS_TO_TASK_STATE =
-      new Function<ScheduleStatus, TaskState>() {
-        @Override
-        public TaskState apply(ScheduleStatus input) {
-          return TaskState.valueOf(input.name());
-        }
-      };
+      input -> TaskState.valueOf(input.name());
 
   private static final Function<IScheduledTask, TaskState> SCHEDULED_TO_TASK_STATE =
       Functions.compose(STATUS_TO_TASK_STATE, IScheduledTask::getStatus);
@@ -183,52 +178,46 @@ class TaskStateMachine {
             .build());
 
     final Closure<Transition<TaskState>> manageRestartingTask =
-        new Closure<Transition<TaskState>>() {
-          @Override
-          public void execute(Transition<TaskState> transition) {
-            switch (transition.getTo()) {
-              case ASSIGNED:
-                addFollowup(KILL);
-                break;
-
-              case STARTING:
-                addFollowup(KILL);
-                break;
-
-              case RUNNING:
-                addFollowup(KILL);
-                break;
-
-              case LOST:
-                addFollowup(KILL);
-                addFollowup(RESCHEDULE);
-                break;
-
-              case FINISHED:
-                addFollowup(RESCHEDULE);
-                break;
-
-              case FAILED:
-                addFollowup(RESCHEDULE);
-                break;
-
-              case KILLED:
-                addFollowup(RESCHEDULE);
-                break;
-
-              default:
-                // No-op.
-            }
+        transition -> {
+          switch (transition.getTo()) {
+            case ASSIGNED:
+              addFollowup(KILL);
+              break;
+
+            case STARTING:
+              addFollowup(KILL);
+              break;
+
+            case RUNNING:
+              addFollowup(KILL);
+              break;
+
+            case LOST:
+              addFollowup(KILL);
+              addFollowup(RESCHEDULE);
+              break;
+
+            case FINISHED:
+              addFollowup(RESCHEDULE);
+              break;
+
+            case FAILED:
+              addFollowup(RESCHEDULE);
+              break;
+
+            case KILLED:
+              addFollowup(RESCHEDULE);
+              break;
+
+            default:
+              // No-op.
           }
         };
 
     // To be called on a task transitioning into the FINISHED state.
-    final Command rescheduleIfService = new Command() {
-      @Override
-      public void execute() {
-        if (task.get().getAssignedTask().getTask().isIsService()) {
-          addFollowup(RESCHEDULE);
-        }
+    final Command rescheduleIfService = () -> {
+      if (task.get().getAssignedTask().getTask().isIsService()) {
+        addFollowup(RESCHEDULE);
       }
     };
 
@@ -275,46 +264,43 @@ class TaskStateMachine {
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, DRAINING,
                     KILLED, KILLING, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<TaskState>>() {
-                      @Override
-                      public void execute(Transition<TaskState> transition) {
-                        switch (transition.getTo()) {
-                          case FINISHED:
-                            rescheduleIfService.execute();
-                            break;
-
-                          case PREEMPTING:
-                            addFollowup(KILL);
-                            break;
-
-                          case FAILED:
-                            incrementFailuresMaybeReschedule.execute();
-                            break;
-
-                          case RESTARTING:
-                            addFollowup(KILL);
-                            break;
-
-                          case DRAINING:
-                            addFollowup(KILL);
-                            break;
-
-                          case KILLED:
-                            addFollowup(RESCHEDULE);
-                            break;
-
-                          case LOST:
-                            addFollowup(RESCHEDULE);
-                            addFollowup(KILL);
-                            break;
-
-                          case KILLING:
-                            addFollowup(KILL);
-                            break;
-
-                          default:
-                            // No-op.
-                        }
+                    transition -> {
+                      switch (transition.getTo()) {
+                        case FINISHED:
+                          rescheduleIfService.execute();
+                          break;
+
+                        case PREEMPTING:
+                          addFollowup(KILL);
+                          break;
+
+                        case FAILED:
+                          incrementFailuresMaybeReschedule.execute();
+                          break;
+
+                        case RESTARTING:
+                          addFollowup(KILL);
+                          break;
+
+                        case DRAINING:
+                          addFollowup(KILL);
+                          break;
+
+                        case KILLED:
+                          addFollowup(RESCHEDULE);
+                          break;
+
+                        case LOST:
+                          addFollowup(RESCHEDULE);
+                          addFollowup(KILL);
+                          break;
+
+                        case KILLING:
+                          addFollowup(KILL);
+                          break;
+
+                        default:
+                          // No-op.
                       }
                     }
                 ))
@@ -323,45 +309,42 @@ class TaskStateMachine {
                 .to(RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, KILLING,
                     KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<TaskState>>() {
-                      @Override
-                      public void execute(Transition<TaskState> transition) {
-                        switch (transition.getTo()) {
-                          case FINISHED:
-                            rescheduleIfService.execute();
-                            break;
-
-                          case RESTARTING:
-                            addFollowup(KILL);
-                            break;
-
-                          case DRAINING:
-                            addFollowup(KILL);
-                            break;
-
-                          case PREEMPTING:
-                            addFollowup(KILL);
-                            break;
-
-                          case FAILED:
-                            incrementFailuresMaybeReschedule.execute();
-                            break;
-
-                          case KILLED:
-                            addFollowup(RESCHEDULE);
-                            break;
-
-                          case KILLING:
-                            addFollowup(KILL);
-                            break;
-
-                          case LOST:
-                            addFollowup(RESCHEDULE);
-                            break;
-
-                          default:
-                            // No-op.
-                        }
+                    transition -> {
+                      switch (transition.getTo()) {
+                        case FINISHED:
+                          rescheduleIfService.execute();
+                          break;
+
+                        case RESTARTING:
+                          addFollowup(KILL);
+                          break;
+
+                        case DRAINING:
+                          addFollowup(KILL);
+                          break;
+
+                        case PREEMPTING:
+                          addFollowup(KILL);
+                          break;
+
+                        case FAILED:
+                          incrementFailuresMaybeReschedule.execute();
+                          break;
+
+                        case KILLED:
+                          addFollowup(RESCHEDULE);
+                          break;
+
+                        case KILLING:
+                          addFollowup(KILL);
+                          break;
+
+                        case LOST:
+                          addFollowup(RESCHEDULE);
+                          break;
+
+                        default:
+                          // No-op.
                       }
                     }
                 ))
@@ -369,45 +352,42 @@ class TaskStateMachine {
             Rule.from(RUNNING)
                 .to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, LOST, PREEMPTING)
                 .withCallback(
-                    new Closure<Transition<TaskState>>() {
-                      @Override
-                      public void execute(Transition<TaskState> transition) {
-                        switch (transition.getTo()) {
-                          case FINISHED:
-                            rescheduleIfService.execute();
-                            break;
-
-                          case PREEMPTING:
-                            addFollowup(KILL);
-                            break;
-
-                          case RESTARTING:
-                            addFollowup(KILL);
-                            break;
-
-                          case DRAINING:
-                            addFollowup(KILL);
-                            break;
-
-                          case FAILED:
-                            incrementFailuresMaybeReschedule.execute();
-                            break;
-
-                          case KILLED:
-                            addFollowup(RESCHEDULE);
-                            break;
-
-                          case KILLING:
-                            addFollowup(KILL);
-                            break;
-
-                          case LOST:
-                            addFollowup(RESCHEDULE);
-                            break;
-
-                          default:
-                            // No-op.
-                        }
+                    transition -> {
+                      switch (transition.getTo()) {
+                        case FINISHED:
+                          rescheduleIfService.execute();
+                          break;
+
+                        case PREEMPTING:
+                          addFollowup(KILL);
+                          break;
+
+                        case RESTARTING:
+                          addFollowup(KILL);
+                          break;
+
+                        case DRAINING:
+                          addFollowup(KILL);
+                          break;
+
+                        case FAILED:
+                          incrementFailuresMaybeReschedule.execute();
+                          break;
+
+                        case KILLED:
+                          addFollowup(RESCHEDULE);
+                          break;
+
+                        case KILLING:
+                          addFollowup(KILL);
+                          break;
+
+                        case LOST:
+                          addFollowup(RESCHEDULE);
+                          break;
+
+                        default:
+                          // No-op.
                       }
                     }
                 ))
@@ -495,12 +475,7 @@ class TaskStateMachine {
   }
 
   private Closure<Transition<TaskState>> addFollowupClosure(final Action action) {
-    return new Closure<Transition<TaskState>>() {
-      @Override
-      public void execute(Transition<TaskState> item) {
-        addFollowup(action);
-      }
-    };
+    return item -> addFollowup(action);
   }
 
   /**
@@ -549,12 +524,7 @@ class TaskStateMachine {
    */
   @Nullable
   ScheduleStatus getPreviousState() {
-    return previousState.transform(new Function<TaskState, ScheduleStatus>() {
-      @Override
-      public ScheduleStatus apply(TaskState item) {
-        return item.getStatus().orNull();
-      }
-    }).orNull();
+    return previousState.transform(item -> item.getStatus().orNull()).orNull();
   }
 
   @Override


Mime
View raw message