Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3486B18A08 for ; Tue, 8 Dec 2015 18:27:38 +0000 (UTC) Received: (qmail 20787 invoked by uid 500); 8 Dec 2015 18:27:35 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 20689 invoked by uid 500); 8 Dec 2015 18:27:35 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 20567 invoked by uid 99); 8 Dec 2015 18:27:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Dec 2015 18:27:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C0FA1E0AD4; Tue, 8 Dec 2015 18:27:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Date: Tue, 08 Dec 2015 18:27:37 -0000 Message-Id: In-Reply-To: <09cf080f8b4147c7b0113c21c3f27bc7@git.apache.org> References: <09cf080f8b4147c7b0113c21c3f27bc7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/5] aurora git commit: Use lambdas throughout the project. http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java index 29331e0..e19bb27 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java @@ -103,12 +103,7 @@ public class SchedulerLifecycle implements EventSubscriber { DEAD } - private static final Predicate> IS_DEAD = new Predicate>() { - @Override - public boolean apply(Transition state) { - return state.getTo() == State.DEAD; - } - }; + private static final Predicate> IS_DEAD = state -> state.getTo() == State.DEAD; private static final Predicate> NOT_DEAD = Predicates.not(IS_DEAD); @@ -252,24 +247,18 @@ public class SchedulerLifecycle implements EventSubscriber { driver.startAsync().awaitRunning(); delayedActions.onRegistrationTimeout( - new Runnable() { - @Override - public void run() { - if (!registrationAcked.get()) { - LOG.severe( - "Framework has not been registered within the tolerated delay."); - stateMachine.transition(State.DEAD); - } + () -> { + if (!registrationAcked.get()) { + LOG.severe( + "Framework has not been registered within the tolerated delay."); + stateMachine.transition(State.DEAD); } }); delayedActions.onAutoFailover( - new Runnable() { - @Override - public void run() { - LOG.info("Triggering automatic failover."); - stateMachine.transition(State.DEAD); - } + () -> { + LOG.info("Triggering automatic failover."); + stateMachine.transition(State.DEAD); }); } }; @@ -278,13 +267,10 @@ public class SchedulerLifecycle implements EventSubscriber { @Override public void execute(Transition transition) { registrationAcked.set(true); - delayedActions.blockingDriverJoin(new Runnable() { - @Override - public void run() { - driver.blockUntilStopped(); - LOG.info("Driver exited, terminating lifecycle."); - stateMachine.transition(State.DEAD); - } + delayedActions.blockingDriverJoin(() -> { + driver.blockUntilStopped(); + LOG.info("Driver exited, terminating lifecycle."); + stateMachine.transition(State.DEAD); }); // TODO(ksweeney): Extract leader advertisement to its own service. @@ -365,16 +351,13 @@ public class SchedulerLifecycle implements EventSubscriber { } private Closure> dieOnError(final Closure> closure) { - return new Closure>() { - @Override - public void execute(Transition transition) { - try { - closure.execute(transition); - } catch (RuntimeException e) { - LOG.log(Level.SEVERE, "Caught unchecked exception: " + e, e); - stateMachine.transition(State.DEAD); - throw e; - } + return transition -> { + try { + closure.execute(transition); + } catch (RuntimeException e) { + LOG.log(Level.SEVERE, "Caught unchecked exception: " + e, e); + stateMachine.transition(State.DEAD); + throw e; } }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java index 6d940d1..fb1696a 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java @@ -24,7 +24,6 @@ import java.util.logging.Logger; import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.io.Files; @@ -131,15 +130,12 @@ public class SchedulerModule extends AbstractModule { @VisibleForTesting static TierConfig parseTierConfig(Optional config) { - Optional map = config.transform(new Function() { - @Override - public TierConfig apply(String input) { - try { - return new ObjectMapper().readValue(input, TierConfig.class); - } catch (IOException e) { - LOG.severe("Error parsing tier configuration file."); - throw Throwables.propagate(e); - } + Optional map = config.transform(input -> { + try { + return new ObjectMapper().readValue(input, TierConfig.class); + } catch (IOException e) { + LOG.severe("Error parsing tier configuration file."); + throw Throwables.propagate(e); } }); return map.or(TierConfig.EMPTY); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java index 3112579..09be688 100644 --- a/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/TaskStatusHandlerImpl.java @@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.mesos.Protos.TaskStatus; import static java.lang.annotation.ElementType.FIELD; @@ -147,22 +148,19 @@ public class TaskStatusHandlerImpl extends AbstractExecutionThreadService pendingUpdates.drainTo(updates, maxBatchSize - updates.size()); try { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) { - for (TaskStatus status : updates) { - ScheduleStatus translatedState = Conversions.convertProtoState(status.getState()); - - StateChangeResult result = stateManager.changeState( - storeProvider, - status.getTaskId().getValue(), - Optional.absent(), - translatedState, - formatMessage(status)); - - if (status.hasReason()) { - counters.get(statName(status, result)).incrementAndGet(); - } + storage.write((NoResult.Quiet) (Storage.MutableStoreProvider storeProvider) -> { + for (TaskStatus status : updates) { + ScheduleStatus translatedState = Conversions.convertProtoState(status.getState()); + + StateChangeResult result = stateManager.changeState( + storeProvider, + status.getTaskId().getValue(), + Optional.absent(), + translatedState, + formatMessage(status)); + + if (status.hasReason()) { + counters.get(statName(status, result)).incrementAndGet(); } } }); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/TaskVars.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TaskVars.java b/src/main/java/org/apache/aurora/scheduler/TaskVars.java index da9f222..8243098 100644 --- a/src/main/java/org/apache/aurora/scheduler/TaskVars.java +++ b/src/main/java/org/apache/aurora/scheduler/TaskVars.java @@ -46,8 +46,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; import org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -112,20 +110,10 @@ class TaskVars extends AbstractIdleService implements EventSubscriber { JobKeys.canonicalString(task.getAssignedTask().getTask().getJob())); } - private static final Predicate IS_RACK = new Predicate() { - @Override - public boolean apply(IAttribute attr) { - return "rack".equals(attr.getName()); - } - }; + private static final Predicate IS_RACK = attr -> "rack".equals(attr.getName()); private static final Function ATTR_VALUE = - new Function() { - @Override - public String apply(IAttribute attr) { - return Iterables.getOnlyElement(attr.getValues()); - } - }; + attr -> Iterables.getOnlyElement(attr.getValues()); private Counter getCounter(ScheduleStatus status) { return counters.getUnchecked(getVarName(status)); @@ -145,14 +133,11 @@ class TaskVars extends AbstractIdleService implements EventSubscriber { if (Strings.isNullOrEmpty(task.getAssignedTask().getSlaveHost())) { rack = Optional.absent(); } else { - rack = storage.read(new Work.Quiet>() { - @Override - public Optional apply(StoreProvider storeProvider) { - Optional rack = FluentIterable - .from(AttributeStore.Util.attributesOrNone(storeProvider, host)) - .firstMatch(IS_RACK); - return rack.transform(ATTR_VALUE); - } + rack = storage.read(storeProvider -> { + Optional rack1 = FluentIterable + .from(AttributeStore.Util.attributesOrNone(storeProvider, host)) + .firstMatch(IS_RACK); + return rack1.transform(ATTR_VALUE); }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 54814b2..ff886d9 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler.app; -import java.lang.Thread.UncaughtExceptionHandler; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.List; @@ -167,12 +166,9 @@ public class SchedulerMain { @VisibleForTesting public static void flagConfiguredMain(Module appEnvironmentModule) { AtomicLong uncaughtExceptions = Stats.exportLong("uncaught_exceptions"); - Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - uncaughtExceptions.incrementAndGet(); - LOG.log(Level.SEVERE, "Uncaught exception from " + t + ":" + e, e); - } + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + uncaughtExceptions.incrementAndGet(); + LOG.log(Level.SEVERE, "Uncaught exception from " + t + ":" + e, e); }); ClientConfig zkClientConfig = FlaggedClientConfig.create(); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index eccb864..848cb54 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -120,7 +120,7 @@ public class AsyncModule extends AbstractModule { statsProvider.makeGauge(ASYNC_TASKS_GAUGE, executor::getCompletedTaskCount); // Using a lambda rather than method ref to sidestep a bug in PMD that makes it think // delayExecutor is unused. - statsProvider.makeGauge(DELAY_QUEUE_GAUGE, () -> delayExecutor.getQueueSize()); + statsProvider.makeGauge(DELAY_QUEUE_GAUGE, delayExecutor::getQueueSize); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/base/Conversions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java index ad66cd8..91171b1 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java @@ -72,12 +72,7 @@ public final class Conversions { } private static final Function ATTRIBUTE_NAME = - new Function() { - @Override - public String apply(Protos.Attribute attr) { - return attr.getName(); - } - }; + Protos.Attribute::getName; /** * Typedef to make anonymous implementation more concise. @@ -87,34 +82,28 @@ public final class Conversions { } private static final Function VALUE_CONVERTER = - new Function() { - @Override - public String apply(Protos.Attribute attribute) { - switch (attribute.getType()) { - case SCALAR: - return String.valueOf(attribute.getScalar().getValue()); - - case TEXT: - return attribute.getText().getValue(); - - default: - LOG.finest("Unrecognized attribute type:" + attribute.getType() + " , ignoring."); - return null; - } + attribute -> { + switch (attribute.getType()) { + case SCALAR: + return String.valueOf(attribute.getScalar().getValue()); + + case TEXT: + return attribute.getText().getValue(); + + default: + LOG.finest("Unrecognized attribute type:" + attribute.getType() + " , ignoring."); + return null; } }; - private static final AttributeConverter ATTRIBUTE_CONVERTER = new AttributeConverter() { - @Override - public Attribute apply(Entry> entry) { - // Convert values and filter any that were ignored. - return new Attribute( - entry.getKey(), - FluentIterable.from(entry.getValue()) - .transform(VALUE_CONVERTER) - .filter(Predicates.notNull()) - .toSet()); - } + private static final AttributeConverter ATTRIBUTE_CONVERTER = entry -> { + // Convert values and filter any that were ignored. + return new Attribute( + entry.getKey(), + FluentIterable.from(entry.getValue()) + .transform(VALUE_CONVERTER) + .filter(Predicates.notNull()) + .toSet()); }; /** http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java index 84e6be5..d047634 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Stream; -import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.inject.AbstractModule; @@ -110,16 +109,11 @@ public class ExecutorModule extends AbstractModule { .build()) .addAll(Iterables.transform( GLOBAL_CONTAINER_MOUNTS.get(), - new Function() { - @Override - public Protos.Volume apply(Volume v) { - return Protos.Volume.newBuilder() - .setHostPath(v.getHostPath()) - .setContainerPath(v.getContainerPath()) - .setMode(Protos.Volume.Mode.valueOf(v.getMode().getValue())) - .build(); - } - })) + v -> Protos.Volume.newBuilder() + .setHostPath(v.getHostPath()) + .setContainerPath(v.getContainerPath()) + .setMode(Protos.Volume.Mode.valueOf(v.getMode().getValue())) + .build())) .build(); bind(ExecutorSettings.class).toInstance(new ExecutorSettings( http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java index 9b71802..9eda6e9 100644 --- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java +++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java @@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.collect.Iterables; -import org.apache.aurora.common.base.ExceptionalSupplier; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.common.util.BackoffHelper; import org.apache.aurora.gen.CronCollisionPolicy; @@ -37,6 +36,8 @@ import org.apache.aurora.scheduler.cron.CronException; import org.apache.aurora.scheduler.cron.SanitizedCronJob; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -113,62 +114,59 @@ class AuroraCronJob implements Job { final String path = JobKeys.canonicalString(key); final Optional deferredLaunch = storage.write( - new Storage.MutateWork.Quiet>() { - @Override - public Optional apply(Storage.MutableStoreProvider storeProvider) { - Optional config = storeProvider.getCronJobStore().fetchJob(key); - if (!config.isPresent()) { - LOG.warning(String.format( - "Cron was triggered for %s but no job with that key was found in storage.", - path)); - CRON_JOB_MISFIRES.incrementAndGet(); - return Optional.absent(); - } - - SanitizedCronJob cronJob; - try { - cronJob = SanitizedCronJob.fromUnsanitized(config.get()); - } catch (ConfigurationManager.TaskDescriptionException | CronException e) { - LOG.warning(String.format( - "Invalid cron job for %s in storage - failed to parse with %s", key, e)); - CRON_JOB_PARSE_FAILURES.incrementAndGet(); - return Optional.absent(); - } + (MutateWork.Quiet>) storeProvider -> { + Optional config = storeProvider.getCronJobStore().fetchJob(key); + if (!config.isPresent()) { + LOG.warning(String.format( + "Cron was triggered for %s but no job with that key was found in storage.", + path)); + CRON_JOB_MISFIRES.incrementAndGet(); + return Optional.absent(); + } - CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy(); - LOG.info(String.format( - "Cron triggered for %s at %s with policy %s", path, new Date(), collisionPolicy)); - CRON_JOB_TRIGGERS.incrementAndGet(); + SanitizedCronJob cronJob; + try { + cronJob = SanitizedCronJob.fromUnsanitized(config.get()); + } catch (ConfigurationManager.TaskDescriptionException | CronException e) { + LOG.warning(String.format( + "Invalid cron job for %s in storage - failed to parse with %s", key, e)); + CRON_JOB_PARSE_FAILURES.incrementAndGet(); + return Optional.absent(); + } - final Query.Builder activeQuery = Query.jobScoped(key).active(); - Set activeTasks = - Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery)); + CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy(); + LOG.info(String.format( + "Cron triggered for %s at %s with policy %s", path, new Date(), collisionPolicy)); + CRON_JOB_TRIGGERS.incrementAndGet(); - ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig(); - Set instanceIds = cronJob.getSanitizedConfig().getInstanceIds(); - if (activeTasks.isEmpty()) { - stateManager.insertPendingTasks(storeProvider, task, instanceIds); + final Query.Builder activeQuery = Query.jobScoped(key).active(); + Set activeTasks = + Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery)); - return Optional.absent(); - } + ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig(); + Set instanceIds = cronJob.getSanitizedConfig().getInstanceIds(); + if (activeTasks.isEmpty()) { + stateManager.insertPendingTasks(storeProvider, task, instanceIds); - CRON_JOB_COLLISIONS.incrementAndGet(); - switch (collisionPolicy) { - case KILL_EXISTING: - return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks)); + return Optional.absent(); + } - case RUN_OVERLAP: - LOG.severe(String.format("Ignoring trigger for job %s with deprecated collision" - + "policy RUN_OVERLAP due to unterminated active tasks.", path)); - return Optional.absent(); + CRON_JOB_COLLISIONS.incrementAndGet(); + switch (collisionPolicy) { + case KILL_EXISTING: + return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks)); - case CANCEL_NEW: - return Optional.absent(); + case RUN_OVERLAP: + LOG.severe(String.format("Ignoring trigger for job %s with deprecated collision" + + "policy RUN_OVERLAP due to unterminated active tasks.", path)); + return Optional.absent(); - default: - LOG.severe("Unrecognized cron collision policy: " + collisionPolicy); - return Optional.absent(); - } + case CANCEL_NEW: + return Optional.absent(); + + default: + LOG.severe("Unrecognized cron collision policy: " + collisionPolicy); + return Optional.absent(); } } ); @@ -177,17 +175,14 @@ class AuroraCronJob implements Job { return; } - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) { - for (String taskId : deferredLaunch.get().activeTaskIds) { - stateManager.changeState( - storeProvider, - taskId, - Optional.absent(), - KILLING, - KILL_AUDIT_MESSAGE); - } + storage.write((NoResult.Quiet) (Storage.MutableStoreProvider storeProvider) -> { + for (String taskId : deferredLaunch.get().activeTaskIds) { + stateManager.changeState( + storeProvider, + taskId, + Optional.absent(), + KILLING, + KILL_AUDIT_MESSAGE); } }); @@ -197,26 +192,18 @@ class AuroraCronJob implements Job { try { // NOTE: We block the quartz execution thread here until we've successfully killed our // ancestor. We mitigate this by using a cached thread pool for quartz. - delayedStartBackoff.doUntilSuccess(new ExceptionalSupplier() { - @Override - public Boolean get() { - if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) { - LOG.info("Initiating delayed launch of cron " + path); - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) { - stateManager.insertPendingTasks( - storeProvider, - deferredLaunch.get().task, - deferredLaunch.get().instanceIds); - } - }); - - return true; - } else { - LOG.info("Not yet safe to run cron " + path); - return false; - } + delayedStartBackoff.doUntilSuccess(() -> { + if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) { + LOG.info("Initiating delayed launch of cron " + path); + storage.write((NoResult.Quiet) storeProvider -> stateManager.insertPendingTasks( + storeProvider, + deferredLaunch.get().task, + deferredLaunch.get().instanceIds)); + + return true; + } else { + LOG.info("Not yet safe to run cron " + path); + return false; } }); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java index e377fd8..675e73d 100644 --- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImpl.java @@ -33,8 +33,7 @@ import org.apache.aurora.scheduler.cron.CrontabEntry; import org.apache.aurora.scheduler.cron.SanitizedCronJob; import org.apache.aurora.scheduler.storage.CronJobStore; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.Work; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.quartz.CronTrigger; @@ -69,13 +68,10 @@ class CronJobManagerImpl implements CronJobManager { public void startJobNow(final IJobKey jobKey) throws CronException { requireNonNull(jobKey); - storage.read(new Work() { - @Override - public Void apply(Storage.StoreProvider storeProvider) throws CronException { - checkCronExists(jobKey, storeProvider.getCronJobStore()); - triggerJob(jobKey); - return null; - } + storage.read(storeProvider -> { + checkCronExists(jobKey, storeProvider.getCronJobStore()); + triggerJob(jobKey); + return null; }); } @@ -103,16 +99,13 @@ class CronJobManagerImpl implements CronJobManager { checkNoRunOverlap(config); final IJobKey jobKey = config.getSanitizedConfig().getJobConfig().getKey(); - storage.write(new MutateWork.NoResult() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) throws CronException { - checkCronExists(jobKey, storeProvider.getCronJobStore()); - - removeJob(jobKey, storeProvider.getCronJobStore()); - descheduleJob(jobKey); - saveJob(config, storeProvider.getCronJobStore()); - scheduleJob(config.getCrontabEntry(), jobKey); - } + storage.write((NoResult) (Storage.MutableStoreProvider storeProvider) -> { + checkCronExists(jobKey, storeProvider.getCronJobStore()); + + removeJob(jobKey, storeProvider.getCronJobStore()); + descheduleJob(jobKey); + saveJob(config, storeProvider.getCronJobStore()); + scheduleJob(config.getCrontabEntry(), jobKey); }); } @@ -122,14 +115,11 @@ class CronJobManagerImpl implements CronJobManager { checkNoRunOverlap(cronJob); final IJobKey jobKey = cronJob.getSanitizedConfig().getJobConfig().getKey(); - storage.write(new MutateWork.NoResult() { - @Override - public void execute(Storage.MutableStoreProvider storeProvider) throws CronException { - checkNotExists(jobKey, storeProvider.getCronJobStore()); + storage.write((NoResult) (Storage.MutableStoreProvider storeProvider) -> { + checkNotExists(jobKey, storeProvider.getCronJobStore()); - saveJob(cronJob, storeProvider.getCronJobStore()); - scheduleJob(cronJob.getCrontabEntry(), jobKey); - } + saveJob(cronJob, storeProvider.getCronJobStore()); + scheduleJob(cronJob.getCrontabEntry(), jobKey); }); } @@ -173,17 +163,14 @@ class CronJobManagerImpl implements CronJobManager { public boolean deleteJob(final IJobKey jobKey) { requireNonNull(jobKey); - return storage.write(new MutateWork.Quiet() { - @Override - public Boolean apply(Storage.MutableStoreProvider storeProvider) { - if (!storeProvider.getCronJobStore().fetchJob(jobKey).isPresent()) { - return false; - } - - removeJob(jobKey, storeProvider.getCronJobStore()); - descheduleJob(jobKey); - return true; + return storage.write(storeProvider -> { + if (!storeProvider.getCronJobStore().fetchJob(jobKey).isPresent()) { + return false; } + + removeJob(jobKey, storeProvider.getCronJobStore()); + descheduleJob(jobKey); + return true; }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java index a71676e..016fee1 100644 --- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronSchedulerImpl.java @@ -18,7 +18,6 @@ import java.util.logging.Logger; import javax.inject.Inject; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; @@ -53,12 +52,7 @@ class CronSchedulerImpl implements CronScheduler { return Optional.of(Iterables.getOnlyElement( FluentIterable.from(scheduler.getTriggersOfJob(jobKey(jobKey))) .filter(CronTrigger.class) - .transform(new Function() { - @Override - public CrontabEntry apply(CronTrigger trigger) { - return Quartz.crontabEntry(trigger); - } - }))); + .transform(Quartz::crontabEntry))); } catch (SchedulerException e) { LOG.log(Level.SEVERE, "Error reading job " + JobKeys.canonicalString(jobKey) + " cronExpression Quartz: " + e, http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java index 2065c45..e9e0206 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java +++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java @@ -27,8 +27,6 @@ import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.DeadEvent; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; -import com.google.common.eventbus.SubscriberExceptionContext; -import com.google.common.eventbus.SubscriberExceptionHandler; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; import com.google.inject.Binder; @@ -81,15 +79,12 @@ public final class PubsubEventModule extends AbstractModule { final AtomicLong subscriberExceptions = statsProvider.makeCounter(EXCEPTIONS_STAT); EventBus eventBus = new AsyncEventBus( executor, - new SubscriberExceptionHandler() { - @Override - public void handleException(Throwable exception, SubscriberExceptionContext context) { - subscriberExceptions.incrementAndGet(); - log.log( - Level.SEVERE, - "Failed to dispatch event to " + context.getSubscriberMethod() + ": " + exception, - exception); - } + (exception, context) -> { + subscriberExceptions.incrementAndGet(); + log.log( + Level.SEVERE, + "Failed to dispatch event to " + context.getSubscriberMethod() + ": " + exception, + exception); } ); @@ -100,12 +95,7 @@ public final class PubsubEventModule extends AbstractModule { @Provides @Singleton EventSink provideEventSink(EventBus eventBus) { - return new EventSink() { - @Override - public void post(PubsubEvent event) { - eventBus.post(event); - } - }; + return eventBus::post; } private class DeadEventHandler { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java index 5137679..87b9e19 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java @@ -64,13 +64,8 @@ public final class AttributeAggregate { final IJobKey jobKey) { return create( - new Supplier>() { - @Override - public Iterable get() { - return storeProvider.getTaskStore() - .fetchTasks(Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES)); - } - }, + () -> storeProvider.getTaskStore() + .fetchTasks(Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES)), storeProvider.getAttributeStore()); } @@ -80,43 +75,32 @@ public final class AttributeAggregate { final AttributeStore attributeStore) { final Function> getHostAttributes = - new Function>() { - @Override - public Iterable apply(String host) { - // Note: this assumes we have access to attributes for hosts where all active tasks - // reside. - requireNonNull(host); - return attributeStore.getHostAttributes(host).get().getAttributes(); - } + host -> { + // Note: this assumes we have access to attributes for hosts where all active tasks + // reside. + requireNonNull(host); + return attributeStore.getHostAttributes(host).get().getAttributes(); }; return create(Suppliers.compose( - new Function, Iterable>() { - @Override - public Iterable apply(Iterable tasks) { - return FluentIterable.from(tasks) - .transform(Tasks::scheduledToSlaveHost) - .transformAndConcat(getHostAttributes); - } - }, + tasks -> FluentIterable.from(tasks) + .transform(Tasks::scheduledToSlaveHost) + .transformAndConcat(getHostAttributes), taskSupplier)); } @VisibleForTesting static AttributeAggregate create(Supplier> attributes) { Supplier>> aggregator = Suppliers.compose( - new Function, Multiset>>() { - @Override - public Multiset> apply(Iterable attributes) { - ImmutableMultiset.Builder> builder = ImmutableMultiset.builder(); - for (IAttribute attribute : attributes) { - for (String value : attribute.getValues()) { - builder.add(Pair.of(attribute.getName(), value)); - } + attributes1 -> { + ImmutableMultiset.Builder> builder = ImmutableMultiset.builder(); + for (IAttribute attribute : attributes1) { + for (String value : attribute.getValues()) { + builder.add(Pair.of(attribute.getName(), value)); } - - return builder.build(); } + + return builder.build(); }, attributes ); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java index 2b47821..d4099c2 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintMatcher.java @@ -37,12 +37,7 @@ final class ConstraintMatcher { } private static final Function> GET_VALUES = - new Function>() { - @Override - public Set apply(IAttribute attribute) { - return attribute.getValues(); - } - }; + IAttribute::getValues; /** * Gets the veto (if any) for a scheduling constraint based on the {@link AttributeAggregate} this http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/Locks.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Locks.java b/src/main/java/org/apache/aurora/scheduler/http/Locks.java index e275dd7..0931289 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Locks.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Locks.java @@ -56,21 +56,12 @@ public class Locks { Maps.uniqueIndex(lockManager.getLocks(), TO_LOCK_KEY), TO_BEAN)).build(); } - private static final Function TO_LOCK_KEY = new Function() { - @Override - public String apply(ILock lock) { - return lock.getKey().getSetField() == LockKey._Fields.JOB + private static final Function TO_LOCK_KEY = + lock -> lock.getKey().getSetField() == LockKey._Fields.JOB ? JobKeys.canonicalString(lock.getKey().getJob()) : "Unknown lock key type: " + lock.getKey().getSetField(); - } - }; - private static final Function TO_BEAN = new Function() { - @Override - public LockBean apply(ILock lock) { - return new LockBean(lock); - } - }; + private static final Function TO_BEAN = LockBean::new; private static final class LockBean { private final ILock lock; http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java index 5268759..72c8c3e 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java @@ -34,7 +34,6 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -57,22 +56,19 @@ public class Maintenance { @GET @Produces(MediaType.APPLICATION_JSON) public Response getHosts() { - return storage.read(new Work.Quiet() { - @Override - public Response apply(StoreProvider storeProvider) { - Multimap hostsByMode = - Multimaps.transformValues( - Multimaps.index( - storeProvider.getAttributeStore().getHostAttributes(), - IHostAttributes::getMode), - HOST_NAME); + return storage.read(storeProvider -> { + Multimap hostsByMode = + Multimaps.transformValues( + Multimaps.index( + storeProvider.getAttributeStore().getHostAttributes(), + IHostAttributes::getMode), + HOST_NAME); - Map hosts = Maps.newHashMap(); - hosts.put(DRAINED, ImmutableSet.copyOf(hostsByMode.get(DRAINED))); - hosts.put(SCHEDULED, ImmutableSet.copyOf(hostsByMode.get(SCHEDULED))); - hosts.put(DRAINING, getTasksByHosts(storeProvider, hostsByMode.get(DRAINING)).asMap()); - return Response.ok(hosts).build(); - } + Map hosts = Maps.newHashMap(); + hosts.put(DRAINED, ImmutableSet.copyOf(hostsByMode.get(DRAINED))); + hosts.put(SCHEDULED, ImmutableSet.copyOf(hostsByMode.get(SCHEDULED))); + hosts.put(DRAINING, getTasksByHosts(storeProvider, hostsByMode.get(DRAINING)).asMap()); + return Response.ok(hosts).build(); }); } @@ -85,10 +81,5 @@ public class Maintenance { } private static final Function HOST_NAME = - new Function() { - @Override - public String apply(IHostAttributes attributes) { - return attributes.getHost(); - } - }; + IHostAttributes::getHost; } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/Offers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java index 15f1582..80f0824 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java @@ -61,63 +61,47 @@ public class Offers { FluentIterable.from(offerManager.getOffers()).transform(TO_BEAN).toList()).build(); } - private static final Function EXECUTOR_ID_TOSTRING = - new Function() { - @Override - public String apply(ExecutorID id) { - return id.getValue(); - } - }; + private static final Function EXECUTOR_ID_TOSTRING = ExecutorID::getValue; - private static final Function RANGE_TO_BEAN = new Function() { - @Override - public Object apply(Range range) { - return range.getBegin() + "-" + range.getEnd(); - } - }; + private static final Function RANGE_TO_BEAN = + range -> range.getBegin() + "-" + range.getEnd(); private static final Function ATTRIBUTE_TO_BEAN = - new Function() { - @Override - public Object apply(Attribute attr) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - builder.put("name", attr.getName()); - if (attr.hasScalar()) { - builder.put("scalar", attr.getScalar().getValue()); - } - if (attr.hasRanges()) { - builder.put("ranges", immutable(attr.getRanges().getRangeList(), RANGE_TO_BEAN)); - } - if (attr.hasSet()) { - builder.put("set", attr.getSet().getItemList()); - } - if (attr.hasText()) { - builder.put("text", attr.getText().getValue()); - } - return builder.build(); + attr -> { + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put("name", attr.getName()); + if (attr.hasScalar()) { + builder.put("scalar", attr.getScalar().getValue()); + } + if (attr.hasRanges()) { + builder.put("ranges", immutable(attr.getRanges().getRangeList(), RANGE_TO_BEAN)); + } + if (attr.hasSet()) { + builder.put("set", attr.getSet().getItemList()); + } + if (attr.hasText()) { + builder.put("text", attr.getText().getValue()); } + return builder.build(); }; private static final Function RESOURCE_TO_BEAN = - new Function() { - @Override - public Object apply(Resource resource) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - builder.put("name", resource.getName()); - if (resource.hasScalar()) { - builder.put("scalar", resource.getScalar().getValue()); - } - if (resource.hasRanges()) { - builder.put("ranges", immutable(resource.getRanges().getRangeList(), RANGE_TO_BEAN)); - } - if (resource.hasSet()) { - builder.put("set", resource.getSet().getItemList()); - } - if (resource.hasRevocable()) { - builder.put("revocable", "true"); - } - return builder.build(); + resource -> { + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put("name", resource.getName()); + if (resource.hasScalar()) { + builder.put("scalar", resource.getScalar().getValue()); } + if (resource.hasRanges()) { + builder.put("ranges", immutable(resource.getRanges().getRangeList(), RANGE_TO_BEAN)); + } + if (resource.hasSet()) { + builder.put("set", resource.getSet().getItemList()); + } + if (resource.hasRevocable()) { + builder.put("revocable", "true"); + } + return builder.build(); }; private static Iterable immutable(Iterable iterable, Function transform) { @@ -125,19 +109,16 @@ public class Offers { } private static final Function> TO_BEAN = - new Function>() { - @Override - public Map apply(HostOffer hostOffer) { - Offer offer = hostOffer.getOffer(); - return ImmutableMap.builder() - .put("id", offer.getId().getValue()) - .put("framework_id", offer.getFrameworkId().getValue()) - .put("slave_id", offer.getSlaveId().getValue()) - .put("hostname", offer.getHostname()) - .put("resources", immutable(offer.getResourcesList(), RESOURCE_TO_BEAN)) - .put("attributes", immutable(offer.getAttributesList(), ATTRIBUTE_TO_BEAN)) - .put("executor_ids", immutable(offer.getExecutorIdsList(), EXECUTOR_ID_TOSTRING)) - .build(); - } + hostOffer -> { + Offer offer = hostOffer.getOffer(); + return ImmutableMap.builder() + .put("id", offer.getId().getValue()) + .put("framework_id", offer.getFrameworkId().getValue()) + .put("slave_id", offer.getSlaveId().getValue()) + .put("hostname", offer.getHostname()) + .put("resources", immutable(offer.getResourcesList(), RESOURCE_TO_BEAN)) + .put("attributes", immutable(offer.getAttributesList(), ATTRIBUTE_TO_BEAN)) + .put("executor_ids", immutable(offer.getExecutorIdsList(), EXECUTOR_ID_TOSTRING)) + .build(); }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/Quotas.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Quotas.java b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java index e1bf0cb..399203b 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Quotas.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Quotas.java @@ -30,8 +30,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.codehaus.jackson.annotate.JsonProperty; @@ -56,33 +54,25 @@ public class Quotas { @GET @Produces(MediaType.APPLICATION_JSON) public Response getQuotas(@QueryParam("role") final String role) { - return storage.read(new Work.Quiet() { - @Override - public Response apply(StoreProvider storeProvider) { - Map quotas; - if (role == null) { - quotas = storeProvider.getQuotaStore().fetchQuotas(); + return storage.read(storeProvider -> { + Map quotas; + if (role == null) { + quotas = storeProvider.getQuotaStore().fetchQuotas(); + } else { + Optional quota = storeProvider.getQuotaStore().fetchQuota(role); + if (quota.isPresent()) { + quotas = ImmutableMap.of(role, quota.get()); } else { - Optional quota = storeProvider.getQuotaStore().fetchQuota(role); - if (quota.isPresent()) { - quotas = ImmutableMap.of(role, quota.get()); - } else { - quotas = ImmutableMap.of(); - } + quotas = ImmutableMap.of(); } - - return Response.ok(Maps.transformValues(quotas, TO_BEAN)).build(); } + + return Response.ok(Maps.transformValues(quotas, TO_BEAN)).build(); }); } private static final Function TO_BEAN = - new Function() { - @Override - public ResourceAggregateBean apply(IResourceAggregate quota) { - return new ResourceAggregateBean(quota.getNumCpus(), quota.getRamMb(), quota.getDiskMb()); - } - }; + quota -> new ResourceAggregateBean(quota.getNumCpus(), quota.getRamMb(), quota.getDiskMb()); private static final class ResourceAggregateBean { private final double cpu; http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/Services.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Services.java b/src/main/java/org/apache/aurora/scheduler/http/Services.java index 7183370..08e4915 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Services.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Services.java @@ -49,18 +49,15 @@ public final class Services { } private static final Function> SERVICE_TO_BEAN = - new Function>() { - @Override - public Map apply(Service service) { - State state = service.state(); - ImmutableMap.Builder bean = ImmutableMap.builder(); - bean.put("name", service.getClass().getSimpleName()); - bean.put("state", state); - if (state == State.FAILED) { - bean.put("failureCause", service.failureCause().toString()); - } - return bean.build(); + service -> { + State state = service.state(); + ImmutableMap.Builder bean = ImmutableMap.builder(); + bean.put("name", service.getClass().getSimpleName()); + bean.put("state", state); + if (state == State.FAILED) { + bean.put("failureCause", service.failureCause().toString()); } + return bean.build(); }; @GET @@ -68,12 +65,7 @@ public final class Services { public Response getServices() { return Response.ok() .entity(FluentIterable.from(serviceManagers) - .transformAndConcat(new Function>() { - @Override - public Iterable apply(ServiceManagerIface input) { - return input.servicesByState().values(); - } - }) + .transformAndConcat(input -> input.servicesByState().values()) .transform(SERVICE_TO_BEAN) .toList()) .build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/Slaves.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Slaves.java b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java index a9fef25..f63fb7b 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Slaves.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Slaves.java @@ -26,8 +26,6 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; -import org.antlr.stringtemplate.StringTemplate; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAttribute; @@ -38,9 +36,6 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank; -import static org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import static org.apache.aurora.scheduler.storage.Storage.Work; - /** * HTTP interface to serve as a HUD for the mesos slaves tracked in the scheduler. */ @@ -63,21 +58,11 @@ public class Slaves extends JerseyTemplateServlet { } private Iterable getHostAttributes() { - return storage.read(new Work.Quiet>() { - @Override - public Iterable apply(StoreProvider storeProvider) { - return storeProvider.getAttributeStore().getHostAttributes(); - } - }); + return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes()); } private static final Function TO_SLAVE = - new Function() { - @Override - public Slave apply(IHostAttributes attributes) { - return new Slave(attributes); - } - }; + Slave::new; /** * Fetches the listing of known slaves. @@ -87,14 +72,11 @@ public class Slaves extends JerseyTemplateServlet { @GET @Produces(MediaType.TEXT_HTML) public Response get() { - return fillTemplate(new Closure() { - @Override - public void execute(StringTemplate template) { - template.setAttribute("cluster_name", clusterName); - - template.setAttribute("slaves", - FluentIterable.from(getHostAttributes()).transform(TO_SLAVE).toList()); - } + return fillTemplate(template -> { + template.setAttribute("cluster_name", clusterName); + + template.setAttribute("slaves", + FluentIterable.from(getHostAttributes()).transform(TO_SLAVE).toList()); }); } @@ -129,12 +111,7 @@ public class Slaves extends JerseyTemplateServlet { } private static final Function ATTR_TO_STRING = - new Function() { - @Override - public String apply(IAttribute attr) { - return attr.getName() + "=[" + Joiner.on(",").join(attr.getValues()) + "]"; - } - }; + attr -> attr.getName() + "=[" + Joiner.on(",").join(attr.getValues()) + "]"; public String getAttributes() { return Joiner.on(", ").join( http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/StructDump.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java index d66f4ce..4fa5254 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/StructDump.java +++ b/src/main/java/org/apache/aurora/scheduler/http/StructDump.java @@ -25,15 +25,10 @@ import javax.ws.rs.core.Response.Status; import com.google.common.base.Optional; import com.google.common.collect.Iterables; -import org.antlr.stringtemplate.StringTemplate; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.common.thrift.Util; -import org.apache.aurora.gen.JobConfiguration; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.Storage.Work; import org.apache.aurora.scheduler.storage.Storage.Work.Quiet; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -78,15 +73,12 @@ public class StructDump extends JerseyTemplateServlet { public Response dumpJob( @PathParam("task") final String taskId) { - return dumpEntity("Task " + taskId, new Work.Quiet>>() { - @Override - public Optional> apply(StoreProvider storeProvider) { - // Deep copy the struct to sidestep any subclass trickery inside the storage system. - return Optional.fromNullable(Iterables.getOnlyElement( - storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)), - null) - .newBuilder()); - } + return dumpEntity("Task " + taskId, storeProvider -> { + // Deep copy the struct to sidestep any subclass trickery inside the storage system. + return Optional.fromNullable(Iterables.getOnlyElement( + storeProvider.getTaskStore().fetchTasks(Query.taskScoped(taskId)), + null) + .newBuilder()); }); } @@ -105,27 +97,19 @@ public class StructDump extends JerseyTemplateServlet { final IJobKey jobKey = JobKeys.from(role, environment, job); return dumpEntity("Cron job " + JobKeys.canonicalString(jobKey), - new Work.Quiet>>() { - @Override - public Optional apply(StoreProvider storeProvider) { - return storeProvider.getCronJobStore().fetchJob(jobKey) - .transform(IJobConfiguration::newBuilder); - } - }); + storeProvider -> storeProvider.getCronJobStore().fetchJob(jobKey) + .transform(IJobConfiguration::newBuilder)); } private Response dumpEntity(final String id, final Quiet>> work) { - return fillTemplate(new Closure() { - @Override - public void execute(StringTemplate template) { - template.setAttribute("id", id); - Optional> struct = storage.read(work); - if (struct.isPresent()) { - template.setAttribute("structPretty", Util.prettyPrint(struct.get())); - template.setAttribute("exception", null); - } else { - template.setAttribute("exception", "Entity not found"); - } + return fillTemplate(template -> { + template.setAttribute("id", id); + Optional> struct = storage.read(work); + if (struct.isPresent()) { + template.setAttribute("structPretty", Util.prettyPrint(struct.get())); + template.setAttribute("exception", null); + } else { + template.setAttribute("exception", "Entity not found"); } }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/Utilization.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Utilization.java b/src/main/java/org/apache/aurora/scheduler/http/Utilization.java index 147ef0b..440946b 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Utilization.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Utilization.java @@ -33,8 +33,6 @@ import javax.ws.rs.core.Response.Status; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; -import org.antlr.stringtemplate.StringTemplate; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.common.base.MorePreconditions; import org.apache.aurora.common.util.templating.StringTemplateHelper; import org.apache.aurora.common.util.templating.StringTemplateHelper.TemplateException; @@ -65,24 +63,16 @@ public class Utilization { private String fillTemplate(Map metrics) { Function, DisplayMetric> transform = - new Function, DisplayMetric>() { - @Override - public DisplayMetric apply(Entry entry) { - return new DisplayMetric(entry.getKey(), entry.getValue()); - } - }; + entry -> new DisplayMetric(entry.getKey(), entry.getValue()); return fillTemplate(FluentIterable.from(metrics.entrySet()).transform(transform).toList()); } private String fillTemplate(final Iterable metrics) { StringWriter output = new StringWriter(); try { - templateHelper.writeTemplate(output, new Closure() { - @Override - public void execute(StringTemplate template) { - template.setAttribute("cluster_name", clusterName); - template.setAttribute("metrics", metrics); - } + templateHelper.writeTemplate(output, template -> { + template.setAttribute("cluster_name", clusterName); + template.setAttribute("metrics", metrics); }); } catch (TemplateException e) { throw new WebApplicationException(e); @@ -152,16 +142,11 @@ public class Utilization { } private static final Function TO_DISPLAY = - new Function() { - @Override - public DisplayMetric apply(GlobalMetric count) { - return new DisplayMetric( - new Display( - count.type.name().replace('_', ' ').toLowerCase(), - count.type.name().toLowerCase()), - count); - } - }; + count -> new DisplayMetric( + new Display( + count.type.name().replace('_', ' ').toLowerCase(), + count.type.name().toLowerCase()), + count); /** * Displays the aggregate utilization for the entire cluster. @@ -197,12 +182,9 @@ public class Utilization { public Response aggregateRoles(@PathParam("metric") final String metric) { final MetricType type = getTypeByName(metric); - Function toKey = new Function() { - @Override - public Display apply(ITaskConfig task) { - String role = task.getJob().getRole(); - return new Display(role, metric + "/" + role); - } + Function toKey = task -> { + String role = task.getJob().getRole(); + return new Display(role, metric + "/" + role); }; Map byRole = counter.computeAggregates(Query.unscoped().active(), type.filter, toKey); @@ -224,12 +206,7 @@ public class Utilization { @PathParam("role") String role) { MetricType type = getTypeByName(metric); - Function toKey = new Function() { - @Override - public Display apply(ITaskConfig task) { - return new Display(task.getJobName(), null); - } - }; + Function toKey = task -> new Display(task.getJobName(), null); Map byJob = counter.computeAggregates(Query.roleScoped(role).active(), type.filter, toKey); return Response.ok(fillTemplate(byJob)).build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/api/ApiBeta.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/ApiBeta.java b/src/main/java/org/apache/aurora/scheduler/http/api/ApiBeta.java index 690e82e..b889f93 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/api/ApiBeta.java +++ b/src/main/java/org/apache/aurora/scheduler/http/api/ApiBeta.java @@ -13,8 +13,6 @@ */ package org.apache.aurora.scheduler.http.api; -import java.io.IOException; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -150,17 +148,14 @@ public class ApiBeta { final Method method = getApiMethod(methodName, methodMetadata); final Object[] params = readParams(parameters, methodMetadata); - return Response.ok(new StreamingOutput() { - @Override - public void write(OutputStream output) throws IOException { - try { - Object response = method.invoke(api, params); - try (OutputStreamWriter out = new OutputStreamWriter(output, StandardCharsets.UTF_8)) { - GSON.toJson(response, out); - } - } catch (IllegalAccessException | InvocationTargetException e) { - throw Throwables.propagate(e); + return Response.ok((StreamingOutput) output -> { + try { + Object response = method.invoke(api, params); + try (OutputStreamWriter out = new OutputStreamWriter(output, StandardCharsets.UTF_8)) { + GSON.toJson(response, out); } + } catch (IllegalAccessException | InvocationTargetException e) { + throw Throwables.propagate(e); } }).build(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/api/GsonMessageBodyHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/GsonMessageBodyHandler.java b/src/main/java/org/apache/aurora/scheduler/http/api/GsonMessageBodyHandler.java index 41f48b9..44295f8 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/api/GsonMessageBodyHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/http/api/GsonMessageBodyHandler.java @@ -43,12 +43,10 @@ import com.google.gson.ExclusionStrategy; import com.google.gson.FieldAttributes; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParseException; -import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; import org.apache.thrift.TFieldIdEnum; @@ -177,56 +175,44 @@ public class GsonMessageBodyHandler public static final Gson GSON = new GsonBuilder() .addSerializationExclusionStrategy(EXCLUDE_THRIFT_FIELDS) - .registerTypeHierarchyAdapter(TUnion.class, new JsonSerializer>() { - @Override - public JsonElement serialize( - TUnion src, - Type typeOfSrc, - JsonSerializationContext context) { - - return context.serialize( - ImmutableMap.of(src.getSetField().getFieldName(), src.getFieldValue())); - } - }) - .registerTypeHierarchyAdapter(TUnion.class, new JsonDeserializer>() { - @Override - public TUnion deserialize( - JsonElement json, - Type typeOfT, - JsonDeserializationContext context) throws JsonParseException { - - JsonObject jsonObject = json.getAsJsonObject(); - if (jsonObject.entrySet().size() != 1) { - throw new JsonParseException( - typeOfT.getClass().getName() + " must have exactly one element"); - } - - if (typeOfT instanceof Class) { - Class clazz = (Class) typeOfT; - Entry item = Iterables.getOnlyElement(jsonObject.entrySet()); - - try { - Field metaDataMapField = clazz.getField("metaDataMap"); - @SuppressWarnings("unchecked") - Map metaDataMap = - (Map) metaDataMapField.get(null); - - for (Map.Entry entry : metaDataMap.entrySet()) { - if (entry.getKey().getFieldName().equals(item.getKey())) { - StructMetaData valueMetaData = (StructMetaData) entry.getValue().valueMetaData; - Object result = context.deserialize(item.getValue(), valueMetaData.structClass); - return createUnion(clazz, entry.getKey(), result); + .registerTypeHierarchyAdapter( + TUnion.class, + (JsonSerializer>) (src, typeOfSrc, context) -> context.serialize( + ImmutableMap.of(src.getSetField().getFieldName(), src.getFieldValue()))) + .registerTypeHierarchyAdapter( + TUnion.class, + (JsonDeserializer>) (json, typeOfT, context) -> { + JsonObject jsonObject = json.getAsJsonObject(); + if (jsonObject.entrySet().size() != 1) { + throw new JsonParseException( + typeOfT.getClass().getName() + " must have exactly one element"); + } + + if (typeOfT instanceof Class) { + Class clazz = (Class) typeOfT; + Entry item = Iterables.getOnlyElement(jsonObject.entrySet()); + + try { + Field metaDataMapField = clazz.getField("metaDataMap"); + @SuppressWarnings("unchecked") + Map metaDataMap = + (Map) metaDataMapField.get(null); + + for (Entry entry : metaDataMap.entrySet()) { + if (entry.getKey().getFieldName().equals(item.getKey())) { + StructMetaData valueMetaData = (StructMetaData) entry.getValue().valueMetaData; + Object result = context.deserialize(item.getValue(), valueMetaData.structClass); + return createUnion(clazz, entry.getKey(), result); + } } - } - throw new RuntimeException("Failed to deserialize " + typeOfT); - } catch (NoSuchFieldException | IllegalAccessException | InstantiationException e) { - throw Throwables.propagate(e); + throw new RuntimeException("Failed to deserialize " + typeOfT); + } catch (NoSuchFieldException | IllegalAccessException | InstantiationException e) { + throw Throwables.propagate(e); + } + } else { + throw new RuntimeException("Unable to deserialize " + typeOfT); } - } else { - throw new RuntimeException("Unable to deserialize " + typeOfT); - } - } - }) + }) .create(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/api/security/Kerberos5ShiroRealmModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/Kerberos5ShiroRealmModule.java b/src/main/java/org/apache/aurora/scheduler/http/api/security/Kerberos5ShiroRealmModule.java index 2a8a8f6..579d27a 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/api/security/Kerberos5ShiroRealmModule.java +++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/Kerberos5ShiroRealmModule.java @@ -160,18 +160,15 @@ public class Kerberos5ShiroRealmModule extends AbstractModule { loginContext.login(); serverCredential = Subject.doAs( loginContext.getSubject(), - new PrivilegedAction() { - @Override - public GSSCredential run() { - try { - return gssManager.createCredential( - null /* Use the service principal name defined in jaas.conf */, - GSSCredential.INDEFINITE_LIFETIME, - new Oid[] {new Oid(GSS_SPNEGO_MECH_OID), new Oid(GSS_KRB5_MECH_OID)}, - GSSCredential.ACCEPT_ONLY); - } catch (GSSException e) { - throw Throwables.propagate(e); - } + (PrivilegedAction) () -> { + try { + return gssManager.createCredential( + null /* Use the service principal name defined in jaas.conf */, + GSSCredential.INDEFINITE_LIFETIME, + new Oid[] {new Oid(GSS_SPNEGO_MECH_OID), new Oid(GSS_KRB5_MECH_OID)}, + GSSCredential.ACCEPT_ONLY); + } catch (GSSException e) { + throw Throwables.propagate(e); } }); } catch (LoginException e) { http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java index 98f3c1f..6905662 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java +++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/ShiroAuthorizingParamInterceptor.java @@ -14,7 +14,6 @@ package org.apache.aurora.scheduler.http.api.security; import java.lang.reflect.Method; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -151,12 +150,7 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor { ImmutableMap., Function>>builder() .putAll(Maps.uniqueIndex( FIELD_GETTERS, - new Function, Class>() { - @Override - public Class apply(FieldGetter input) { - return input.getStructClass(); - } - })) + (Function, Class>) FieldGetter::getStructClass)) .build(); @VisibleForTesting @@ -172,36 +166,31 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor { * @see org.apache.aurora.scheduler.http.api.security.AuthorizingParam */ private static Iterable getCandidateMethods(final Method method) { - return new Iterable() { + return () -> new AbstractSequentialIterator(method) { @Override - public Iterator iterator() { - return new AbstractSequentialIterator(method) { - @Override - protected Method computeNext(Method previous) { - String name = previous.getName(); - Class[] parameterTypes = previous.getParameterTypes(); - Class declaringClass = previous.getDeclaringClass(); - - if (declaringClass.isInterface()) { - return null; - } + protected Method computeNext(Method previous) { + String name = previous.getName(); + Class[] parameterTypes = previous.getParameterTypes(); + Class declaringClass = previous.getDeclaringClass(); - Iterable> searchOrder = ImmutableList.>builder() - .addAll(Optional.fromNullable(declaringClass.getSuperclass()).asSet()) - .addAll(ImmutableList.copyOf(declaringClass.getInterfaces())) - .build(); - - for (Class klazz : searchOrder) { - try { - return klazz.getMethod(name, parameterTypes); - } catch (NoSuchMethodException ignored) { - // Expected. - } - } + if (declaringClass.isInterface()) { + return null; + } - return null; + Iterable> searchOrder = ImmutableList.>builder() + .addAll(Optional.fromNullable(declaringClass.getSuperclass()).asSet()) + .addAll(ImmutableList.copyOf(declaringClass.getInterfaces())) + .build(); + + for (Class klazz : searchOrder) { + try { + return klazz.getMethod(name, parameterTypes); + } catch (NoSuchMethodException ignored) { + // Expected. } - }; + } + + return null; } }; } @@ -271,15 +260,12 @@ class ShiroAuthorizingParamInterceptor implements MethodInterceptor { + " field getter was supplied for " + parameterType.getName()); } - return new Function>() { - @Override - public Optional apply(Object[] arguments) { - Optional argument = Optional.fromNullable(arguments[index]); - if (argument.isPresent()) { - return jobKeyGetter.get().apply(argument.get()); - } else { - return Optional.absent(); - } + return arguments -> { + Optional argument = Optional.fromNullable(arguments[index]); + if (argument.isPresent()) { + return jobKeyGetter.get().apply(argument.get()); + } else { + return Optional.absent(); } }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java index 4afd077..cb89be2 100644 --- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java +++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java @@ -164,12 +164,7 @@ public class MesosLog implements org.apache.aurora.scheduler.log.Log { } private static final Function MESOS_ENTRY_TO_ENTRY = - new Function() { - @Override - public LogEntry apply(Log.Entry entry) { - return new LogEntry(entry); - } - }; + LogEntry::new; private final OpStats readStats = new OpStats("read"); private final OpStats appendStats = new OpStats("append"); @@ -312,13 +307,9 @@ public class MesosLog implements org.apache.aurora.scheduler.log.Log { public LogPosition append(final byte[] contents) throws StreamAccessException { requireNonNull(contents); - Log.Position position = mutate(appendStats, new Mutation() { - @Override - public Log.Position apply(WriterInterface logWriter) - throws TimeoutException, Log.WriterFailedException { - return logWriter.append(contents, writeTimeout, writeTimeUnit); - } - }); + Log.Position position = mutate( + appendStats, + logWriter -> logWriter.append(contents, writeTimeout, writeTimeUnit)); return LogPosition.wrap(position); } @@ -330,13 +321,9 @@ public class MesosLog implements org.apache.aurora.scheduler.log.Log { Preconditions.checkArgument(position instanceof LogPosition); final Log.Position before = ((LogPosition) position).unwrap(); - mutate(truncateStats, new Mutation() { - @Override - public Void apply(WriterInterface logWriter) - throws TimeoutException, Log.WriterFailedException { - logWriter.truncate(before, writeTimeout, writeTimeUnit); - return null; - } + mutate(truncateStats, logWriter -> { + logWriter.truncate(before, writeTimeout, writeTimeUnit); + return null; }); } http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java index 64ab611..906b349 100644 --- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java +++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java @@ -149,12 +149,7 @@ public class MesosLogStreamModule extends PrivateModule { @Provides LogInterface provideLogInterface(final Log log) { - return new LogInterface() { - @Override - public Log.Position position(byte[] identity) { - return log.position(identity); - } - }; + return log::position; } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/2e237148/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java index 0743cb0..c4af2fd 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java @@ -44,6 +44,7 @@ import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.FrameworkID; @@ -130,12 +131,9 @@ public class MesosSchedulerImpl implements Scheduler { log.info("Registered with ID " + frameworkId + ", master: " + masterInfo); - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue()); - } - }); + storage.write( + (NoResult.Quiet) storeProvider -> + storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue())); isRegistered = true; eventSink.post(new DriverRegistered()); } @@ -158,27 +156,24 @@ public class MesosSchedulerImpl implements Scheduler { public void resourceOffers(SchedulerDriver driver, final List offers) { Preconditions.checkState(isRegistered, "Must be registered before receiving offers."); - executor.execute(new Runnable() { - @Override - public void run() { - // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over - // offers when the host attributes cannot be found. (AURORA-137) - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - for (Offer offer : offers) { - IHostAttributes attributes = - AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer); - storeProvider.getAttributeStore().saveHostAttributes(attributes); - if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, String.format("Received offer: %s", offer)); - } - counters.get("scheduler_resource_offers").incrementAndGet(); - offerManager.addOffer(new HostOffer(offer, attributes)); + executor.execute(() -> { + // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over + // offers when the host attributes cannot be found. (AURORA-137) + storage.write(new MutateWork.NoResult.Quiet() { + @Override + public void execute(MutableStoreProvider storeProvider) { + for (Offer offer : offers) { + IHostAttributes attributes = + AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer); + storeProvider.getAttributeStore().saveHostAttributes(attributes); + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, String.format("Received offer: %s", offer)); } + counters.get("scheduler_resource_offers").incrementAndGet(); + offerManager.addOffer(new HostOffer(offer, attributes)); } - }); - } + } + }); }); } @@ -211,12 +206,7 @@ public class MesosSchedulerImpl implements Scheduler { logger.log(level, message.toString()); } - private static final Function SECONDS_TO_MICROS = new Function() { - @Override - public Long apply(Double seconds) { - return (long) (seconds * 1E6); - } - }; + private static final Function SECONDS_TO_MICROS = seconds -> (long) (seconds * 1E6); @AllowUnchecked @Timed("scheduler_status_update")