aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject incubator-aurora git commit: Implementing pulseJobUpdate RPC.
Date Thu, 12 Feb 2015 00:31:05 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master b8f71fbfc -> 61e6c35f9


Implementing pulseJobUpdate RPC.

Bugs closed: AURORA-1009

Reviewed at https://reviews.apache.org/r/30325/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/61e6c35f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/61e6c35f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/61e6c35f

Branch: refs/heads/master
Commit: 61e6c35f91e959ba6247dddc3fe3524795c5f851
Parents: b8f71fb
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Wed Feb 11 16:30:38 2015 -0800
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Wed Feb 11 16:30:38 2015 -0800

----------------------------------------------------------------------
 .../thrift/org/apache/aurora/gen/api.thrift     | 14 +++-
 .../apache/aurora/auth/CapabilityValidator.java |  3 +-
 .../thrift/SchedulerThriftInterface.java        | 83 ++++++++++++++++----
 .../thrift/SchedulerThriftInterfaceTest.java    | 61 +++++++++++++-
 .../scheduler/thrift/aop/ForwardingThrift.java  |  5 +-
 5 files changed, 143 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/61e6c35f/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index cc22730..2a77f28 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -623,6 +623,16 @@ enum JobUpdatePulseStatus {
   FINISHED = 2
 }
 
