aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject aurora git commit: Replace org.apache.aurora.common.base.Closure with java.util.function.Consumer
Date Fri, 15 Apr 2016 21:03:13 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 02ffef5de -> 03ec02389


Replace org.apache.aurora.common.base.Closure with java.util.function.Consumer

Commons came with a `Closure` type which is identical to the Java 8 type
`Consumer`. This replaces the former with the latter in the interests of
reducing the commons code and fork.

Reviewed at https://reviews.apache.org/r/46167/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/03ec0238
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/03ec0238
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/03ec0238

Branch: refs/heads/master
Commit: 03ec02389544a1c67638b601d9256c339fb014e9
Parents: 02ffef5
Author: Zameer Manji <zmanji@apache.org>
Authored: Fri Apr 15 14:02:57 2016 -0700
Committer: Zameer Manji <zmanji@apache.org>
Committed: Fri Apr 15 14:02:57 2016 -0700

----------------------------------------------------------------------
 .../org/apache/aurora/common/base/Closure.java  |  33 ------
 .../org/apache/aurora/common/base/Closures.java |  93 ---------------
 .../apache/aurora/common/base/Consumers.java    |  89 ++++++++++++++
 .../apache/aurora/common/util/StateMachine.java |  40 +++----
 .../util/templating/StringTemplateHelper.java   |   8 +-
 .../apache/aurora/common/base/ClosuresTest.java | 116 ------------------
 .../aurora/common/base/ConsumersTest.java       | 118 +++++++++++++++++++
 .../aurora/common/util/StateMachineTest.java    |  43 +++----
 .../aurora/scheduler/SchedulerLifecycle.java    |  32 ++---
 .../scheduler/http/JerseyTemplateServlet.java   |   4 +-
 .../scheduler/state/TaskStateMachine.java       |  24 ++--
 .../scheduler/storage/log/LogStorage.java       |  18 +--
 .../scheduler/storage/log/StreamManager.java    |   7 +-
 .../storage/log/StreamManagerImpl.java          |   6 +-
 .../scheduler/storage/log/LogManagerTest.java   |  18 +--
 15 files changed, 308 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/base/Closure.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/base/Closure.java b/commons/src/main/java/org/apache/aurora/common/base/Closure.java
