falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-1639 Implement update feature for native scheduler
Date Fri, 18 Dec 2015 11:55:40 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 10fcb9153 -> 4ba652f54


FALCON-1639 Implement update feature for native scheduler


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4ba652f5
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4ba652f5
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4ba652f5

Branch: refs/heads/master
Commit: 4ba652f54bdda922f019b8ee3541a2edf4def6b7
Parents: 10fcb91
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Fri Dec 18 17:25:01 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Fri Dec 18 17:25:01 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/falcon/execution/EntityExecutor.java |  7 ++
 .../falcon/execution/NotificationHandler.java   |  2 +-
 .../falcon/execution/ProcessExecutor.java       | 76 ++++++++++++++++----
 .../service/impl/SchedulerService.java          | 30 ++++++--
 .../org/apache/falcon/predicate/Predicate.java  |  6 +-
 .../org/apache/falcon/state/InstanceState.java  |  2 +
 .../workflow/engine/FalconWorkflowEngine.java   | 30 +++++++-
 .../falcon/workflow/engine/OozieDAGEngine.java  |  1 +
 .../execution/FalconExecutionServiceTest.java   | 34 +++++++++
 10 files changed, 164 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 10ac338..2673669 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1639 Implement update feature for native scheduler (Pallavi Rao)