+// TODO(maxim): refactor beta updater to use JobUpdateKey (AURORA-1093).
+/** Job update key. */
+struct JobUpdateKey {
+  /** Job being updated */
+  1: JobKey job
+
+  /** Update ID. */
+  2: string id
+}
+
 /** Job update thresholds and limits. */
 struct JobUpdateSettings {
   /** Max number of instances being updated at any given moment. */
@@ -658,7 +668,7 @@ struct JobUpdateSettings {
   * block. A blocked update is unable to continue but retains its current status. It may only get
   * unblocked by a fresh pulseJobUpdate call.
   */
-  9: i32 blockIfNoPulsesAfterMs
+  9: optional i32 blockIfNoPulsesAfterMs
 }
 
 /** Event marking a state transition in job update lifecycle. */
@@ -1033,7 +1043,7 @@ service AuroraSchedulerManager extends ReadOnlyScheduler {
    * JobUpdateSettings. Unblocks progress if the update was previously blocked.
    * Responds with ResponseCode.INVALID_REQUEST in case an unknown updateId is specified.
    */
-  Response pulseJobUpdate(1: string updateId, 2: SessionKey session)
+  Response pulseJobUpdate(1: JobUpdateKey key, 2: SessionKey session)
 }
 
 struct InstanceConfigRewrite {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/61e6c35f/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/auth/CapabilityValidator.java b/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
index 45ef643..198cdf3 100644
--- a/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
+++ b/src/main/java/org/apache/aurora/auth/CapabilityValidator.java
@@ -25,7 +25,8 @@ public interface CapabilityValidator extends SessionValidator {
   enum Capability {
     ROOT,
     MACHINE_MAINTAINER,
-    PROVISIONER
+    PROVISIONER,
+    UPDATE_COORDINATOR,
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/61e6c35f/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index fd4d690..2a9d36a 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -52,7 +52,6 @@ import com.twitter.common.args.constraints.Positive;
 
 import org.apache.aurora.auth.CapabilityValidator;
 import org.apache.aurora.auth.CapabilityValidator.AuditCheck;
-import org.apache.aurora.auth.CapabilityValidator.Capability;
 import org.apache.aurora.auth.SessionValidator.AuthFailedException;
 import org.apache.aurora.gen.AcquireLockResult;
 import org.apache.aurora.gen.AddInstancesConfig;
@@ -80,6 +79,8 @@ import org.apache.aurora.gen.JobSummary;
 import org.apache.aurora.gen.JobSummaryResult;
 import org.apache.aurora.gen.JobUpdate;
 import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdatePulseStatus;
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateRequest;
 import org.apache.aurora.gen.JobUpdateSettings;
@@ -92,6 +93,7 @@ import org.apache.aurora.gen.LockValidation;
 import org.apache.aurora.gen.MaintenanceStatusResult;
 import org.apache.aurora.gen.PendingReason;
 import org.apache.aurora.gen.PopulateJobResult;
+import org.apache.aurora.gen.PulseJobUpdateResult;
 import org.apache.aurora.gen.QueryRecoveryResult;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.gen.Response;
@@ -147,6 +149,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings;
@@ -172,6 +175,10 @@ import static java.util.Objects.requireNonNull;
 import static com.google.common.base.CharMatcher.WHITESPACE;
 import static com.twitter.common.base.MorePreconditions.checkNotBlank;
 
+import static org.apache.aurora.auth.CapabilityValidator.Capability.MACHINE_MAINTAINER;
+import static org.apache.aurora.auth.CapabilityValidator.Capability.PROVISIONER;
+import static org.apache.aurora.auth.CapabilityValidator.Capability.ROOT;
+import static org.apache.aurora.auth.CapabilityValidator.Capability.UPDATE_COORDINATOR;
 import static org.apache.aurora.auth.SessionValidator.SessionContext;
 import static org.apache.aurora.gen.ResponseCode.AUTH_FAILED;
 import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST;
@@ -750,8 +757,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
   private Optional<SessionContext> isAdmin(SessionKey session) {
     try {
-      return Optional.of(
-          sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED));
+      return Optional.of(sessionValidator.checkAuthorized(session, ROOT, AuditCheck.REQUIRED));
     } catch (AuthFailedException e) {
       return Optional.absent();
     }
@@ -881,7 +887,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     return okResponse(Result.getQuotaResult(result));
   }
 
-  @Requires(whitelist = Capability.PROVISIONER)
+  @Requires(whitelist = PROVISIONER)
   @Override
   public Response setQuota(
       final String ownerRole,
@@ -900,7 +906,7 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     }
   }
 
-  @Requires(whitelist = Capability.MACHINE_MAINTAINER)
+  @Requires(whitelist = MACHINE_MAINTAINER)
   @Override
   public Response startMaintenance(Hosts hosts, SessionKey session) {
     return okResponse(Result.startMaintenanceResult(
@@ -908,21 +914,21 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
             .setStatuses(maintenance.startMaintenance(hosts.getHostNames()))));
   }
 
-  @Requires(whitelist = Capability.MACHINE_MAINTAINER)
+  @Requires(whitelist = MACHINE_MAINTAINER)
   @Override
   public Response drainHosts(Hosts hosts, SessionKey session) {
     return okResponse(Result.drainHostsResult(
         new DrainHostsResult().setStatuses(maintenance.drain(hosts.getHostNames()))));
   }
 
-  @Requires(whitelist = Capability.MACHINE_MAINTAINER)
+  @Requires(whitelist = MACHINE_MAINTAINER)
   @Override
   public Response maintenanceStatus(Hosts hosts, SessionKey session) {
     return okResponse(Result.maintenanceStatusResult(
         new MaintenanceStatusResult().setStatuses(maintenance.getStatus(hosts.getHostNames()))));
   }
 
-  @Requires(whitelist = Capability.MACHINE_MAINTAINER)
+  @Requires(whitelist = MACHINE_MAINTAINER)
   @Override
   public Response endMaintenance(Hosts hosts, SessionKey session) {
     return okResponse(Result.endMaintenanceResult(
@@ -942,8 +948,8 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
 
     final SessionContext context;
     try {
-      // TODO(Sathya): Remove this after AOP-style session validation passes in a SessionContext.
-      context = sessionValidator.checkAuthorized(session, Capability.ROOT, AuditCheck.REQUIRED);
+      // TODO(maxim): Remove this after AOP-style session validation passes in a SessionContext.
+      context = sessionValidator.checkAuthorized(session, ROOT, AuditCheck.REQUIRED);
     } catch (AuthFailedException e) {
       return errorResponse(AUTH_FAILED, e);
     }
@@ -1386,6 +1392,10 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
       return invalidResponse(INVALID_MIN_WAIT_TO_RUNNING);
     }
 
+    if (settings.isSetBlockIfNoPulsesAfterMs() && settings.getBlockIfNoPulsesAfterMs() <= 0) {
+      return invalidResponse(INVALID_PULSE_TIMEOUT);
+    }
+
     final SessionContext context;
     final IJobUpdateRequest request;
     try {
@@ -1530,12 +1540,20 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     return changeJobUpdateState(mutableJobKey, session, ABORT);
   }
 
-  @VisibleForTesting
-  static final String NOT_IMPLEMENTED_MESSAGE = "Not implemented";
-
   @Override
-  public Response pulseJobUpdate(String updateId, SessionKey session) {
-    throw new UnsupportedOperationException(NOT_IMPLEMENTED_MESSAGE);
+  public Response pulseJobUpdate(JobUpdateKey mutableUpdateKey, final SessionKey session) {
+    IJobUpdateKey updateKey = validateJobUpdateKey(mutableUpdateKey);
+    try {
+      authorizeJobUpdateAction(updateKey, session);
+
+      // TODO(maxim): use IJobUpdateKey to pulse when AURORA-1093 is addressed.
+      JobUpdatePulseStatus result = jobUpdateController.pulse(updateKey.getId());
+      return okResponse(Result.pulseJobUpdateResult(new PulseJobUpdateResult(result)));
+    } catch (AuthFailedException e) {
+      return errorResponse(AUTH_FAILED, e);
+    } catch (UpdateStateException e) {
+      return errorResponse(INVALID_REQUEST, e);
+    }
   }
 
   @Override
@@ -1570,6 +1588,38 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
     }
   }
 
+  private Optional<SessionContext> isUpdateCoordinator(SessionKey session) {
+    try {
+      return Optional.of(
+          sessionValidator.checkAuthorized(session, UPDATE_COORDINATOR, AuditCheck.NONE));
+    } catch (AuthFailedException e) {
+      return Optional.absent();
+    }
+  }
+
+  private SessionContext authorizeJobUpdateAction(IJobUpdateKey key, SessionKey session)
+      throws AuthFailedException {
+
+      Optional<SessionContext> maybeCoordinatorContext = isUpdateCoordinator(session);
+      SessionContext context;
+      if (maybeCoordinatorContext.isPresent()) {
+        context = maybeCoordinatorContext.get();
+      } else {
+        context = sessionValidator.checkAuthenticated(
+            session,
+            ImmutableSet.of(key.getJob().getRole()));
+      }
+
+    return context;
+  }
+
+  private static IJobUpdateKey validateJobUpdateKey(JobUpdateKey mutableKey) {
+    IJobUpdateKey key = IJobUpdateKey.build(mutableKey);
+    JobKeys.assertValid(key.getJob());
+    checkNotBlank(key.getId());
+    return key;
+  }
+
   @VisibleForTesting
   static Optional<String> transitionMessage(String user) {
     return Optional.of("Transition forced by " + user);
@@ -1630,6 +1680,9 @@ class SchedulerThriftInterface implements AuroraAdmin.Iface {
   static final String INVALID_MIN_WAIT_TO_RUNNING =
       "minWaitInInstanceRunningMs must be non-negative.";
 
+  @VisibleForTesting
+  static final String INVALID_PULSE_TIMEOUT = "blockIfNoPulsesAfterMs must be positive.";
+
   private static Response okEmptyResponse()  {
     return emptyResponse().setResponseCode(OK);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/61e6c35f/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index a9966a8..ee32949 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -71,6 +71,8 @@ import org.apache.aurora.gen.JobSummaryResult;
 import org.apache.aurora.gen.JobUpdate;
 import org.apache.aurora.gen.JobUpdateDetails;
 import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.JobUpdatePulseStatus;
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateRequest;
 import org.apache.aurora.gen.JobUpdateSettings;
@@ -81,6 +83,7 @@ import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
 import org.apache.aurora.gen.MesosContainer;
 import org.apache.aurora.gen.PendingReason;
+import org.apache.aurora.gen.PulseJobUpdateResult;
 import org.apache.aurora.gen.QueryRecoveryResult;
 import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ResourceAggregate;
@@ -129,6 +132,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
 import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
 import org.apache.aurora.scheduler.storage.entities.ILock;
@@ -150,6 +154,7 @@ import org.junit.Test;
 import static org.apache.aurora.auth.CapabilityValidator.Capability.MACHINE_MAINTAINER;
 import static org.apache.aurora.auth.CapabilityValidator.Capability.PROVISIONER;
 import static org.apache.aurora.auth.CapabilityValidator.Capability.ROOT;
+import static org.apache.aurora.auth.CapabilityValidator.Capability.UPDATE_COORDINATOR;
 import static org.apache.aurora.auth.SessionValidator.SessionContext;
 import static org.apache.aurora.gen.LockValidation.CHECKED;
 import static org.apache.aurora.gen.LockValidation.UNCHECKED;
@@ -203,6 +208,8 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   private static final String TASK_ID = "task_id";
   private static final String UPDATE_ID = "82d6d790-3212-11e3-aa6e-0800200c9a74";
   private static final UUID UU_ID = UUID.fromString(UPDATE_ID);
+  private static final IJobUpdateKey UPDATE_KEY =
+      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), UPDATE_ID));
 
   private static final IResourceAggregate QUOTA =
       IResourceAggregate.build(new ResourceAggregate(10.0, 1024, 2048));
@@ -2748,6 +2755,18 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
+  public void testStartUpdateFailsInvalidPulseTimeout() throws Exception {
+    control.replay();
+
+    JobUpdateRequest updateRequest = buildServiceJobUpdateRequest();
+    updateRequest.getSettings().setBlockIfNoPulsesAfterMs(-1);
+
+    assertEquals(
+        invalidResponse(SchedulerThriftInterface.INVALID_PULSE_TIMEOUT),
+        thrift.startJobUpdate(updateRequest, SESSION));
+  }
+
+  @Test
   public void testStartUpdateFailsAuth() throws Exception {
     JobUpdateRequest request = buildServiceJobUpdateRequest(populatedTask());
     expectAuth(ROLE, false);
@@ -3027,12 +3046,48 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
   }
 
   @Test
-  public void testPulseJobUpdate() throws Exception {
+  public void testPulseJobUpdatePulsedAsCoordinator() throws Exception {
+    expectAuth(UPDATE_COORDINATOR, true);
+    expect(jobUpdateController.pulse(UPDATE_ID)).andReturn(JobUpdatePulseStatus.OK);
+
     control.replay();
 
     assertEquals(
-        errorResponse(SchedulerThriftInterface.NOT_IMPLEMENTED_MESSAGE),
-        thrift.pulseJobUpdate("update", SESSION));
+        okResponse(Result.pulseJobUpdateResult(new PulseJobUpdateResult(JobUpdatePulseStatus.OK))),
+        thrift.pulseJobUpdate(UPDATE_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testPulseJobUpdatePulsedAsUser() throws Exception {
+    expectAuth(UPDATE_COORDINATOR, false);
+    expectAuth(ROLE, true);
+    expect(jobUpdateController.pulse(UPDATE_ID)).andReturn(JobUpdatePulseStatus.OK);
+
+    control.replay();
+
+    assertEquals(
+        okResponse(Result.pulseJobUpdateResult(new PulseJobUpdateResult(JobUpdatePulseStatus.OK))),
+        thrift.pulseJobUpdate(UPDATE_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testPulseJobUpdateFails() throws Exception {
+    expectAuth(UPDATE_COORDINATOR, true);
+    expect(jobUpdateController.pulse(UPDATE_ID)).andThrow(new UpdateStateException("failure"));
+
+    control.replay();
+
+    assertResponse(INVALID_REQUEST, thrift.pulseJobUpdate(UPDATE_KEY.newBuilder(), SESSION));
+  }
+
+  @Test
+  public void testPulseJobUpdateFailsAuth() throws Exception {
+    expectAuth(UPDATE_COORDINATOR, false);
+    expectAuth(ROLE, false);
+
+    control.replay();
+
+    assertResponse(AUTH_FAILED, thrift.pulseJobUpdate(UPDATE_KEY.newBuilder(), SESSION));
   }
 
   private static JobConfiguration makeProdJob() {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/61e6c35f/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java b/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java
index 4bf6392..bc26c4c 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/aop/ForwardingThrift.java
@@ -20,6 +20,7 @@ import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.gen.Hosts;
 import org.apache.aurora.gen.JobConfiguration;
 import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.JobUpdateKey;
 import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateRequest;
 import org.apache.aurora.gen.Lock;
@@ -285,8 +286,8 @@ abstract class ForwardingThrift implements AuroraAdmin.Iface {
   }
 
   @Override
-  public Response pulseJobUpdate(String updateId, SessionKey session) throws TException {
-    return delegate.pulseJobUpdate(updateId, session);
+  public Response pulseJobUpdate(JobUpdateKey key, SessionKey session) throws TException {
+    return delegate.pulseJobUpdate(key, session);
   }
 
   @Override


Mime
View raw message