deleted file mode 100644
index 42c9d11..0000000
--- a/commons/src/main/java/org/apache/aurora/common/base/Closure.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.base;
-
-/**
- * A closure that does not throw any checked exceptions.
- *
- * @param <T> Closure value type.
- *
- * @author John Sirois
- */
-@FunctionalInterface
-public interface Closure<T> {
-  // convenience typedef
-
-  /**
-   * Performs a unit of work on item
-   *
-   * @param item the item to perform work against
-   */
-  void execute(T item);
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/base/Closures.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/base/Closures.java b/commons/src/main/java/org/apache/aurora/common/base/Closures.java
deleted file mode 100644
index 0953119..0000000
--- a/commons/src/main/java/org/apache/aurora/common/base/Closures.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.base;
-
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Utilities for dealing with Closures.
- *
- * @author John Sirois
- */
-public final class Closures {
-
-  private static final Closure<?> NOOP = item -> {
-    // noop
-  };
-
-  private Closures() {
-    // utility
-  }
-
-  /**
-   * Combines multiple closures into a single closure, whose calls are replicated sequentially
-   * in the order that they were provided.
-   * If an exception is encountered from a closure it propagates to the top-level closure and the
-   * remaining closures are not executed.
-   *
-   * @param closures Closures to combine.
-   * @param <T> Type accepted by the closures.
-   * @return A single closure that will fan out all calls to {@link Closure#execute(Object)} to
-   *    the wrapped closures.
-   */
-  public static <T> Closure<T> combine(Iterable<Closure<T>> closures) {
-    checkNotNull(closures);
-    checkArgument(Iterables.all(closures, Predicates.notNull()));
-
-    final Iterable<Closure<T>> closuresCopy = ImmutableList.copyOf(closures);
-
-    return item -> {
-      for (Closure<T> closure : closuresCopy) {
-        closure.execute(item);
-      }
-    };
-  }
-
-  /**
-   * Applies a filter to a closure, such that the closure will only be called when the filter is
-   * satisfied (returns {@code true}}.
-   *
-   * @param filter Filter to determine when {@code closure} is called.
-   * @param closure Closure to filter.
-   * @param <T> Type handled by the filter and the closure.
-   * @return A filtered closure.
-   */
-  public static <T> Closure<T> filter(final Predicate<T> filter, final Closure<T> closure) {
-    checkNotNull(filter);
-    checkNotNull(closure);
-
-    return item -> {
-      if (filter.apply(item)) {
-        closure.execute(item);
-      }
-    };
-  }
-
-  /**
-   * Returns a closure that will do nothing.
-   *
-   * @param <T> The closure argument type.
-   * @return A closure that does nothing.
-   */
-  @SuppressWarnings("unchecked")
-  public static <T> Closure<T> noop() {
-    return (Closure<T>) NOOP;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/base/Consumers.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/base/Consumers.java b/commons/src/main/java/org/apache/aurora/common/base/Consumers.java
new file mode 100644
index 0000000..06f3269
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/base/Consumers.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.base;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Utilities for dealing with Consumers.
+ *
+ * @author John Sirois
+ */
+public final class Consumers {
+
+  private static final Consumer<?> NOOP = item -> {
+    // noop
+  };
+
+  private Consumers() {
+    // utility
+  }
+
+  /**
+   * Combines multiple consumers into a single consumer, whose calls are replicated sequentially
+   * in the order that they were provided.
+   * If an exception is encountered from a consumer it propagates to the top-level consumer and the
+   * remaining consumers are not executed.
+   *
+   * @param consumers Consumers to combine.
+   * @param <T> Type accepted by the consumers.
+   * @return A single consumer that will fan out all calls to {@link Consumer#accept(Object)} to
+   *    the wrapped consumers.
+   */
+  public static <T> Consumer<T> combine(List<Consumer<T>> consumers) {
+    checkNotNull(consumers);
+    checkArgument(Iterables.all(consumers, Predicates.notNull()));
+
+    return consumers.stream().reduce(noop(), Consumer::andThen);
+  }
+
+  /**
+   * Applies a filter to a consumer, such that the consumer will only be called when the filter is
+   * satisfied (returns {@code true}).
+   *
+   * @param filter Filter to determine when {@code consumer} is called.
+   * @param consumer Consumer to filter.
+   * @param <T> Type handled by the filter and the consumer.
+   * @return A filtered consumer.
+   */
+  public static <T> Consumer<T> filter(final Predicate<T> filter, final Consumer<T> consumer) {
+    checkNotNull(filter);
+    checkNotNull(consumer);
+
+    return item -> {
+      if (filter.apply(item)) {
+        consumer.accept(item);
+      }
+    };
+  }
+
+  /**
+   * Returns a consumer that will do nothing.
+   *
+   * @param <T> The consumer argument type.
+   * @return A consumer that does nothing.
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> Consumer<T> noop() {
+    return (Consumer<T>) NOOP;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java b/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java
index e8aa000..785c5f1 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java
@@ -19,6 +19,7 @@ import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -30,8 +31,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Closures;
+import org.apache.aurora.common.base.Consumers;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class StateMachine<T> {
   // Stores mapping from states to the states that the machine is allowed to transition into.
   private final Multimap<T, T> stateTransitions;
 
-  private final Closure<Transition<T>> transitionCallback;
+  private final Consumer<Transition<T>> transitionCallback;
   private final boolean throwOnBadTransition;
 
   private volatile T currentState;
@@ -69,7 +69,7 @@ public class StateMachine<T> {
   private StateMachine(String name,
       T initialState,
       Multimap<T, T> stateTransitions,
-      Closure<Transition<T>> transitionCallback,
+      Consumer<Transition<T>> transitionCallback,
       boolean throwOnBadTransition) {
     this.name = name;
     this.currentState = initialState;
@@ -157,7 +157,7 @@ public class StateMachine<T> {
       writeLock.unlock();
     }
 
-    transitionCallback.execute(new Transition<T>(currentCopy, nextState, transitionAllowed));
+    transitionCallback.accept(new Transition<T>(currentCopy, nextState, transitionAllowed));
     return transitionAllowed;
   }
 
@@ -186,17 +186,17 @@ public class StateMachine<T> {
   public static class Rule<T> {
     private final T from;
     private final Set<T> to;
-    private final Closure<Transition<T>> callback;
+    private final Consumer<Transition<T>> callback;
 
     private Rule(T from) {
       this(from, ImmutableSet.<T>of());
     }
 
     private Rule(T from, Set<T> to) {
-      this(from, to, Closures.<Transition<T>>noop());
+      this(from, to, Consumers.<Transition<T>>noop());
     }
 
-    private Rule(T from, Set<T> to, Closure<Transition<T>> callback) {
+    private Rule(T from, Set<T> to, Consumer<Transition<T>> callback) {
       this.from = checkNotNull(from);
       this.to = checkNotNull(to);
       this.callback = checkNotNull(callback);
@@ -210,7 +210,7 @@ public class StateMachine<T> {
      * @return A new rule that is identical to this rule, but with the provided
      *     callback
      */
-    public Rule<T> withCallback(Closure<Transition<T>> callback) {
+    public Rule<T> withCallback(Consumer<Transition<T>> callback) {
       return new Rule<T>(from, to, callback);
     }
 
@@ -280,7 +280,7 @@ public class StateMachine<T> {
     private final String name;
     private T initialState;
     private final Multimap<T, T> stateTransitions = HashMultimap.create();
-    private final List<Closure<Transition<T>>> transitionCallbacks = Lists.newArrayList();
+    private final List<Consumer<Transition<T>>> transitionCallbacks = Lists.newArrayList();
     private boolean throwOnBadTransition = true;
 
     public Builder(String name) {
@@ -319,8 +319,8 @@ public class StateMachine<T> {
      * @param transitionStates Allowed transitions from {@code state}.
      * @return A reference to the builder.
      */
-    public Builder<T> addState(Closure<Transition<T>> callback, T state,
-        Set<T> transitionStates) {
+    public Builder<T> addState(Consumer<Transition<T>> callback, T state,
+                               Set<T> transitionStates) {
       checkNotNull(callback);
       checkNotNull(state);
 
@@ -335,15 +335,15 @@ public class StateMachine<T> {
     }
 
     /**
-     * Varargs version of {@link #addState(Closure, Object, java.util.Set)}.
+     * Varargs version of {@link #addState(Consumer, Object, java.util.Set)}.
      *
      * @param callback Callback to notify of any transition attempted from the state.
      * @param state State to add.
      * @param transitionStates Allowed transitions from {@code state}.
      * @return A reference to the builder.
      */
-    public Builder<T> addState(Closure<Transition<T>> callback, T state,
-        T... transitionStates) {
+    public Builder<T> addState(Consumer<Transition<T>> callback, T state,
+                               T... transitionStates) {
       Set<T> states = ImmutableSet.copyOf(transitionStates);
       Preconditions.checkArgument(Iterables.all(states, Predicates.notNull()));
 
@@ -360,12 +360,12 @@ public class StateMachine<T> {
      * @return A reference to the builder.
      */
     public Builder<T> addState(T state, T... transitionStates) {
-      return addState(Closures.<Transition<T>>noop(), state, transitionStates);
+      return addState(Consumers.<Transition<T>>noop(), state, transitionStates);
     }
 
     private void onTransition(Predicate<Transition<T>> transitionFilter,
-        Closure<Transition<T>> handler) {
-      onAnyTransition(Closures.filter(transitionFilter, handler));
+        Consumer<Transition<T>> handler) {
+      onAnyTransition(Consumers.filter(transitionFilter, handler));
     }
 
     /**
@@ -375,7 +375,7 @@ public class StateMachine<T> {
      * @param handler Callback to notify of transition attempts.
      * @return A reference to the builder.
      */
-    public Builder<T> onAnyTransition(Closure<Transition<T>> handler) {
+    public Builder<T> onAnyTransition(Consumer<Transition<T>> handler) {
       transitionCallbacks.add(handler);
       return this;
     }
@@ -413,7 +413,7 @@ public class StateMachine<T> {
       return new StateMachine<T>(name,
           initialState,
           stateTransitions,
-          Closures.combine(transitionCallbacks),
+          Consumers.combine(transitionCallbacks),
           throwOnBadTransition);
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java
index bcfa003..947e42e 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java
@@ -15,13 +15,13 @@ package org.apache.aurora.common.util.templating;
 
 import java.io.IOException;
 import java.io.Writer;
+import java.util.function.Consumer;
 
 import com.google.common.base.Preconditions;
 
 import org.antlr.stringtemplate.AutoIndentWriter;
 import org.antlr.stringtemplate.StringTemplate;
 import org.antlr.stringtemplate.StringTemplateGroup;
-import org.apache.aurora.common.base.Closure;
 import org.apache.aurora.common.base.MorePreconditions;
 
 /**
@@ -74,19 +74,19 @@ public class StringTemplateHelper {
    * the unpopulated template object.
    *
    * @param out Template output writer.
-   * @param parameterSetter Closure to populate the template.
+   * @param parameterSetter Consumer to populate the template.
    * @throws TemplateException If an exception was encountered while populating the template.
    */
   public void writeTemplate(
       Writer out,
-      Closure<StringTemplate> parameterSetter) throws TemplateException {
+      Consumer<StringTemplate> parameterSetter) throws TemplateException {
 
     Preconditions.checkNotNull(out);
     Preconditions.checkNotNull(parameterSetter);
 
     StringTemplate stringTemplate = group.getInstanceOf(templatePath);
     try {
-      parameterSetter.execute(stringTemplate);
+      parameterSetter.accept(stringTemplate);
       stringTemplate.write(new AutoIndentWriter(out));
     } catch (IOException e) {
       throw new TemplateException("Failed to write template: " + e, e);

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java b/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java
deleted file mode 100644
index 07bf23c..0000000
--- a/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.base;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.fail;
-
-/**
- * @author John Sirois
- */
-public class ClosuresTest extends EasyMockTest {
-
-  private static final Clazz<Closure<Integer>> INT_CLOSURE_CLZ = new Clazz<Closure<Integer>>() { };
-  static class Thrown extends RuntimeException { }
-
-  @Test
-  public void testCombine() {
-    Closure<Integer> work1 = createMock(INT_CLOSURE_CLZ);
-    Closure<Integer> work2 = createMock(INT_CLOSURE_CLZ);
-
-    @SuppressWarnings("unchecked") // Needed because type information lost in vargs.
-    Closure<Integer> wrapper = Closures.combine(ImmutableList.of(work1, work2));
-
-    work1.execute(1);
-    work2.execute(1);
-
-    work1.execute(2);
-    work2.execute(2);
-
-    control.replay();
-
-    wrapper.execute(1);
-    wrapper.execute(2);
-  }
-
-  @Test
-  public void testCombineOneThrows() {
-    Closure<Integer> work1 = createMock(INT_CLOSURE_CLZ);
-    Closure<Integer> work2 = createMock(INT_CLOSURE_CLZ);
-    Closure<Integer> work3 = createMock(INT_CLOSURE_CLZ);
-
-    @SuppressWarnings("unchecked") // Needed because type information lost in vargs.
-    Closure<Integer> wrapper = Closures.combine(ImmutableList.of(work1, work2, work3));
-
-    work1.execute(1);
-    expectLastCall().andThrow(new Thrown());
-
-    work1.execute(2);
-    work2.execute(2);
-    expectLastCall().andThrow(new Thrown());
-
-    work1.execute(3);
-    work2.execute(3);
-    work3.execute(3);
-    expectLastCall().andThrow(new Thrown());
-
-    control.replay();
-
-    try {
-      wrapper.execute(1);
-      fail("Should have thrown.");
-    } catch (Thrown e) {
-      // Expected.
-    }
-
-    try {
-      wrapper.execute(2);
-      fail("Should have thrown.");
-    } catch (Thrown e) {
-      // Expected.
-    }
-
-    try {
-      wrapper.execute(3);
-      fail("Should have thrown.");
-    } catch (Thrown e) {
-      // Expected.
-    }
-  }
-
-  @Test
-  public void testFilter() {
-    Predicate<Integer> filter = createMock(new Clazz<Predicate<Integer>>() { });
-    Closure<Integer> work = createMock(INT_CLOSURE_CLZ);
-
-    expect(filter.apply(1)).andReturn(true);
-    work.execute(1);
-
-    expect(filter.apply(2)).andReturn(false);
-
-    Closure<Integer> filtered = Closures.filter(filter, work);
-
-    control.replay();
-
-    filtered.execute(1);
-    filtered.execute(2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java b/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java
new file mode 100644
index 0000000..f25dd19
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.base;
+
+import java.util.function.Consumer;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ConsumersTest extends EasyMockTest {
+
+  private static final Clazz<Consumer<Integer>> INT_CLOSURE_CLZ = new Clazz<Consumer<Integer>>() { };
+  static class Thrown extends RuntimeException { }
+
+  @Test
+  public void testCombine() {
+    Consumer<Integer> work1 = createMock(INT_CLOSURE_CLZ);
+    Consumer<Integer> work2 = createMock(INT_CLOSURE_CLZ);
+
+    @SuppressWarnings("unchecked") // Needed because type information lost in vargs.
+        Consumer<Integer> wrapper = Consumers.combine(ImmutableList.of(work1, work2));
+
+    work1.accept(1);
+    work2.accept(1);
+
+    work1.accept(2);
+    work2.accept(2);
+
+    control.replay();
+
+    wrapper.accept(1);
+    wrapper.accept(2);
+  }
+
+  @Test
+  public void testCombineOneThrows() {
+    Consumer<Integer> work1 = createMock(INT_CLOSURE_CLZ);
+    Consumer<Integer> work2 = createMock(INT_CLOSURE_CLZ);
+    Consumer<Integer> work3 = createMock(INT_CLOSURE_CLZ);
+
+    @SuppressWarnings("unchecked") // Needed because type information lost in vargs.
+        Consumer<Integer> wrapper = Consumers.combine(ImmutableList.of(work1, work2, work3));
+
+    work1.accept(1);
+    expectLastCall().andThrow(new Thrown());
+
+    work1.accept(2);
+    work2.accept(2);
+    expectLastCall().andThrow(new Thrown());
+
+    work1.accept(3);
+    work2.accept(3);
+    work3.accept(3);
+    expectLastCall().andThrow(new Thrown());
+
+    control.replay();
+
+    try {
+      wrapper.accept(1);
+      fail("Should have thrown.");
+    } catch (Thrown e) {
+      // Expected.
+    }
+
+    try {
+      wrapper.accept(2);
+      fail("Should have thrown.");
+    } catch (Thrown e) {
+      // Expected.
+    }
+
+    try {
+      wrapper.accept(3);
+      fail("Should have thrown.");
+    } catch (Thrown e) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testFilter() {
+    Predicate<Integer> filter = createMock(new Clazz<Predicate<Integer>>() { });
+    Consumer<Integer> work = createMock(INT_CLOSURE_CLZ);
+
+    expect(filter.apply(1)).andReturn(true);
+    work.accept(1);
+
+    expect(filter.apply(2)).andReturn(false);
+
+    Consumer<Integer> filtered = Consumers.filter(filter, work);
+
+    control.replay();
+
+    filtered.accept(1);
+    filtered.accept(2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java b/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java
index 9591e82..1c0ee47 100644
--- a/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java
@@ -13,8 +13,9 @@
  */
 package org.apache.aurora.common.util;
 
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Closures;
+import java.util.function.Consumer;
+
+import org.apache.aurora.common.base.Consumers;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.StateMachine.Rule;
 import org.apache.aurora.common.util.StateMachine.Transition;
@@ -199,32 +200,32 @@ public class StateMachineTest extends EasyMockTest {
     assertThat(machine.getState(), is(A));
   }
 
-  private static final Clazz<Closure<Transition<String>>> TRANSITION_CLOSURE_CLZ =
-      new Clazz<Closure<Transition<String>>>() {};
+  private static final Clazz<Consumer<Transition<String>>> TRANSITION_CLOSURE_CLZ =
+      new Clazz<Consumer<Transition<String>>>() {};
 
   @Test
   public void testTransitionCallbacks() {
-    Closure<Transition<String>> anyTransition = createMock(TRANSITION_CLOSURE_CLZ);
-    Closure<Transition<String>> fromA = createMock(TRANSITION_CLOSURE_CLZ);
-    Closure<Transition<String>> fromB = createMock(TRANSITION_CLOSURE_CLZ);
+    Consumer<Transition<String>> anyTransition = createMock(TRANSITION_CLOSURE_CLZ);
+    Consumer<Transition<String>> fromA = createMock(TRANSITION_CLOSURE_CLZ);
+    Consumer<Transition<String>> fromB = createMock(TRANSITION_CLOSURE_CLZ);
 
     Transition<String> aToB = new Transition<>(A, B, true);
-    anyTransition.execute(aToB);
-    fromA.execute(aToB);
+    anyTransition.accept(aToB);
+    fromA.accept(aToB);
 
     Transition<String> bToB = new Transition<>(B, B, false);
-    anyTransition.execute(bToB);
-    fromB.execute(bToB);
+    anyTransition.accept(bToB);
+    fromB.accept(bToB);
 
     Transition<String> bToC = new Transition<>(B, C, true);
-    anyTransition.execute(bToC);
-    fromB.execute(bToC);
+    anyTransition.accept(bToC);
+    fromB.accept(bToC);
 
-    anyTransition.execute(new Transition<>(C, B, true));
+    anyTransition.accept(new Transition<>(C, B, true));
 
     Transition<String> bToD = new Transition<>(B, D, true);
-    anyTransition.execute(bToD);
-    fromB.execute(bToD);
+    anyTransition.accept(bToD);
+    fromB.accept(bToD);
 
     control.replay();
 
@@ -247,10 +248,10 @@ public class StateMachineTest extends EasyMockTest {
 
   @Test
   public void testFilteredTransitionCallbacks() {
-    Closure<Transition<String>> aToBHandler = createMock(TRANSITION_CLOSURE_CLZ);
-    Closure<Transition<String>> impossibleHandler = createMock(TRANSITION_CLOSURE_CLZ);
+    Consumer<Transition<String>> aToBHandler = createMock(TRANSITION_CLOSURE_CLZ);
+    Consumer<Transition<String>> impossibleHandler = createMock(TRANSITION_CLOSURE_CLZ);
 
-    aToBHandler.execute(new Transition<>(A, B, true));
+    aToBHandler.accept(new Transition<>(A, B, true));
 
     control.replay();
 
@@ -258,9 +259,9 @@ public class StateMachineTest extends EasyMockTest {
         .initialState(A)
         .addState(Rule
             .from(A).to(B, C)
-            .withCallback(Closures.filter(Transition.to(B), aToBHandler)))
+            .withCallback(Consumers.filter(Transition.to(B), aToBHandler)))
         .addState(Rule.from(B).to(A)
-            .withCallback(Closures.filter(Transition.to(B), impossibleHandler)))
+            .withCallback(Consumers.filter(Transition.to(B), impossibleHandler)))
         .addState(Rule.from(C).noTransitions())
         .build();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
index debe899..195ab91 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
@@ -39,8 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.aurora.GuavaUtils.ServiceManagerIface;
 import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.application.ShutdownRegistry;
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Closures;
+import org.apache.aurora.common.base.Consumers;
 import org.apache.aurora.common.base.ExceptionalCommand;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
@@ -222,17 +222,17 @@ public class SchedulerLifecycle implements EventSubscriber {
       }
     });
 
-    final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() {
+    final Consumer<Transition<State>> prepareStorage = new Consumer<Transition<State>>() {
       @Override
-      public void execute(Transition<State> transition) {
+      public void accept(Transition<State> transition) {
         storage.prepare();
         stateMachine.transition(State.STORAGE_PREPARED);
       }
     };
 
-    final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() {
+    final Consumer<Transition<State>> handleLeading = new Consumer<Transition<State>>() {
       @Override
-      public void execute(Transition<State> transition) {
+      public void accept(Transition<State> transition) {
         LOG.info("Elected as leading scheduler!");
 
         storage.start(stores -> {
@@ -258,9 +258,9 @@ public class SchedulerLifecycle implements EventSubscriber {
       }
     };
 
-    final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() {
+    final Consumer<Transition<State>> handleRegistered = new Consumer<Transition<State>>() {
       @Override
-      public void execute(Transition<State> transition) {
+      public void accept(Transition<State> transition) {
         registrationAcked.set(true);
         delayedActions.blockingDriverJoin(() -> {
           driver.blockUntilStopped();
@@ -279,10 +279,10 @@ public class SchedulerLifecycle implements EventSubscriber {
       }
     };
 
-    final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() {
+    final Consumer<Transition<State>> shutDown = new Consumer<Transition<State>>() {
       private final AtomicBoolean invoked = new AtomicBoolean(false);
       @Override
-      public void execute(Transition<State> transition) {
+      public void accept(Transition<State> transition) {
         if (!invoked.compareAndSet(false, true)) {
           LOG.info("Shutdown already invoked, ignoring extra call.");
           return;
@@ -314,18 +314,18 @@ public class SchedulerLifecycle implements EventSubscriber {
         .initialState(State.IDLE)
         .logTransitions()
         .addState(
-            dieOnError(Closures.filter(NOT_DEAD, prepareStorage)),
+            dieOnError(Consumers.filter(NOT_DEAD, prepareStorage)),
             State.IDLE,
             State.PREPARING_STORAGE, State.DEAD)
         .addState(
             State.PREPARING_STORAGE,
             State.STORAGE_PREPARED, State.DEAD)
         .addState(
-            dieOnError(Closures.filter(NOT_DEAD, handleLeading)),
+            dieOnError(Consumers.filter(NOT_DEAD, handleLeading)),
             State.STORAGE_PREPARED,
             State.LEADER_AWAITING_REGISTRATION, State.DEAD)
         .addState(
-            dieOnError(Closures.filter(NOT_DEAD, handleRegistered)),
+            dieOnError(Consumers.filter(NOT_DEAD, handleRegistered)),
             State.LEADER_AWAITING_REGISTRATION,
             State.ACTIVE, State.DEAD)
         .addState(
@@ -337,16 +337,16 @@ public class SchedulerLifecycle implements EventSubscriber {
             State.DEAD
         )
         .onAnyTransition(
-            Closures.filter(IS_DEAD, shutDown))
+            Consumers.filter(IS_DEAD, shutDown))
         .build();
 
     this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl);
   }
 
-  private Closure<Transition<State>> dieOnError(final Closure<Transition<State>> closure) {
+  private Consumer<Transition<State>> dieOnError(final Consumer<Transition<State>> closure) {
     return transition -> {
       try {
-        closure.execute(transition);
+        closure.accept(transition);
       } catch (RuntimeException e) {
         LOG.error("Caught unchecked exception: " + e, e);
         stateMachine.transition(State.DEAD);

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java b/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java
index 302388d..73455a0 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java
@@ -14,12 +14,12 @@
 package org.apache.aurora.scheduler.http;
 
 import java.io.StringWriter;
+import java.util.function.Consumer;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
 
 import org.antlr.stringtemplate.StringTemplate;
-import org.apache.aurora.common.base.Closure;
 import org.apache.aurora.common.util.templating.StringTemplateHelper;
 import org.apache.aurora.common.util.templating.StringTemplateHelper.TemplateException;
 
@@ -36,7 +36,7 @@ class JerseyTemplateServlet {
     templateHelper = new StringTemplateHelper(getClass(), templatePath, true);
   }
 
-  protected final Response fillTemplate(Closure<StringTemplate> populator) {
+  protected final Response fillTemplate(Consumer<StringTemplate> populator) {
     StringWriter output = new StringWriter();
     try {
       templateHelper.writeTemplate(output, populator);

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index 6fd2951..23f256a 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.state;
 
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import javax.annotation.Nullable;
 
@@ -27,9 +28,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Closures;
 import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Consumers;
 import org.apache.aurora.common.base.MorePreconditions;
 import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.common.util.StateMachine;
@@ -168,17 +168,17 @@ class TaskStateMachine {
           "A task that does not exist must start in DELETED state.");
     }
 
-    Closure<Transition<TaskState>> manageTerminatedTasks = Closures.combine(
-        ImmutableList.<Closure<Transition<TaskState>>>builder()
+    Consumer<Transition<TaskState>> manageTerminatedTasks = Consumers.combine(
+        ImmutableList.<Consumer<Transition<TaskState>>>builder()
             // Kill a task that we believe to be terminated when an attempt is made to revive.
             .add(
-                Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
+                Consumers.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
                     addFollowupClosure(KILL)))
             // Remove a terminated task that is requested to be deleted.
-            .add(Closures.filter(Transition.to(DELETED), addFollowupClosure(DELETE)))
+            .add(Consumers.filter(Transition.to(DELETED), addFollowupClosure(DELETE)))
             .build());
 
-    final Closure<Transition<TaskState>> manageRestartingTask =
+    final Consumer<Transition<TaskState>> manageRestartingTask =
         transition -> {
           switch (transition.getTo()) {
             case ASSIGNED:
@@ -243,8 +243,8 @@ class TaskStateMachine {
       }
     };
 
-    final Closure<Transition<TaskState>> deleteIfKilling =
-        Closures.filter(Transition.to(KILLING), addFollowupClosure(DELETE));
+    final Consumer<Transition<TaskState>> deleteIfKilling =
+        Consumers.filter(Transition.to(KILLING), addFollowupClosure(DELETE));
 
     stateMachine = StateMachine.<TaskState>builder(name)
         .logTransitions()
@@ -432,9 +432,9 @@ class TaskStateMachine {
         // Since we want this action to be performed last in the transition sequence, the callback
         // must be the last chained transition callback.
         .onAnyTransition(
-            new Closure<Transition<TaskState>>() {
+            new Consumer<Transition<TaskState>>() {
               @Override
-              public void execute(final Transition<TaskState> transition) {
+              public void accept(final Transition<TaskState> transition) {
                 if (transition.isValidStateChange()) {
                   TaskState from = transition.getFrom();
                   TaskState to = transition.getTo();
@@ -475,7 +475,7 @@ class TaskStateMachine {
     sideEffects.add(sideEffect);
   }
 
-  private Closure<Transition<TaskState>> addFollowupClosure(final Action action) {
+  private Consumer<Transition<TaskState>> addFollowupClosure(final Action action) {
     return item -> addFollowup(action);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 5143668..f586186 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -19,6 +19,7 @@ import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import javax.inject.Inject;
 
@@ -29,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
 import org.apache.aurora.common.application.ShutdownRegistry;
-import org.apache.aurora.common.base.Closure;
 import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
@@ -204,8 +204,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   private final SlidingStats writerWaitStats =
       new SlidingStats("log_storage_write_lock_wait", "ns");
 
-  private final Map<LogEntry._Fields, Closure<LogEntry>> logEntryReplayActions;
-  private final Map<Op._Fields, Closure<Op>> transactionReplayActions;
+  private final Map<LogEntry._Fields, Consumer<LogEntry>> logEntryReplayActions;
+  private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
 
   @Inject
   LogStorage(
@@ -303,8 +303,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   }
 
   @VisibleForTesting
-  final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() {
-    return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder()
+  final Map<LogEntry._Fields, Consumer<LogEntry>> buildLogEntryReplayActions() {
+    return ImmutableMap.<LogEntry._Fields, Consumer<LogEntry>>builder()
         .put(LogEntry._Fields.SNAPSHOT, logEntry -> {
           Snapshot snapshot = logEntry.getSnapshot();
           LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
@@ -322,8 +322,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   }
 
   @VisibleForTesting
-  final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() {
-    return ImmutableMap.<Op._Fields, Closure<Op>>builder()
+  final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
+    return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
         .put(
             Op._Fields.SAVE_FRAMEWORK_ID,
             op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
@@ -451,7 +451,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       throw new IllegalStateException("Unknown log entry type: " + entryField);
     }
 
-    logEntryReplayActions.get(entryField).execute(logEntry);
+    logEntryReplayActions.get(entryField).accept(logEntry);
   }
 
   private void replayOp(Op op) {
@@ -460,7 +460,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
       throw new IllegalStateException("Unknown transaction op: " + opField);
     }
 
-    transactionReplayActions.get(opField).execute(op);
+    transactionReplayActions.get(opField).accept(op);
   }
 
   private void scheduleSnapshots() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
index 76a574f..ea147c0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -13,7 +13,8 @@
  */
 package org.apache.aurora.scheduler.storage.log;
 
-import org.apache.aurora.common.base.Closure;
+import java.util.function.Consumer;
+
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.log.Log;
@@ -24,7 +25,7 @@ import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
 
 /**
  * Manages interaction with the log stream.  Log entries can be
- * {@link #readFromBeginning(Closure) read from} the beginning,
+ * {@link #readFromBeginning(Consumer) read from} the beginning,
  * a {@link #startTransaction() transaction} consisting of one or more local storage
  * operations can be committed atomically, or the log can be compacted by
  * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}.
@@ -39,7 +40,7 @@ public interface StreamManager {
    * @throws InvalidPositionException if the given position is not found in the log.
    * @throws StreamAccessException if there is a problem reading from the log.
    */
-  void readFromBeginning(Closure<LogEntry> reader)
+  void readFromBeginning(Consumer<LogEntry> reader)
       throws CodingException, InvalidPositionException, StreamAccessException;
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index 766ec2d..baf2647 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -19,6 +19,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -32,7 +33,6 @@ import com.google.common.hash.Hasher;
 import com.google.common.primitives.Bytes;
 import com.google.inject.assistedinject.Assisted;
 
-import org.apache.aurora.common.base.Closure;
 import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.storage.Frame;
@@ -95,7 +95,7 @@ class StreamManagerImpl implements StreamManager {
   }
 
   @Override
-  public void readFromBeginning(Closure<LogEntry> reader)
+  public void readFromBeginning(Consumer<LogEntry> reader)
       throws CodingException, InvalidPositionException, StreamAccessException {
 
     Iterator<Log.Entry> entries = stream.readAll();
@@ -116,7 +116,7 @@ class StreamManagerImpl implements StreamManager {
               snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot()));
         }
 
-        reader.execute(logEntry);
+        reader.accept(logEntry);
         vars.entriesRead.incrementAndGet();
       }
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index 0256c06..7344051 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -32,7 +33,6 @@ import com.google.common.hash.Hashing;
 
 import org.apache.aurora.codec.ThriftBinaryCodec;
 import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.base.Closure;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -111,7 +111,7 @@ public class LogManagerTest extends EasyMockTest {
   public void testStreamManagerReadFromUnknownNone() throws CodingException {
     expect(stream.readAll()).andReturn(Iterators.emptyIterator());
 
-    Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { });
+    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
 
     control.replay();
 
@@ -126,8 +126,8 @@ public class LogManagerTest extends EasyMockTest {
     expect(entry1.contents()).andReturn(encode(transaction1));
     expect(stream.readAll()).andReturn(Iterators.singletonIterator(entry1));
 
-    Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { });
-    reader.execute(transaction1);
+    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
+    reader.accept(transaction1);
 
     control.replay();
 
@@ -468,9 +468,9 @@ public class LogManagerTest extends EasyMockTest {
 
     expect(stream.readAll()).andReturn(entries.iterator());
 
-    Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { });
-    reader.execute(transaction1);
-    reader.execute(transaction2);
+    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
+    reader.accept(transaction1);
+    reader.accept(transaction2);
 
     StreamManager streamManager = createStreamManager(message.chunkSize);
     control.replay();
@@ -493,8 +493,8 @@ public class LogManagerTest extends EasyMockTest {
 
     expect(stream.readAll()).andReturn(ImmutableList.of(snapshotEntry).iterator());
 
-    Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { });
-    reader.execute(snapshotLogEntry);
+    Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { });
+    reader.accept(snapshotLogEntry);
 
     control.replay();
 


Mime
View raw message