+
     FALCON-1636 Add Rerun API In Falcon Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava)
 
     FALCON-1562 Documentation for enabling native scheduler in falcon (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
index bf70dca..d3f2d29 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
@@ -121,4 +121,11 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta
     public PRIORITY getPriority() {
         return PRIORITY.MEDIUM;
     }
+
+    /**
+     * Update the definition of the the entity and re-register to appropriate services.
+     * @param newEntity
+     * @throws FalconException
+     */
+    public abstract void update(Entity newEntity) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
index 2f68ddb..7093cc8 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
@@ -29,7 +29,7 @@ public interface NotificationHandler {
      * When there are multiple notification handlers for the same event,
      * the priority determines which handler gets notified first.
      */
-    enum PRIORITY {HIGH(10), MEDIUM(5), LOW(0);
+    enum PRIORITY {HIGH(0), MEDIUM(5), LOW(10);
 
 
         private final int priority;

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index e1ec1bd..40fe1b3 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -20,18 +20,22 @@ package org.apache.falcon.execution;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import java.util.concurrent.ExecutionException;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.JobCompletedEvent;
 import org.apache.falcon.notification.service.event.RerunEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.notification.service.impl.AlarmService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
@@ -53,7 +57,6 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.Properties;
 import java.util.TimeZone;
-import java.util.concurrent.ExecutionException;
 
 /**
  * This class is responsible for managing execution instances of a process.
@@ -65,7 +68,7 @@ public class ProcessExecutor extends EntityExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class);
     protected LoadingCache<InstanceID, ProcessExecutionInstance> instances;
     private Predicate triggerPredicate;
-    private final Process process;
+    private Process process;
     private final StateService stateService = StateService.get();
     private final FalconExecutionService executionService = FalconExecutionService.get();
 
@@ -96,7 +99,7 @@ public class ProcessExecutor extends EntityExecutor {
             LOG.info("Loading instances for process {} from state store.", process.getName());
             reloadInstances();
         }
-        registerForNotifications();
+        registerForNotifications(getLastInstanceTime());
     }
 
     private void dryRun() throws FalconException {
@@ -170,6 +173,16 @@ public class ProcessExecutor extends EntityExecutor {
         }
     }
 
+    //  Returns last materialized instance's time.
+    private Date getLastInstanceTime() throws StateStoreException {
+        InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster);
+        if (instanceState == null) {
+            return null;
+        }
+        return EntityUtil.getNextInstanceTime(instanceState.getInstance().getInstanceTime().toDate(),
+                EntityUtil.getFrequency(process), EntityUtil.getTimeZone(process), 1);
+    }
+
     @Override
     public void resumeAll() throws FalconException {
         if (instances == null) {
@@ -189,7 +202,7 @@ public class ProcessExecutor extends EntityExecutor {
                 LOG.error("Instance suspend failed for : " + instance.getId(), e);
             }
         }
-        registerForNotifications();
+        registerForNotifications(getLastInstanceTime());
         // Some errors
         if (errMsg.length() != 0) {
             throw new FalconException("Some instances failed to resume : " + errMsg.toString());
@@ -286,15 +299,39 @@ public class ProcessExecutor extends EntityExecutor {
     }
 
     @Override
+    public void update(Entity newEntity) throws FalconException {
+        Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
+        if (newEndTime.before(new Date())) {
+            throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(newEndTime)
+                    + " is before current time. Entity can't be updated. Use remove and add");
+        }
+        LOG.debug("Updating for cluster: {}, entity: {}", cluster, newEntity.toShortString());
+        // Unregister from the service that causes an instance to trigger,
+        // so the new instances are triggered with the new definition.
+        switch(triggerPredicate.getType()) {
+        case TIME:
+            NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME)
+                    .unregister(executionService, getId());
+            break;
+        default:
+            throw new FalconException("Internal Error : Wrong instance trigger type.");
+        }
+        // Update process
+        process = (Process) newEntity;
+        // Re-register with new start, end, frequency etc.
+        registerForNotifications(getLastInstanceTime());
+    }
+
+    @Override
     public Entity getEntity() {
         return process;
     }
 
     private ProcessExecutionInstance buildInstance(Event event) throws FalconException {
-        // If a time triggered instance, use nominal time from event
+        // If a time triggered instance, use instance time from event
         if (event.getType() == EventType.TIME_ELAPSED) {
             TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
-            LOG.debug("Creating a new process instance for nominal time {}.", timeEvent.getInstanceTime());
+            LOG.debug("Creating a new process instance for instance time {}.", timeEvent.getInstanceTime());
             return new ProcessExecutionInstance(process, timeEvent.getInstanceTime(), cluster);
         } else {
             return new ProcessExecutionInstance(process, DateTime.now(), cluster);
@@ -332,11 +369,19 @@ public class ProcessExecutor extends EntityExecutor {
         ProcessExecutionInstance instance;
         try {
             switch (event.getType()) {
-            // TODO : Handle cases where scheduling fails.
             case JOB_SCHEDULED:
                 instance = instances.get((InstanceID)event.getTarget());
                 instance.onEvent(event);
-                stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this);
+                switch(((JobScheduledEvent)event).getStatus()) {
+                case SUCCESSFUL:
+                    stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this);
+                    break;
+                case FAILED:
+                    stateService.handleStateChange(instance, InstanceState.EVENT.FAIL, this);
+                    break;
+                default:
+                    throw new InvalidStateTransitionException("Invalid job scheduler status.");
+                }
                 break;
             case JOB_COMPLETED:
                 instance = instances.get((InstanceID)event.getTarget());
@@ -377,7 +422,7 @@ public class ProcessExecutor extends EntityExecutor {
                 }
             }
         } catch (ExecutionException ee) {
-            throw new FalconException("Unable to cache execution instance", ee);
+            throw new FalconException("Unable to handle event for execution instance", ee);
         }
     }
 
@@ -392,18 +437,15 @@ public class ProcessExecutor extends EntityExecutor {
 
     // Registers for all notifications that should trigger an instance.
     // Currently, only time based triggers are handled.
-    protected void registerForNotifications() throws FalconException {
+    protected void registerForNotifications(Date instanceTime) throws FalconException {
         AlarmService.AlarmRequestBuilder requestBuilder =
                 (AlarmService.AlarmRequestBuilder)
                 NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME)
                         .createRequestBuilder(executionService, getId());
         Cluster processCluster = ProcessHelper.getCluster(process, cluster);
 
-        InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster);
-        // If there are no instances, use process's start, else, use last materialized instance's nominal time
-        Date startTime = (instanceState == null) ? processCluster.getValidity().getStart()
-                : EntityUtil.getNextInstanceTime(instanceState.getInstance().getInstanceTime().toDate(),
-                    EntityUtil.getFrequency(process), EntityUtil.getTimeZone(process), 1);
+        // If there are no instances, use process's start, else, use last materialized instance's time
+        Date startTime = (instanceTime == null) ? processCluster.getValidity().getStart() : instanceTime;
         Date endTime = processCluster.getValidity().getEnd();
         // TODO : Handle cron based and calendar based time triggers
         // TODO : Set execution order details.
@@ -460,6 +502,8 @@ public class ProcessExecutor extends EntityExecutor {
 
     @Override
     public void onSuspend(ExecutionInstance instance) throws FalconException {
+        NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE)
+                .unregister(executionService, instance.getId());
         instances.invalidate(instance.getId());
     }
 
@@ -470,6 +514,8 @@ public class ProcessExecutor extends EntityExecutor {
 
     @Override
     public void onKill(ExecutionInstance instance) throws FalconException {
+        NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE)
+                .unregister(executionService, instance.getId());
         instances.invalidate(instance.getId());
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index c524dfa..57a41c8 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -167,7 +167,7 @@ public class SchedulerService implements FalconNotificationService, Notification
     @Override
     public void onRemoval(RemovalNotification<ID, List<ExecutionInstance>> removalNotification) {
         // When instances are removed due to size...
-        // Ensure instances are persisted in state store and add to another list of awaited entities.
+        // Ensure instances are persisted in state store.
         if (removalNotification.wasEvicted()) {
             for (ExecutionInstance instance : removalNotification.getValue()) {
                 InstanceState state = new InstanceState(instance);
@@ -212,15 +212,22 @@ public class SchedulerService implements FalconNotificationService, Notification
                                     JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder(
                                             handler, instance.getId());
                                     requestBuilder.setInstance(instance);
-                                    InstanceRunner runner = new InstanceRunner(requestBuilder.build());
+                                    //The update kicks in for new instances, but, when old waiting instances are
+                                    // scheduled and it retrieves the parallelism for entity definition,
+                                    // it will use the "new" parallelism (if the user has updated it).
+                                    // Since there is no versioning of entities yet,
+                                    // need to retrieve what was the parallelism when that instance was created.
+                                    Integer runParallel = (Integer)predicate.getClauseValue("parallelInstances");
+                                    InstanceRunner runner = new InstanceRunner(requestBuilder.build(), runParallel);
                                     runQueue.execute(runner);
                                     instances.remove(instance.getInstanceSequence());
+                                    break;
                                 }
                             }
                         }
                     }
                 }
-            } catch (Exception e) {
+            } catch (ExecutionException e) {
                 throw new FalconException(e);
             }
         }
@@ -249,10 +256,19 @@ public class SchedulerService implements FalconNotificationService, Notification
         private int allowedParallelInstances = 1;
 
         public InstanceRunner(JobScheduleNotificationRequest request) {
+            this(request, EntityUtil.getParallel(request.getInstance().getEntity()));
+        }
+
+        /**
+         * @param request
+         * @param runParallel - concurrency at the time the Instance was run,
+         *                    coz., runParallel can be updated later by user.
+         */
+        public InstanceRunner(JobScheduleNotificationRequest request, Integer runParallel) {
             this.request = request;
             this.instance = request.getInstance();
             this.priority = getPriority(instance.getEntity()).getPriority();
-            allowedParallelInstances = EntityUtil.getParallel(instance.getEntity());
+            allowedParallelInstances = runParallel;
         }
 
         private EntityUtil.JOBPRIORITY getPriority(Entity entity) {
@@ -302,7 +318,7 @@ public class SchedulerService implements FalconNotificationService, Notification
                 try {
                     notifyFailureEvent(request);
                 } catch (FalconException fe) {
-                    throw new RuntimeException("Unable to onEvent : " + request.getCallbackId(), fe);
+                    throw new RuntimeException("Unable to invoke onEvent : " + request.getCallbackId(), fe);
                 }
             }
         }
@@ -328,7 +344,7 @@ public class SchedulerService implements FalconNotificationService, Notification
                     EntityClusterID entityID = instance.getId().getEntityClusterID();
                     // Instance is awaiting scheduling conditions to be met. Add predicate to that effect.
                     instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(),
-                            entityID));
+                            entityID, EntityUtil.getParallel(instance.getEntity())));
                     updateExecutorAwaitedInstances(entityID);
                     LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}",
                             instance.getId(), entityID);
