Return-Path: X-Original-To: apmail-falcon-commits-archive@minotaur.apache.org Delivered-To: apmail-falcon-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB36A18ECE for ; Fri, 18 Dec 2015 11:55:40 +0000 (UTC) Received: (qmail 86137 invoked by uid 500); 18 Dec 2015 11:55:40 -0000 Delivered-To: apmail-falcon-commits-archive@falcon.apache.org Received: (qmail 86094 invoked by uid 500); 18 Dec 2015 11:55:40 -0000 Mailing-List: contact commits-help@falcon.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@falcon.apache.org Delivered-To: mailing list commits@falcon.apache.org Received: (qmail 86085 invoked by uid 99); 18 Dec 2015 11:55:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Dec 2015 11:55:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 747F2DFF12; Fri, 18 Dec 2015 11:55:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pallavi@apache.org To: commits@falcon.apache.org Message-Id: <87a63f34053449669fb08012337f21e4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: falcon git commit: FALCON-1639 Implement update feature for native scheduler Date: Fri, 18 Dec 2015 11:55:40 +0000 (UTC) Repository: falcon Updated Branches: refs/heads/master 10fcb9153 -> 4ba652f54 FALCON-1639 Implement update feature for native scheduler Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4ba652f5 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4ba652f5 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4ba652f5 Branch: refs/heads/master Commit: 4ba652f54bdda922f019b8ee3541a2edf4def6b7 Parents: 10fcb91 Author: Pallavi Rao 
Authored: Fri Dec 18 17:25:01 2015 +0530 Committer: Pallavi Rao Committed: Fri Dec 18 17:25:01 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/falcon/execution/EntityExecutor.java | 7 ++ .../falcon/execution/NotificationHandler.java | 2 +- .../falcon/execution/ProcessExecutor.java | 76 ++++++++++++++++---- .../service/impl/SchedulerService.java | 30 ++++++-- .../org/apache/falcon/predicate/Predicate.java | 6 +- .../org/apache/falcon/state/InstanceState.java | 2 + .../workflow/engine/FalconWorkflowEngine.java | 30 +++++++- .../falcon/workflow/engine/OozieDAGEngine.java | 1 + .../execution/FalconExecutionServiceTest.java | 34 +++++++++ 10 files changed, 164 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 10ac338..2673669 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1639 Implement update feature for native scheduler (Pallavi Rao) + FALCON-1636 Add Rerun API In Falcon Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava) FALCON-1562 Documentation for enabling native scheduler in falcon (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java index bf70dca..d3f2d29 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java @@ -121,4 +121,11 @@ public abstract class EntityExecutor 
implements NotificationHandler, InstanceSta public PRIORITY getPriority() { return PRIORITY.MEDIUM; } + + /** + * Update the definition of the entity and re-register with the appropriate services. + * @param newEntity + * @throws FalconException + */ + public abstract void update(Entity newEntity) throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java index 2f68ddb..7093cc8 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java @@ -29,7 +29,7 @@ public interface NotificationHandler { * When there are multiple notification handlers for the same event, * the priority determines which handler gets notified first. 
*/ - enum PRIORITY {HIGH(10), MEDIUM(5), LOW(0); + enum PRIORITY {HIGH(0), MEDIUM(5), LOW(10); private final int priority; http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java index e1ec1bd..40fe1b3 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java @@ -20,18 +20,22 @@ package org.apache.falcon.execution; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import java.util.concurrent.ExecutionException; import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.ProcessHelper; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.exception.InvalidStateTransitionException; +import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.Event; import org.apache.falcon.notification.service.event.EventType; import org.apache.falcon.notification.service.event.JobCompletedEvent; import org.apache.falcon.notification.service.event.RerunEvent; +import org.apache.falcon.notification.service.event.JobScheduledEvent; import org.apache.falcon.notification.service.event.TimeElapsedEvent; import org.apache.falcon.notification.service.impl.AlarmService; import org.apache.falcon.notification.service.impl.JobCompletionService; @@ -53,7 +57,6 
@@ import java.util.ArrayList; import java.util.Date; import java.util.Properties; import java.util.TimeZone; -import java.util.concurrent.ExecutionException; /** * This class is responsible for managing execution instances of a process. @@ -65,7 +68,7 @@ public class ProcessExecutor extends EntityExecutor { private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class); protected LoadingCache instances; private Predicate triggerPredicate; - private final Process process; + private Process process; private final StateService stateService = StateService.get(); private final FalconExecutionService executionService = FalconExecutionService.get(); @@ -96,7 +99,7 @@ public class ProcessExecutor extends EntityExecutor { LOG.info("Loading instances for process {} from state store.", process.getName()); reloadInstances(); } - registerForNotifications(); + registerForNotifications(getLastInstanceTime()); } private void dryRun() throws FalconException { @@ -170,6 +173,16 @@ public class ProcessExecutor extends EntityExecutor { } } + // Returns last materialized instance's time. 
+ private Date getLastInstanceTime() throws StateStoreException { + InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster); + if (instanceState == null) { + return null; + } + return EntityUtil.getNextInstanceTime(instanceState.getInstance().getInstanceTime().toDate(), + EntityUtil.getFrequency(process), EntityUtil.getTimeZone(process), 1); + } + @Override public void resumeAll() throws FalconException { if (instances == null) { @@ -189,7 +202,7 @@ public class ProcessExecutor extends EntityExecutor { LOG.error("Instance suspend failed for : " + instance.getId(), e); } } - registerForNotifications(); + registerForNotifications(getLastInstanceTime()); // Some errors if (errMsg.length() != 0) { throw new FalconException("Some instances failed to resume : " + errMsg.toString()); @@ -286,15 +299,39 @@ public class ProcessExecutor extends EntityExecutor { } @Override + public void update(Entity newEntity) throws FalconException { + Date newEndTime = EntityUtil.getEndTime(newEntity, cluster); + if (newEndTime.before(new Date())) { + throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(newEndTime) + + " is before current time. Entity can't be updated. Use remove and add"); + } + LOG.debug("Updating for cluster: {}, entity: {}", cluster, newEntity.toShortString()); + // Unregister from the service that causes an instance to trigger, + // so the new instances are triggered with the new definition. + switch(triggerPredicate.getType()) { + case TIME: + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME) + .unregister(executionService, getId()); + break; + default: + throw new FalconException("Internal Error : Wrong instance trigger type."); + } + // Update process + process = (Process) newEntity; + // Re-register with new start, end, frequency etc. 
+ registerForNotifications(getLastInstanceTime()); + } + + @Override public Entity getEntity() { return process; } private ProcessExecutionInstance buildInstance(Event event) throws FalconException { - // If a time triggered instance, use nominal time from event + // If a time triggered instance, use instance time from event if (event.getType() == EventType.TIME_ELAPSED) { TimeElapsedEvent timeEvent = (TimeElapsedEvent) event; - LOG.debug("Creating a new process instance for nominal time {}.", timeEvent.getInstanceTime()); + LOG.debug("Creating a new process instance for instance time {}.", timeEvent.getInstanceTime()); return new ProcessExecutionInstance(process, timeEvent.getInstanceTime(), cluster); } else { return new ProcessExecutionInstance(process, DateTime.now(), cluster); @@ -332,11 +369,19 @@ public class ProcessExecutor extends EntityExecutor { ProcessExecutionInstance instance; try { switch (event.getType()) { - // TODO : Handle cases where scheduling fails. case JOB_SCHEDULED: instance = instances.get((InstanceID)event.getTarget()); instance.onEvent(event); - stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this); + switch(((JobScheduledEvent)event).getStatus()) { + case SUCCESSFUL: + stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this); + break; + case FAILED: + stateService.handleStateChange(instance, InstanceState.EVENT.FAIL, this); + break; + default: + throw new InvalidStateTransitionException("Invalid job scheduler status."); + } break; case JOB_COMPLETED: instance = instances.get((InstanceID)event.getTarget()); @@ -377,7 +422,7 @@ public class ProcessExecutor extends EntityExecutor { } } } catch (ExecutionException ee) { - throw new FalconException("Unable to cache execution instance", ee); + throw new FalconException("Unable to handle event for execution instance", ee); } } @@ -392,18 +437,15 @@ public class ProcessExecutor extends EntityExecutor { // Registers for all notifications that should 
trigger an instance. // Currently, only time based triggers are handled. - protected void registerForNotifications() throws FalconException { + protected void registerForNotifications(Date instanceTime) throws FalconException { AlarmService.AlarmRequestBuilder requestBuilder = (AlarmService.AlarmRequestBuilder) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME) .createRequestBuilder(executionService, getId()); Cluster processCluster = ProcessHelper.getCluster(process, cluster); - InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster); - // If there are no instances, use process's start, else, use last materialized instance's nominal time - Date startTime = (instanceState == null) ? processCluster.getValidity().getStart() - : EntityUtil.getNextInstanceTime(instanceState.getInstance().getInstanceTime().toDate(), - EntityUtil.getFrequency(process), EntityUtil.getTimeZone(process), 1); + // If there are no instances, use process's start, else, use last materialized instance's time + Date startTime = (instanceTime == null) ? processCluster.getValidity().getStart() : instanceTime; Date endTime = processCluster.getValidity().getEnd(); // TODO : Handle cron based and calendar based time triggers // TODO : Set execution order details. 
@@ -460,6 +502,8 @@ public class ProcessExecutor extends EntityExecutor { @Override public void onSuspend(ExecutionInstance instance) throws FalconException { + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE) + .unregister(executionService, instance.getId()); instances.invalidate(instance.getId()); } @@ -470,6 +514,8 @@ public class ProcessExecutor extends EntityExecutor { @Override public void onKill(ExecutionInstance instance) throws FalconException { + NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE) + .unregister(executionService, instance.getId()); instances.invalidate(instance.getId()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java index c524dfa..57a41c8 100644 --- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java +++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java @@ -167,7 +167,7 @@ public class SchedulerService implements FalconNotificationService, Notification @Override public void onRemoval(RemovalNotification> removalNotification) { // When instances are removed due to size... - // Ensure instances are persisted in state store and add to another list of awaited entities. + // Ensure instances are persisted in state store. 
if (removalNotification.wasEvicted()) { for (ExecutionInstance instance : removalNotification.getValue()) { InstanceState state = new InstanceState(instance); @@ -212,15 +212,22 @@ public class SchedulerService implements FalconNotificationService, Notification JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder( handler, instance.getId()); requestBuilder.setInstance(instance); - InstanceRunner runner = new InstanceRunner(requestBuilder.build()); + //The update kicks in for new instances, but, when old waiting instances are + // scheduled and it retrieves the parallelism for entity definition, + // it will use the "new" parallelism (if the user has updated it). + // Since there is no versioning of entities yet, + // need to retrieve what was the parallelism when that instance was created. + Integer runParallel = (Integer)predicate.getClauseValue("parallelInstances"); + InstanceRunner runner = new InstanceRunner(requestBuilder.build(), runParallel); runQueue.execute(runner); instances.remove(instance.getInstanceSequence()); + break; } } } } } - } catch (Exception e) { + } catch (ExecutionException e) { throw new FalconException(e); } } @@ -249,10 +256,19 @@ public class SchedulerService implements FalconNotificationService, Notification private int allowedParallelInstances = 1; public InstanceRunner(JobScheduleNotificationRequest request) { + this(request, EntityUtil.getParallel(request.getInstance().getEntity())); + } + + /** + * @param request + * @param runParallel - concurrency at the time the Instance was run, + * coz., runParallel can be updated later by user. 
+ */ + public InstanceRunner(JobScheduleNotificationRequest request, Integer runParallel) { this.request = request; this.instance = request.getInstance(); this.priority = getPriority(instance.getEntity()).getPriority(); - allowedParallelInstances = EntityUtil.getParallel(instance.getEntity()); + allowedParallelInstances = runParallel; } private EntityUtil.JOBPRIORITY getPriority(Entity entity) { @@ -302,7 +318,7 @@ public class SchedulerService implements FalconNotificationService, Notification try { notifyFailureEvent(request); } catch (FalconException fe) { - throw new RuntimeException("Unable to onEvent : " + request.getCallbackId(), fe); + throw new RuntimeException("Unable to invoke onEvent : " + request.getCallbackId(), fe); } } } @@ -328,7 +344,7 @@ public class SchedulerService implements FalconNotificationService, Notification EntityClusterID entityID = instance.getId().getEntityClusterID(); // Instance is awaiting scheduling conditions to be met. Add predicate to that effect. instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(), - entityID)); + entityID, EntityUtil.getParallel(instance.getEntity()))); updateExecutorAwaitedInstances(entityID); LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}", instance.getId(), entityID); @@ -354,7 +370,7 @@ public class SchedulerService implements FalconNotificationService, Notification for (ExecutionInstance execInstance : request.getDependencies()) { // Dependants should wait for this instance to complete. Add predicate to that effect. 
instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate( - request.getHandler(), execInstance.getId())); + request.getHandler(), execInstance.getId(), EntityUtil.getParallel(instance.getEntity()))); updateExecutorAwaitedInstances(execInstance.getId().getEntityClusterID()); } return false; http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java index c3db685..c7b4f12 100644 --- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java +++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java @@ -172,12 +172,14 @@ public class Predicate implements Serializable { * * @param handler * @param id + * @param parallelInstances * @return */ - public static Predicate createJobCompletionPredicate(NotificationHandler handler, ID id) { + public static Predicate createJobCompletionPredicate(NotificationHandler handler, ID id, int parallelInstances) { return new Predicate(TYPE.JOB_COMPLETION) .addClause("instanceId", id.toString()) - .addClause("handler", handler.getClass().getName()); + .addClause("handler", handler.getClass().getName()) + .addClause("parallelInstances", parallelInstances); } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java index 27dd8d4..b862e4d 100644 --- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java +++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java @@ -52,6 +52,8 @@ public class 
InstanceState implements StateMachine instances = new ArrayList<>(); + instances.add(STATE_STORE.getLastExecutionInstance(oldEntity, cluster)); + EXECUTION_SERVICE.getEntityExecutor(oldEntity, cluster).update(newEntity); + StringBuilder result = new StringBuilder(); + result.append(newEntity.toShortString()).append("/Effective Time: ") + .append(getEffectiveTime(newEntity, cluster, instances)); + return result.toString(); } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java index 4786cc3..1425a97 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java @@ -372,6 +372,7 @@ public class OozieDAGEngine implements DAGEngine { public Properties getConfiguration(String externalID) throws DAGEngineException { Properties props = new Properties(); try { + switchUser(); WorkflowJob jobInfo = client.getJobInfo(externalID); Configuration conf = new Configuration(false); conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes())); http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index 0ddf895..d66972c 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ 
b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -23,6 +23,7 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.notification.service.NotificationServicesRegistry; import org.apache.falcon.notification.service.event.DataEvent; @@ -34,6 +35,7 @@ import org.apache.falcon.notification.service.impl.AlarmService; import org.apache.falcon.notification.service.impl.DataAvailabilityService; import org.apache.falcon.notification.service.impl.JobCompletionService; import org.apache.falcon.notification.service.impl.SchedulerService; +import org.apache.falcon.notification.service.request.AlarmRequest; import org.apache.falcon.service.Services; import org.apache.falcon.state.AbstractSchedulerTestBase; import org.apache.falcon.state.EntityClusterID; @@ -52,6 +54,7 @@ import org.apache.hadoop.fs.Path; import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowJob; import org.joda.time.DateTime; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -557,6 +560,37 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase { }; } + @Test + public void testUpdate() throws Exception { + storeEntity(EntityType.PROCESS, "summarize10"); + Process process = getStore().get(EntityType.PROCESS, "summarize10"); + Assert.assertNotNull(process); + EntityID processKey = new EntityID(process); + String clusterName = dfsCluster.getCluster().getName(); + Date now = new Date(); + process.getClusters().getClusters().get(0).getValidity().setStart(now); + + // Schedule a process + Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), 
EntityState.STATE.SUBMITTED); + FalconExecutionService.get().schedule(process); + Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SCHEDULED); + // Simulate a time notification + Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName); + FalconExecutionService.get().onEvent(event); + + // Update the process with a new dummy input + Process newProcess = (Process) process.copy(); + newProcess.getInputs().getInputs().add(new Input()); + EntityExecutor executor = FalconExecutionService.get().getEntityExecutor(process, clusterName); + executor.update(newProcess); + EntityClusterID executorID = new EntityClusterID(process, clusterName); + Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AlarmRequest.class); + Mockito.verify(mockTimeService, Mockito.atLeast(2)).register(argumentCaptor.capture()); + // The second time registration after update should be after now. + Assert.assertTrue(argumentCaptor.getValue().getStartTime().isAfter(now.getTime())); + } + private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) { EntityClusterID id = new EntityClusterID(process, cluster); switch (type) {