Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 558C5200C1D for ; Thu, 16 Feb 2017 21:08:53 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 542E7160B61; Thu, 16 Feb 2017 20:08:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4EA08160B52 for ; Thu, 16 Feb 2017 21:08:52 +0100 (CET) Received: (qmail 41783 invoked by uid 500); 16 Feb 2017 20:08:51 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 41774 invoked by uid 99); 16 Feb 2017 20:08:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2017 20:08:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 382E9DFC63; Thu, 16 Feb 2017 20:08:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zmanji@apache.org To: commits@aurora.apache.org Message-Id: <4e758ae870d540808aa9192fdbd179e1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: aurora git commit: Add best effort pulse timestamp recovery. Date: Thu, 16 Feb 2017 20:08:51 +0000 (UTC) archived-at: Thu, 16 Feb 2017 20:08:53 -0000 Repository: aurora Updated Branches: refs/heads/master 9ea897978 -> 4ab4b2b2c Add best effort pulse timestamp recovery. Currently the scheduler forces all coordinated ("pulsed") updates into ROLL_FORWARD_AWAITING_PULSE or ROLL_BACK_AWAITING_PULSE on scheduler startup/recovery. 
This is because the last pulse timestamp is not durably stored and the timestamp of the last pulse is set to 0L (aka no pulse yet). In cases where the pulse timeout is large and the failover is fast or frequent, this causes many updates to unnecessarily transition into a pulse-related state until the next pulse. It is possible to avoid these unnecessary transitions by traversing the job update events and initializing the last pulse timestamp to the timestamp of the last event if the last event was not a pulse event. Bugs closed: AURORA-1890 Reviewed at https://reviews.apache.org/r/56723/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4ab4b2b2 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4ab4b2b2 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4ab4b2b2 Branch: refs/heads/master Commit: 4ab4b2b2c4ce7c6cc4e79aacde0cb54275f82b67 Parents: 9ea8979 Author: Zameer Manji Authored: Thu Feb 16 12:08:34 2017 -0800 Committer: Zameer Manji Committed: Thu Feb 16 12:08:34 2017 -0800 ---------------------------------------------------------------------- .../thrift/org/apache/aurora/gen/api.thrift | 3 + .../org/apache/aurora/scheduler/base/Jobs.java | 10 +++ .../updater/JobUpdateControllerImpl.java | 36 +++++++++-- .../aurora/scheduler/updater/JobUpdaterIT.java | 64 +++++++++++++++++++- 4 files changed, 108 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index efd4e53..3749531 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -619,6 +619,9 @@ const set ACTIVE_JOB_UPDATE_STATES = 
[JobUpdateStatus.ROLLING_F JobUpdateStatus.ROLL_BACK_PAUSED, JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE, JobUpdateStatus.ROLL_BACK_AWAITING_PULSE] +/** States the job update can be in while waiting for a pulse. */ +const set AWAITNG_PULSE_JOB_UPDATE_STATES = [JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE, + JobUpdateStatus.ROLL_BACK_AWAITING_PULSE] /** Job update actions that can be applied to job instances. */ enum JobUpdateAction { http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/main/java/org/apache/aurora/scheduler/base/Jobs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java index 49e5b2c..8d5f4e5 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java @@ -13,8 +13,12 @@ */ package org.apache.aurora.scheduler.base; +import java.util.EnumSet; + import org.apache.aurora.gen.JobStats; +import org.apache.aurora.gen.JobUpdateStatus; import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.apiConstants; import org.apache.aurora.scheduler.storage.entities.IJobStats; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -28,6 +32,12 @@ public final class Jobs { } /** + * States of updates that are blocked on pulses. + */ + public static final EnumSet AWAITING_PULSE_STATES = + EnumSet.copyOf(apiConstants.AWAITNG_PULSE_JOB_UPDATE_STATES); + + /** * For a given collection of tasks compute statistics based on the state of the task. 
* * @param tasks a collection of tasks for which statistics are sought http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java index 729c123..e141124 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.updater; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.aurora.common.application.Lifecycle; @@ -84,6 +86,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK; import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD; import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE; import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError; +import static org.apache.aurora.scheduler.base.Jobs.AWAITING_PULSE_STATES; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY; import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES; @@ -202,7 +205,7 @@ class JobUpdateControllerImpl implements JobUpdateController { JobUpdateStatus status = ROLLING_FORWARD; if (isCoordinatedUpdate(instructions)) { status = 
ROLL_FORWARD_AWAITING_PULSE; - pulseHandler.initializePulseState(update, status); + pulseHandler.initializePulseState(update, status, 0L); } recordAndChangeJobUpdateStatus( @@ -271,6 +274,29 @@ class JobUpdateControllerImpl implements JobUpdateController { return status -> addAuditData(newEvent(status), auditData); } + private static final Ordering CHRON_ORDERING = + Ordering.from(Comparator.comparingLong(IJobUpdateEvent::getTimestampMs)); + + private long inferLastPulseTimestamp(IJobUpdateDetails details) { + // Pulse timestamps are not durably stored by design. However, on system recovery, + // setting the timestamp of the last pulse to 0L (aka no pulse) is not correct. + // By inspecting the job update events we can infer a reasonable time stamp to initialize to. + // In this case, if the upgrade was not waiting for a pulse previously, we can reuse the + // timestamp of the last event. This does reset the counter for pulses, but reflects the + // most likely behaviour of a healthy system. 
+ + // This is safe because we always write at least one job update event on job update creation + IJobUpdateEvent mostRecent = CHRON_ORDERING.max(details.getUpdateEvents()); + + long ts = 0L; + + if (!AWAITING_PULSE_STATES.contains(mostRecent.getStatus())) { + ts = mostRecent.getTimestampMs(); + } + + return ts; + } + @Override public void systemResume() { storage.write((NoResult.Quiet) storeProvider -> { @@ -284,7 +310,9 @@ class JobUpdateControllerImpl implements JobUpdateController { if (isCoordinatedUpdate(instructions)) { LOG.info("Automatically restoring pulse state for " + key); - pulseHandler.initializePulseState(details.getUpdate(), status); + + long pulseMs = inferLastPulseTimestamp(details); + pulseHandler.initializePulseState(details.getUpdate(), status, pulseMs); } if (AUTO_RESUME_STATES.contains(status)) { @@ -769,11 +797,11 @@ class JobUpdateControllerImpl implements JobUpdateController { this.clock = requireNonNull(clock); } - synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus status) { + synchronized void initializePulseState(IJobUpdate update, JobUpdateStatus status, long ts) { pulseStates.put(update.getSummary().getKey(), new PulseState( status, update.getInstructions().getSettings().getBlockIfNoPulsesAfterMs(), - 0L)); + ts)); } synchronized PulseState pulseAndGet(IJobUpdateKey key) { http://git-wip-us.apache.org/repos/asf/aurora/blob/4ab4b2b2/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java index ea0b89a..30b44f8 100644 --- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java +++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java @@ -30,6 +30,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import 
com.google.common.collect.Ordering; import com.google.common.eventbus.EventBus; +import com.google.common.primitives.Ints; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -142,6 +143,8 @@ public class JobUpdaterIT extends EasyMockTest { private static final Amount WATCH_TIMEOUT = Amount.of(2000L, Time.MILLISECONDS); private static final Amount FLAPPING_THRESHOLD = Amount.of(1L, Time.MILLISECONDS); private static final Amount ONE_DAY = Amount.of(1L, Time.DAYS); + private static final Amount ONE_HOUR = Amount.of(1L, Time.HOURS); + private static final Amount ONE_MINUTE = Amount.of(1L, Time.MINUTES); private static final ITaskConfig OLD_CONFIG = setExecutorData(TaskTestUtil.makeConfig(JOB), "olddata"); private static final ITaskConfig NEW_CONFIG = setExecutorData(OLD_CONFIG, "newdata"); @@ -470,6 +473,8 @@ public class JobUpdaterIT extends EasyMockTest { storage.write( storeProvider -> saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD)); + clock.advance(ONE_MINUTE); + subscriber.startAsync().awaitRunning(); ImmutableMultimap.Builder actions = ImmutableMultimap.builder(); @@ -494,6 +499,53 @@ public class JobUpdaterIT extends EasyMockTest { } @Test + public void testRecoverLongPulseTimeoutCoordinatedUpdateFromStorage() throws Exception { + // A brief failover in the middle of a rolling forward update with a long pulse timeout should + // mean that after scheduler startup the update is not waiting for a pulse. 
+ expectTaskKilled().times(1); + + control.replay(); + + JobUpdate builder = + setInstanceCount(makeJobUpdate(makeInstanceConfig(0, 0, OLD_CONFIG)), 1).newBuilder(); + builder.getInstructions().getSettings() + .setBlockIfNoPulsesAfterMs(Ints.checkedCast(ONE_HOUR.as(Time.MILLISECONDS))); + IJobUpdate update = IJobUpdate.build(builder); + insertInitialTasks(update); + changeState(JOB, 0, ASSIGNED, STARTING, RUNNING); + clock.advance(ONE_DAY); + + storage.write(storeProvider -> + saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLL_FORWARD_AWAITING_PULSE)); + + // The first pulse comes after one minute + clock.advance(ONE_MINUTE); + + storage.write( + (NoResult.Quiet) storeProvider -> + saveJobUpdateEvent(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD)); + + clock.advance(ONE_MINUTE); + + subscriber.startAsync().awaitRunning(); + ImmutableMultimap.Builder actions = ImmutableMultimap.builder(); + + actions.putAll(0, INSTANCE_UPDATING); + // Since the pulse interval is so large and the downtime was so short, the update does not need + // to wait for a pulse. + assertState(ROLLING_FORWARD, actions.build()); + + // Instance 0 is updated. 
+ changeState(JOB, 0, KILLED, ASSIGNED, STARTING, RUNNING); + clock.advance(WATCH_TIMEOUT); + + actions.putAll(0, INSTANCE_UPDATED); + + assertState(ROLLED_FORWARD, actions.build()); + assertEquals(JobUpdatePulseStatus.FINISHED, updater.pulse(UPDATE_ID)); + } + + @Test public void testRecoverAwaitingPulseFromStorage() throws Exception { expectTaskKilled(); @@ -676,6 +728,8 @@ public class JobUpdaterIT extends EasyMockTest { storage.write( storeProvider -> saveJobUpdate(storeProvider.getJobUpdateStore(), update, ROLLING_FORWARD)); + clock.advance(ONE_MINUTE); + subscriber.startAsync().awaitRunning(); ImmutableMultimap.Builder actions = ImmutableMultimap.builder(); @@ -1142,13 +1196,21 @@ public class JobUpdaterIT extends EasyMockTest { } store.saveJobUpdate(update, Optional.of(lock.getToken())); + saveJobUpdateEvent(store, update, status); + return lock; + } + + private void saveJobUpdateEvent( + JobUpdateStore.Mutable store, + IJobUpdate update, + JobUpdateStatus status) { + store.saveJobUpdateEvent( update.getSummary().getKey(), IJobUpdateEvent.build( new JobUpdateEvent() .setStatus(status) .setTimestampMs(clock.nowMillis()))); - return lock; } @Test