@@ -354,7 +370,7 @@ public class SchedulerService implements FalconNotificationService, Notification
             for (ExecutionInstance execInstance : request.getDependencies()) {
                 // Dependants should wait for this instance to complete. Add predicate to that effect.
                 instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(
-                        request.getHandler(), execInstance.getId()));
+                        request.getHandler(), execInstance.getId(), EntityUtil.getParallel(instance.getEntity())));
                 updateExecutorAwaitedInstances(execInstance.getId().getEntityClusterID());
             }
             return false;

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index c3db685..c7b4f12 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -172,12 +172,14 @@ public class Predicate implements Serializable {
      *
      * @param handler
      * @param id
+     * @param parallelInstances
      * @return
      */
-    public static Predicate createJobCompletionPredicate(NotificationHandler handler, ID id) {
+    public static Predicate createJobCompletionPredicate(NotificationHandler handler, ID id, int parallelInstances) {
         return new Predicate(TYPE.JOB_COMPLETION)
                 .addClause("instanceId", id.toString())
-                .addClause("handler", handler.getClass().getName());
+                .addClause("handler", handler.getClass().getName())
+                .addClause("parallelInstances", parallelInstances);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index 27dd8d4..b862e4d 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -52,6 +52,8 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
                     return this;
                 case EXTERNAL_TRIGGER:
                     return this;
+                case FAIL:
+                    return FAILED;
                 default:
                     throw new InvalidStateTransitionException("Event " + event.name() + " not valid for state, "
                             + STATE.WAITING.name());

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 6abc222..989c6e0 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -38,6 +38,7 @@ import org.apache.falcon.state.EntityState;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.DateUtil;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -360,7 +361,34 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public String update(Entity oldEntity, Entity newEntity, String cluster, Boolean skipDryRun)
         throws FalconException {
-        throw new FalconException("Not yet Implemented");
+        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        boolean entityUpdated =
+                UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster,
+                        EntityUtil.getLatestStagingPath(clusterEntity, oldEntity));
+
+        if (!entityUpdated) {
+            throw new FalconException("No relevant updates detected in the new entity definition!");
+        }
+
+        Date oldEndTime = EntityUtil.getEndTime(oldEntity, cluster);
+        Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
+        if (newEndTime.before(DateUtil.now()) || newEndTime.before(oldEndTime)) {
+            throw new FalconException("New Entity's end time " + SchemaHelper.formatDateUTC(newEndTime)
+                    + " is before current time or before old end time. Entity can't be updated.");
+        }
+
+        // The steps required are the same as touch.
+        DAGEngineFactory.getDAGEngine(cluster).touch(newEntity, (skipDryRun == null) ? Boolean.FALSE : skipDryRun);
+        // Additionally, update the executor.
+        // The update will kick in for new instances created and not for READY/WAITING instances, as with Oozie.
+        Collection<InstanceState> instances = new ArrayList<>();
+        instances.add(STATE_STORE.getLastExecutionInstance(oldEntity, cluster));
+        EXECUTION_SERVICE.getEntityExecutor(oldEntity, cluster).update(newEntity);
+        StringBuilder result = new StringBuilder();
+        result.append(newEntity.toShortString()).append("/Effective Time: ")
+                .append(getEffectiveTime(newEntity, cluster, instances));
+        return result.toString();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
index 4786cc3..1425a97 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -372,6 +372,7 @@ public class OozieDAGEngine implements DAGEngine {
     public Properties getConfiguration(String externalID) throws DAGEngineException {
         Properties props = new Properties();
         try {
+            switchUser();
             WorkflowJob jobInfo = client.getJobInfo(externalID);
             Configuration conf = new Configuration(false);
             conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));

http://git-wip-us.apache.org/repos/asf/falcon/blob/4ba652f5/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index 0ddf895..d66972c 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.notification.service.event.DataEvent;
@@ -34,6 +35,7 @@ import org.apache.falcon.notification.service.impl.AlarmService;
 import org.apache.falcon.notification.service.impl.DataAvailabilityService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
 import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.notification.service.request.AlarmRequest;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.state.AbstractSchedulerTestBase;
 import org.apache.falcon.state.EntityClusterID;
@@ -52,6 +54,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.WorkflowJob;
 import org.joda.time.DateTime;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -557,6 +560,37 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase {
         };
     }
 
+    @Test
+    public void testUpdate() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize10");
+        Process process = getStore().get(EntityType.PROCESS, "summarize10");
+        Assert.assertNotNull(process);
+        EntityID processKey = new EntityID(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        Date now = new Date();
+        process.getClusters().getClusters().get(0).getValidity().setStart(now);
+
+        // Schedule a process
+        Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SUBMITTED);
+        FalconExecutionService.get().schedule(process);
+        Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SCHEDULED);
+        // Simulate a time notification
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+
+        // Update the process with a new dummy input
+        Process newProcess = (Process) process.copy();
+        newProcess.getInputs().getInputs().add(new Input());
+        EntityExecutor executor = FalconExecutionService.get().getEntityExecutor(process, clusterName);
+        executor.update(newProcess);
+        EntityClusterID executorID = new EntityClusterID(process, clusterName);
+        Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID);
+        ArgumentCaptor<AlarmRequest> argumentCaptor = ArgumentCaptor.forClass(AlarmRequest.class);
+        Mockito.verify(mockTimeService, Mockito.atLeast(2)).register(argumentCaptor.capture());
+        // The second time registration after update should be after now.
+        Assert.assertTrue(argumentCaptor.getValue().getStartTime().isAfter(now.getTime()));
+    }
+
     private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) {
         EntityClusterID id = new EntityClusterID(process, cluster);
         switch (type) {


Mime
View raw message