aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject git commit: Add a pubsub event adapter to the JobUpdateController.
Date Tue, 19 Aug 2014 17:52:47 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 20bb549ba -> 4de336819


Add a pubsub event adapter to the JobUpdateController.

Bugs closed: AURORA-613

Reviewed at https://reviews.apache.org/r/24827/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4de33681
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4de33681
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4de33681

Branch: refs/heads/master
Commit: 4de336819dd7bd04419f6538ff555f24b6f740c7
Parents: 20bb549
Author: Bill Farner <wfarner@apache.org>
Authored: Tue Aug 19 10:49:26 2014 -0700
Committer: Bill Farner <wfarner@apache.org>
Committed: Tue Aug 19 10:49:26 2014 -0700

----------------------------------------------------------------------
 .../scheduler/updater/JobUpdateController.java  |  17 ++-
 .../updater/JobUpdateControllerImpl.java        |   7 +-
 .../updater/JobUpdateEventSubscriber.java       | 102 +++++++++++++++++
 .../updater/JobUpdateEventSubscriberTest.java   | 112 +++++++++++++++++++
 4 files changed, 234 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4de33681/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
index 066238c..5eb5117 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateController.java
@@ -66,10 +66,21 @@ interface JobUpdateController {
   void abort(IJobKey job) throws UpdateStateException;
 
   /**
-   * Notifies the updater that an instance has changed state.
+   * Notifies the updater that the state of an instance has changed. A state change could also mean
+   * deletion.
    *
    * @param instance Identifier fo the instance whose state has changed.
-   * @param deleted {@code true} if the state change was a task deletion, otherwise {@code false}.
    */
-  void handleStateChange(IInstanceKey instance, boolean deleted);
+  void instanceChangedState(IInstanceKey instance);
+
+  /**
+   * Restores an active update for a job that has been halted due to the scheduler restarting.
+   * This is distinct from {@link #resume(IJobKey)} in that it does not change the state of the
+   * update, but resumes after a restart of the scheduler process.
+   *
+   * @param job Job to resume.
+   * @throws UpdateStateException If the update cannot resume, such as if the update is already
+   *                              active.
+   */
+  void systemResume(IJobKey job) throws UpdateStateException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4de33681/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 3bbd6dd..28fd32f 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -42,7 +42,12 @@ public class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   @Override
-  public void handleStateChange(IInstanceKey instance, boolean deleted) {
+  public void instanceChangedState(IInstanceKey instance) {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void systemResume(IJobKey job) throws UpdateStateException {
     throw new UnsupportedOperationException("Not yet implemented.");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4de33681/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
new file mode 100644
index 0000000..7be792f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriber.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+
+import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.JobUpdateQuery;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
+import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+
+/**
+ * A pubsub event subscriber that forwards status updates to the job update controller.
+ */
+class JobUpdateEventSubscriber implements PubsubEvent.EventSubscriber {
+  private final JobUpdateController controller;
+  private final Storage storage;
+
+  @Inject
+  JobUpdateEventSubscriber(JobUpdateController controller, Storage storage) {
+    this.controller = requireNonNull(controller);
+    this.storage = requireNonNull(storage);
+  }
+
+  private static final Function<IScheduledTask, IInstanceKey> TASK_TO_INSTANCE_KEY =
+      new Function<IScheduledTask, IInstanceKey>() {
+        @Override
+        public IInstanceKey apply(IScheduledTask task) {
+          return IInstanceKey.build(
+              new InstanceKey()
+                  .setJobKey(Tasks.SCHEDULED_TO_JOB_KEY.apply(task).newBuilder())
+                  .setInstanceId(Tasks.SCHEDULED_TO_INSTANCE_ID.apply(task)));
+        }
+      };
+
+  @Subscribe
+  public synchronized void taskChangedState(TaskStateChange change) {
+    controller.instanceChangedState(TASK_TO_INSTANCE_KEY.apply(change.getTask()));
+  }
+
+  @Subscribe
+  public synchronized void tasksDeleted(TasksDeleted event) {
+    Set<IInstanceKey> instances = FluentIterable.from(event.getTasks())
+        .transform(TASK_TO_INSTANCE_KEY)
+        .toSet();
+    for (IInstanceKey instance : instances) {
+      controller.instanceChangedState(instance);
+    }
+  }
+
+  @VisibleForTesting
+  static final IJobUpdateQuery ACTIVE_QUERY = IJobUpdateQuery.build(
+      new JobUpdateQuery().setUpdateStatuses(ImmutableSet.of(ROLLING_FORWARD, ROLLING_BACK)));
+
+  @Subscribe
+  public synchronized void schedulerActive(PubsubEvent.SchedulerActive event)
+      throws UpdateStateException {
+
+    storage.write(new Storage.MutateWork.NoResult<UpdateStateException>() {
+      @Override
+      protected void execute(Storage.MutableStoreProvider storeProvider)
+          throws UpdateStateException {
+
+        for (IJobUpdateSummary summary
+            : storeProvider.getJobUpdateStore().fetchJobUpdateSummaries(ACTIVE_QUERY)) {
+
+          controller.systemResume(summary.getJobKey());
+        }
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4de33681/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
new file mode 100644
index 0000000..2a7d965
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdateEventSubscriberTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.updater;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.EventBus;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.InstanceKey;
+import org.apache.aurora.gen.JobUpdateSummary;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.events.PubsubEvent.SchedulerActive;
+import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import static org.easymock.EasyMock.expect;
+
+public class JobUpdateEventSubscriberTest extends EasyMockTest {
+
+  private static final IJobKey JOB_A = JobKeys.from("role", "env", "name");
+
+  private static final IScheduledTask TASK_A = IScheduledTask.build(
+      new ScheduledTask().setAssignedTask(
+          new AssignedTask()
+              .setInstanceId(5)
+              .setTask(new TaskConfig()
+                  .setOwner(new Identity().setRole(JOB_A.getRole()))
+                  .setEnvironment(JOB_A.getEnvironment())
+                  .setJobName(JOB_A.getName()))));
+  private static final IInstanceKey INSTANCE_A = IInstanceKey.build(
+      new InstanceKey(JOB_A.newBuilder(), TASK_A.getAssignedTask().getInstanceId()));
+
+  private StorageTestUtil storageUtil;
+  private JobUpdateController updater;
+
+  private EventBus eventBus;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    updater = createMock(JobUpdateController.class);
+
+    eventBus = new EventBus();
+    eventBus.register(new JobUpdateEventSubscriber(updater, storageUtil.storage));
+  }
+
+  @Test
+  public void testStateChange() throws Exception {
+    updater.instanceChangedState(INSTANCE_A);
+
+    control.replay();
+
+    eventBus.post(TaskStateChange.initialized(TASK_A));
+  }
+
+  @Test
+  public void testDeleted() throws Exception {
+    updater.instanceChangedState(INSTANCE_A);
+
+    control.replay();
+
+    eventBus.post(new TasksDeleted(ImmutableSet.of(TASK_A)));
+  }
+
+  private static final IJobUpdateSummary SUMMARY = IJobUpdateSummary.build(new JobUpdateSummary()
+      .setJobKey(JOB_A.newBuilder()));
+
+  @Test
+  public void testSchedulerStartup() throws Exception {
+    expect(storageUtil.updateStore.fetchJobUpdateSummaries(JobUpdateEventSubscriber.ACTIVE_QUERY))
+        .andReturn(ImmutableList.of(SUMMARY));
+    updater.systemResume(JOB_A);
+
+    control.replay();
+
+    eventBus.post(new SchedulerActive());
+  }
+
+  @Test
+  public void testSchedulerStartupNoUpdates() throws Exception {
+    expect(storageUtil.updateStore.fetchJobUpdateSummaries(JobUpdateEventSubscriber.ACTIVE_QUERY))
+        .andReturn(ImmutableList.<IJobUpdateSummary>of());
+
+    control.replay();
+
+    eventBus.post(new SchedulerActive());
+  }
+}


Mime
View raw message