falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1636 Add Rerun API In Falcon Native Scheduler. Contributed by Pavan Kumar Kolamuri.
Date Fri, 18 Dec 2015 10:38:40 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 62fb403be -> 10fcb9153


FALCON-1636 Add Rerun API In Falcon Native Scheduler. Contributed by Pavan Kumar Kolamuri.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/10fcb915
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/10fcb915
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/10fcb915

Branch: refs/heads/master
Commit: 10fcb91537c1e4589578bf14ccc6eb47c9dda173
Parents: 62fb403
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Fri Dec 18 15:36:41 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Fri Dec 18 15:36:41 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/falcon/resource/InstancesResult.java |  2 +-
 .../apache/falcon/execution/EntityExecutor.java |  8 ++++
 .../falcon/execution/ExecutionInstance.java     | 11 +++++
 .../execution/ProcessExecutionInstance.java     |  7 +--
 .../falcon/execution/ProcessExecutor.java       | 40 +++++++++++++++--
 .../notification/service/event/EventType.java   |  4 +-
 .../notification/service/event/RerunEvent.java  | 45 ++++++++++++++++++++
 .../service/impl/SchedulerService.java          | 23 ++++++++--
 .../org/apache/falcon/predicate/Predicate.java  | 21 ++++++++-
 .../org/apache/falcon/state/InstanceState.java  | 20 ++++++++-
 .../state/InstanceStateChangeHandler.java       |  8 ++++
 .../org/apache/falcon/state/StateService.java   |  7 +++
 .../falcon/state/store/InMemoryStateStore.java  | 14 ++++++
 .../falcon/state/store/InstanceStateStore.java  |  9 ++++
 .../falcon/state/store/jdbc/BeanMapperUtil.java | 33 ++++++++++++++
 .../falcon/state/store/jdbc/InstanceBean.java   | 15 ++++++-
 .../falcon/state/store/jdbc/JDBCStateStore.java | 31 ++++++++++++++
 .../falcon/workflow/engine/DAGEngine.java       |  4 +-
 .../workflow/engine/FalconWorkflowEngine.java   | 28 ++++++++++--
 .../falcon/workflow/engine/OozieDAGEngine.java  | 35 ++++++++++++---
 .../apache/falcon/execution/MockDAGEngine.java  |  6 ++-
 .../state/service/store/TestJDBCStateStore.java | 34 +++++++++++++++
 23 files changed, 379 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 45d2e7c..10ac338 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1636 Add Rerun API In Falcon Native Scheduler (Pavan Kumar Kolamuri via Ajay Yadava)
+
     FALCON-1562 Documentation for enabling native scheduler in falcon (Pallavi Rao)
 
     FALCON-1512 Implement touch feature for native scheduler (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index e05eeeb..e12c083 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -34,7 +34,7 @@ public class InstancesResult extends APIResult {
      * Workflow status as being set in result object.
      */
     public static enum WorkflowStatus {
-        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED
+        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, SKIPPED, UNDEFINED, READY
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
index c9c0f42..bf70dca 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
@@ -25,6 +25,8 @@ import org.apache.falcon.state.InstanceStateChangeHandler;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
 
+import java.util.Properties;
+
 /**
  * This class is responsible for creation of execution instances for a given entity.
  * An execution instance is created upon receipt of a "trigger event".
@@ -98,6 +100,12 @@ public abstract class EntityExecutor implements NotificationHandler, InstanceSta
     public abstract void kill(ExecutionInstance instance) throws FalconException;
 
     /**
+     * Reruns a specified execution instance. It relies on the DAGEngine to maintain the details of
+     * reruns and no rerun information is stored in Falcon.
+     */
+    public abstract void rerun(ExecutionInstance instance, Properties props, boolean isForced) throws FalconException;
+
+    /**
      * @return The entity
      */
     public abstract Entity getEntity();

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
index 3cc8a25..cadbfb6 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -26,6 +26,7 @@ import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Represents an execution instance of an entity.
@@ -43,6 +44,7 @@ public abstract class ExecutionInstance implements NotificationHandler {
     private final DateTime creationTime;
     private DateTime actualStart;
     private DateTime actualEnd;
+    private Properties properties;
     protected static final DateTimeZone UTC = DateTimeZone.UTC;
 
     /**
@@ -148,6 +150,15 @@ public abstract class ExecutionInstance implements NotificationHandler {
         this.actualEnd = actualEnd;
     }
 
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
     /**
      * Creation time of an instance.
      * @return

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index f3beabc..72e5558 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -187,9 +187,6 @@ public class ProcessExecutionInstance extends ExecutionInstance {
      * @return true when it is not already scheduled or is gated on some conditions.
      */
     public boolean isReady() {
-        if (getExternalID() != null) {
-            return false;
-        }
         if (awaitedPredicates.isEmpty()) {
             return true;
         } else {
@@ -324,4 +321,8 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
                 .unregister(executionService, getId());
     }
+
+    public void rerun() throws FalconException {
+        registerForNotifications(false);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index e446069..e1ec1bd 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -31,6 +31,7 @@ import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.RerunEvent;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.notification.service.impl.AlarmService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
@@ -43,13 +44,16 @@ import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.StateService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.falcon.workflow.engine.FalconWorkflowEngine;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Properties;
 import java.util.TimeZone;
+import java.util.concurrent.ExecutionException;
 
 /**
  * This class is responsible for managing execution instances of a process.
@@ -267,6 +271,21 @@ public class ProcessExecutor extends EntityExecutor {
     }
 
     @Override
+    public void rerun(ExecutionInstance instance, Properties props, boolean isForced) throws FalconException {
+        if (props == null) {
+            props = new Properties();
+        }
+        if (isForced) {
+            props.put(FalconWorkflowEngine.FALCON_FORCE_RERUN, "true");
+        }
+        props.put(FalconWorkflowEngine.FALCON_RERUN, "true");
+        instance.setProperties(props);
+        instances.put(new InstanceID(instance), (ProcessExecutionInstance) instance);
+        RerunEvent rerunEvent = new RerunEvent(instance.getId(), instance.getInstanceTime());
+        onEvent(rerunEvent);
+    }
+
+    @Override
     public Entity getEntity() {
         return process;
     }
@@ -311,7 +330,6 @@ public class ProcessExecutor extends EntityExecutor {
 
     private void handleEvent(Event event) throws FalconException {
         ProcessExecutionInstance instance;
-        InstanceID instanceID;
         try {
             switch (event.getType()) {
             // TODO : Handle cases where scheduling fails.
@@ -341,17 +359,24 @@ public class ProcessExecutor extends EntityExecutor {
                             "Job seems to be have been managed outside Falcon.");
                 }
                 break;
+            case RE_RUN:
+                instance = instances.get((InstanceID)event.getTarget());
+                stateService.handleStateChange(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, this);
+                if (instance.isReady()) {
+                    stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
+                }
+                break;
             default:
                 if (isTriggerEvent(event)) {
                     instance = buildInstance(event);
                     stateService.handleStateChange(instance, InstanceState.EVENT.TRIGGER, this);
                     // This happens where are no conditions the instance is waiting on (for example, no data inputs).
-                    if (instance.isReady()) {
+                    if (!instance.isScheduled() && instance.isReady()) {
                         stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
                     }
                 }
             }
-        } catch (Exception ee) {
+        } catch (ExecutionException ee) {
             throw new FalconException("Unable to cache execution instance", ee);
         }
     }
@@ -397,7 +422,8 @@ public class ProcessExecutor extends EntityExecutor {
     private boolean shouldHandleEvent(Event event) {
         return event.getTarget().equals(id)
                 || event.getType() == EventType.JOB_COMPLETED
-                || event.getType() == EventType.JOB_SCHEDULED;
+                || event.getType() == EventType.JOB_SCHEDULED
+                || event.getType() == EventType.RE_RUN;
     }
 
     @Override
@@ -406,6 +432,12 @@ public class ProcessExecutor extends EntityExecutor {
     }
 
     @Override
+    public void onExternalTrigger(ExecutionInstance instance) throws FalconException {
+        instances.put(new InstanceID(instance), (ProcessExecutionInstance) instance);
+        ((ProcessExecutionInstance) instance).rerun();
+    }
+
+    @Override
     public void onConditionsMet(ExecutionInstance instance) throws FalconException {
         // Put process in run queue and register for notification
         SchedulerService.JobScheduleRequestBuilder requestBuilder = (SchedulerService.JobScheduleRequestBuilder)

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
index 59f5cba..b2418ec 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
@@ -25,6 +25,6 @@ public enum EventType {
         TIME_ELAPSED,
         DATA_AVAILABLE,
         JOB_COMPLETED,
-        JOB_SCHEDULED
-
+        JOB_SCHEDULED,
+        RE_RUN
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java
new file mode 100644
index 0000000..67b4b50
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/RerunEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
+import org.joda.time.DateTime;
+
+/**
+ * Rerun Event used while rerunning an instance.
+ */
+public class RerunEvent extends Event {
+    private DateTime instanceTime;
+    private final InstanceID callbackID;
+
+    public DateTime getInstanceTime() {
+        return instanceTime;
+    }
+
+    public RerunEvent(InstanceID callbackID, DateTime instanceTime) {
+        this.callbackID = callbackID;
+        this.instanceTime = instanceTime;
+        this.type = EventType.RE_RUN;
+    }
+
+    @Override
+    public ID getTarget() {
+        return callbackID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index fb11091..c524dfa 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -52,6 +52,7 @@ import org.apache.falcon.state.store.StateStore;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.falcon.workflow.engine.FalconWorkflowEngine;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +61,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -274,12 +276,20 @@ public class SchedulerService implements FalconNotificationService, Notification
                 }
                 LOG.debug("Received request to run instance {}", instance.getId());
                 if (checkConditions()) {
-                    // If instance not already scheduled.
                     String externalId = instance.getExternalID();
-                    if (externalId == null) {
+                    if (externalId != null) {
+                        Properties props = instance.getProperties();
+                        boolean isForced = false;
+                        if (props != null) {
+                            isForced = Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_FORCE_RERUN));
+                        }
+                        if (isReRun(props)) {
+                            DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced);
+                        }
+                    } else {
                         externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance);
-                        LOG.info("Scheduled job {} for instance {}", externalId, instance.getId());
                     }
+                    LOG.info("Scheduled job {} for instance {}", externalId, instance.getId());
                     JobScheduledEvent event = new JobScheduledEvent(instance.getId(),
                             JobScheduledEvent.STATUS.SUCCESSFUL);
                     event.setExternalID(externalId);
@@ -297,6 +307,13 @@ public class SchedulerService implements FalconNotificationService, Notification
             }
         }
 
+        private boolean isReRun(Properties props) {
+            if (props != null && !props.isEmpty()) {
+                return Boolean.valueOf(props.getProperty(FalconWorkflowEngine.FALCON_RERUN));
+            }
+            return false;
+        }
+
         public short getPriority() {
             return priority;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index 164fb0e..c3db685 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -23,6 +23,7 @@ import org.apache.falcon.execution.NotificationHandler;
 import org.apache.falcon.notification.service.event.DataEvent;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.notification.service.event.EventType;
+import org.apache.falcon.notification.service.event.RerunEvent;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.state.ID;
 
@@ -46,7 +47,8 @@ public class Predicate implements Serializable {
     public enum TYPE {
         DATA,
         TIME,
-        JOB_COMPLETION
+        JOB_COMPLETION,
+        RE_RUN
     }
 
     private final TYPE type;
@@ -179,6 +181,16 @@ public class Predicate implements Serializable {
     }
 
     /**
+     * Creates a predicate of type Rerun.
+     * @param instanceTime
+     * @return
+     */
+    public static Predicate createRerunPredicate(long instanceTime) {
+        return new Predicate(TYPE.RE_RUN)
+                .addClause("instanceTime", (instanceTime < 0) ? ANY : instanceTime);
+    }
+
+    /**
      * Creates a predicate from an event based on the event source and values in the event.
      *
      * @param event
@@ -206,6 +218,13 @@ public class Predicate implements Serializable {
                 throw new FalconException("Event does not have enough data to create a predicate");
             }
 
+        } else if (event.getType() == EventType.RE_RUN) {
+            RerunEvent rerunEvent = (RerunEvent) event;
+            if (rerunEvent.getInstanceTime() != null) {
+                return Predicate.createRerunPredicate(rerunEvent.getInstanceTime().getMillis());
+            } else {
+                throw new FalconException("Event does not have enough data to create a predicate");
+            }
         } else {
             throw new FalconException("Unhandled event type " + event.getType());
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index 7f2bda9..27dd8d4 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -50,6 +50,8 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
                     return TIMED_OUT;
                 case TRIGGER:
                     return this;
+                case EXTERNAL_TRIGGER:
+                    return this;
                 default:
                     throw new InvalidStateTransitionException("Event " + event.name() + " not valid for state, "
                             + STATE.WAITING.name());
@@ -68,6 +70,8 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
                     return RUNNING;
                 case CONDITIONS_MET:
                     return this;
+                case FAIL:
+                    return FAILED;
                 default:
                     throw new InvalidStateTransitionException("Event " + event.name()
                             + " not valid for state, " + this.name());
@@ -99,6 +103,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
                 if (event == EVENT.SUCCEED) {
                     return this;
                 }
+                if (event == EVENT.EXTERNAL_TRIGGER) {
+                    return WAITING;
+                }
                 throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                         + ". Cannot apply transitions.");
             }
@@ -109,6 +116,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
                 if (event == EVENT.FAIL) {
                     return this;
                 }
+                if (event == EVENT.EXTERNAL_TRIGGER) {
+                    return WAITING;
+                }
                 throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                         + ". Cannot apply transitions.");
             }
@@ -119,6 +129,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
                 if (event == EVENT.KILL) {
                     return this;
                 }
+                if (event == EVENT.EXTERNAL_TRIGGER) {
+                    return WAITING;
+                }
                 throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                         + ". Cannot apply transitions.");
             }
@@ -129,6 +142,9 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
                 if (event == EVENT.TIME_OUT) {
                     return this;
                 }
+                if (event == EVENT.EXTERNAL_TRIGGER) {
+                    return WAITING;
+                }
                 throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
                         + ". Cannot apply transitions.");
             }
@@ -176,7 +192,8 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
         RESUME_RUNNING,
         KILL,
         SUCCEED,
-        FAIL
+        FAIL,
+        EXTERNAL_TRIGGER
     }
 
     /**
@@ -245,6 +262,7 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
         states.add(STATE.FAILED);
         states.add(STATE.KILLED);
         states.add(STATE.SUCCEEDED);
+        states.add(STATE.TIMED_OUT);
         return states;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
index 1f69fab..e460ee7 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
@@ -34,6 +34,14 @@ public interface InstanceStateChangeHandler {
     void onTrigger(ExecutionInstance instance) throws FalconException;
 
     /**
+     * Invoked when an instance is rerun or triggered externally.
+     *
+     * @param instance
+     * @throws FalconException
+     */
+    void onExternalTrigger(ExecutionInstance instance) throws FalconException;
+
+    /**
      * Invoked when all gating conditions are satisfied.
      *
      * @param instance

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index c702cc3..9266354 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -129,6 +129,10 @@ public final class StateService {
                 callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler);
                 stateStore.putExecutionInstance(new InstanceState(instance));
                 LOG.debug("Instance {} triggered due to event {}.", id, event.name());
+            } else if (event == InstanceState.EVENT.EXTERNAL_TRIGGER) {
+                callbackHandler(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, handler);
+                stateStore.updateExecutionInstance(new InstanceState(instance));
+                LOG.debug("Instance {} triggered due to event {}.", id, event.name());
             } else {
                 throw new FalconException("Instance " + id + "does not exist.");
             }
@@ -154,6 +158,9 @@ public final class StateService {
         case TRIGGER:
             handler.onTrigger(instance);
             break;
+        case EXTERNAL_TRIGGER:
+            handler.onExternalTrigger(instance);
+            break;
         case CONDITIONS_MET:
             handler.onConditionsMet(instance);
             break;

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index 2f3aa3a..c4ced46 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.state.store;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ExecutionInstance;
@@ -141,6 +142,19 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
+    public InstanceState getExecutionInstance(String externalID) throws StateStoreException {
+        if (StringUtils.isEmpty(externalID)) {
+            throw new StateStoreException("External ID for retrieving instance cannot be null");
+        }
+        for (InstanceState instanceState : instanceStates.values()) {
+            if (externalID.equals(instanceState.getInstance().getExternalID())) {
+                return instanceState;
+            }
+        }
+        return null;
+    }
+
+    @Override
     public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
         String key = new InstanceID(instanceState.getInstance()).getKey();
         if (!instanceStates.containsKey(key)) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index f1d1931..8ce8fa0 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -48,6 +48,15 @@ public interface InstanceStateStore {
     InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException;
 
     /**
+     * Retrieves ExecutionInstance from external ID.
+     *
+     * @param externalID
+     * @return
+     * @throws StateStoreException
+     */
+    InstanceState getExecutionInstance(String externalID) throws StateStoreException;
+
+    /**
      * Updates an execution instance in the store.
      *
      * @param instanceState

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 4bee269..3def14a 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -41,6 +41,7 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Mapping util for Persistent Store.
@@ -165,6 +166,10 @@ public final class BeanMapperUtil {
                 IOUtils.closeQuietly(out);
             }
         }
+        if (instance.getProperties() != null && !instance.getProperties().isEmpty()) {
+            byte[] props = getProperties(instanceState);
+            instanceBean.setProperties(props);
+        }
         return instanceBean;
     }
 
@@ -207,6 +212,22 @@ public final class BeanMapperUtil {
             }
         }
         executionInstance.setAwaitingPredicates(predicates);
+
+        result = instanceBean.getProperties();
+        if (result != null && result.length != 0) {
+            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result);
+            ObjectInputStream in = null;
+            Properties properties = null;
+            try {
+                in = new ObjectInputStream(byteArrayInputStream);
+                properties = (Properties) in.readObject();
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            } finally {
+                IOUtils.closeQuietly(in);
+            }
+            executionInstance.setProperties(properties);
+        }
         InstanceState instanceState = new InstanceState(executionInstance);
         instanceState.setCurrentState(InstanceState.STATE.valueOf(instanceBean.getCurrentState()));
         return instanceState;
@@ -268,4 +289,16 @@ public final class BeanMapperUtil {
             IOUtils.closeQuietly(out);
         }
     }
+
+    public static byte [] getProperties(InstanceState instanceState) throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        ObjectOutputStream out = null;
+        try {
+            out = new ObjectOutputStream(byteArrayOutputStream);
+            out.writeObject(instanceState.getInstance().getProperties());
+            return byteArrayOutputStream.toByteArray();
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
index 0e3dfa9..305b398 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -37,9 +37,10 @@ import java.sql.Timestamp;
 @Entity
 @NamedQueries({
         @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"),
         @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
         @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
-        @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates where a.id = :id"),
+        @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"),
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
@@ -108,6 +109,10 @@ public class InstanceBean {
     @Lob
     private byte[] awaitedPredicates;
 
+    @Column(name = "properties", columnDefinition = "BLOB")
+    @Lob
+    private byte[] properties;
+
 
     public String getId() {
         return id;
@@ -196,4 +201,12 @@ public class InstanceBean {
     public void setEntityId(String entityId) {
         this.entityId = entityId;
     }
+
+    public byte[] getProperties() {
+        return properties;
+    }
+
+    public void setProperties(byte[] properties) {
+        this.properties = properties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index b2f8e80..151c2c2 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.state.store.jdbc;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ExecutionInstance;
@@ -217,6 +218,29 @@ public final class JDBCStateStore extends AbstractStateStore {
     }
 
     @Override
+    public InstanceState getExecutionInstance(String externalID) throws StateStoreException {
+        // Looks up the instance by its external ID; null is returned when no row matches.
+        if (StringUtils.isEmpty(externalID)) {
+            throw new StateStoreException("External ID for retrieving instance cannot be null or empty");
+        }
+        EntityManager entityManager = getEntityManager();
+        List result;
+        try {
+            Query q = entityManager.createNamedQuery("GET_INSTANCE_FOR_EXTERNAL_ID");
+            q.setParameter("externalID", externalID);
+            result = q.getResultList();
+        } finally {
+            entityManager.close(); // always release the EntityManager, even when the query throws
+        }
+        try {
+            return result.isEmpty() ? null : BeanMapperUtil.convertToInstanceState((InstanceBean) result.get(0));
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+
+    @Override
     public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
         InstanceID id = new InstanceID(instanceState.getInstance());
         String key = id.toString();
@@ -248,6 +272,13 @@ public final class JDBCStateStore extends AbstractStateStore {
                 throw new StateStoreException(e);
             }
         }
+        if (instance.getProperties() != null && !instance.getProperties().isEmpty()) {
+            try {
+                q.setParameter("properties", BeanMapperUtil.getProperties(instanceState));
+            } catch (IOException e) {
+                throw new StateStoreException(e);
+            }
+        }
         q.executeUpdate();
         commitAndCloseTransaction(entityManager);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
index e0d2a0e..49e083c 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
@@ -75,9 +75,11 @@ public interface DAGEngine {
      * Re-run a terminated instance.
      *
      * @param instance
+     * @param props properties to apply to the rerun
+     * @param isForced whether the rerun is forced
      * @throws DAGEngineException
      */
-    void reRun(ExecutionInstance instance) throws DAGEngineException;
+    void reRun(ExecutionInstance instance, Properties props, boolean isForced) throws DAGEngineException;
 
     /**
      * Perform dryrun of an instance.

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index c19cada..6abc222 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -63,6 +63,8 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     private static final StateStore STATE_STORE = AbstractStateStore.get();
     private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
+    public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun";
+    public static final String FALCON_RERUN = "falcon.system.rerun";
 
     private enum JobAction {
         KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
@@ -160,6 +162,12 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
 
     private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
                                         Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(action, entity, start, end, props, lifeCycles, false);
+    }
+
+    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
+                                        Properties props, List<LifeCycle> lifeCycles,
+                                        boolean isForced) throws FalconException {
         Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
         List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
         APIResult.Status overallStatus = APIResult.Status.SUCCEEDED;
@@ -186,6 +194,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
             states.addAll(InstanceState.getTerminalStates());
             states.add(InstanceState.STATE.SUSPENDED);
             break;
+        case RERUN:
+            // Applicable only for terminated States
+            states = InstanceState.getTerminalStates();
+            break;
         default:
             throw new IllegalArgumentException("Unhandled action " + action);
         }
@@ -210,7 +222,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
 
             InstancesResult.Instance instance = null;
             try {
-                instance = performAction(ins.getCluster(), entity, action, ins);
+                instance = performAction(ins.getCluster(), entity, action, ins, props, isForced);
                 instance.instance = instanceTimeStr;
             } catch (FalconException e) {
                 LOG.warn("Unable to perform action {} on cluster", action, e);
@@ -241,7 +253,8 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private InstancesResult.Instance performAction(String cluster, Entity entity, JobAction action,
-                                                   ExecutionInstance instance) throws FalconException {
+                                                   ExecutionInstance instance, Properties userProps,
+                                                   boolean isForced) throws FalconException {
         EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(entity, cluster);
         InstancesResult.Instance instanceInfo = null;
         LOG.debug("Retrieving information for {} for action {}", instance.getId(), action);
@@ -266,6 +279,10 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
                             .getExecutionInstance(instance.getId()).getCurrentState().name());
             break;
         case RERUN:
+            executor.rerun(instance, userProps, isForced);
+            instanceInfo.status =
+                    InstancesResult.WorkflowStatus.valueOf(STATE_STORE
+                            .getExecutionInstance(instance.getId()).getCurrentState().name());
             break;
         case STATUS:
             if (StringUtils.isNotEmpty(instance.getExternalID())) {
@@ -302,7 +319,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
                                           List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException {
-        throw new FalconException("Not yet Implemented");
+        return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced);
     }
 
     @Override
@@ -395,7 +412,14 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
-        throw new FalconException("Not yet Implemented");
+        InstanceState instanceState = STATE_STORE.getExecutionInstance(jobId);
+        // The state store returns null when no instance matches the external ID; fail with a clear error.
+        if (instanceState == null) {
+            throw new FalconException("No instance found for job id " + jobId);
+        }
+        ExecutionInstance instance = instanceState.getInstance();
+        EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(instance.getEntity(), cluster);
+        executor.rerun(instance, props, isForced);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
index a26eb77..4786cc3 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -32,6 +32,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -199,7 +200,8 @@ public class OozieDAGEngine implements DAGEngine {
             client.suspend(instance.getExternalID());
             assertStatus(instance.getExternalID(), Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED,
                     Job.Status.FAILED, Job.Status.KILLED);
-            LOG.info("Suspended job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+            LOG.info("Suspended job {} of entity {} of time {} on cluster {}", instance.getExternalID(),
+                    instance.getEntity().getName(), instance.getInstanceTime(), instance.getCluster());
         } catch (OozieClientException e) {
             throw new DAGEngineException(e);
         }
@@ -211,7 +213,8 @@ public class OozieDAGEngine implements DAGEngine {
             client.resume(instance.getExternalID());
             assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED,
                     Job.Status.FAILED, Job.Status.KILLED);
-            LOG.info("Resumed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+            LOG.info("Resumed job {} of entity {} of time {} on cluster {}", instance.getExternalID(),
+                    instance.getEntity().getName(), instance.getInstanceTime(), instance.getCluster());
         } catch (OozieClientException e) {
             throw new DAGEngineException(e);
         }
@@ -222,15 +225,37 @@ public class OozieDAGEngine implements DAGEngine {
         try {
             client.kill(instance.getExternalID());
             assertStatus(instance.getExternalID(), Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED);
-            LOG.info("Killed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+            LOG.info("Killed job {} of entity {} of time {} on cluster {}", instance.getExternalID(),
+                    instance.getEntity().getName(), instance.getInstanceTime(), instance.getCluster());
         } catch (OozieClientException e) {
             throw new DAGEngineException(e);
         }
     }
 
     @Override
-    public void reRun(ExecutionInstance instance) throws DAGEngineException {
-        // TODO : Implement this
+    public void reRun(ExecutionInstance instance, Properties props, boolean isForced) throws DAGEngineException {
+        String jobId = instance.getExternalID();
+        try {
+            WorkflowJob jobInfo = client.getJobInfo(jobId);
+            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
+            if (props != null) {
+                jobprops.putAll(props);
+            }
+            //if user has set any of these oozie rerun properties then force rerun flag is ignored
+            if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+            }
+            jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
+            jobprops.remove(OozieClient.BUNDLE_APP_PATH);
+            client.reRun(jobId, jobprops);
+            assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED);
+            LOG.info("Rerun job {} of entity {} of time {} on cluster {}", jobId, instance.getEntity().getName(),
+                    instance.getInstanceTime(), instance.getCluster());
+        } catch (Exception e) {
+            LOG.error("Unable to rerun workflows", e);
+            throw new DAGEngineException(e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
index d274ad7..c99f3fd 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
@@ -79,8 +79,10 @@ public class MockDAGEngine implements DAGEngine {
     }
 
     @Override
-    public void reRun(ExecutionInstance instance) throws DAGEngineException {
-
+    public void reRun(ExecutionInstance instance, Properties props, boolean isForced) throws DAGEngineException {
+        if (failInstances.contains(instance)) {
+            throw new DAGEngineException("mock failure.");
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/10fcb915/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index 6d5bd49..34156c0 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -364,6 +364,40 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
 
     }
 
+    @Test
+    public void testGetInstanceFromExternalID() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+
+        long instance1Time = System.currentTimeMillis() - 180000;
+        long instance2Time = System.currentTimeMillis();
+        EntityState entityState = getEntityState(EntityType.PROCESS, "processext");
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance1Time, "cluster1", instance1Time);
+        processExecutionInstance1.setExternalID("external_id_1");
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster1", instance2Time);
+        processExecutionInstance2.setExternalID("external_id_2");
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+
+        InstanceState actualInstanceState = stateStore.getExecutionInstance("external_id_1");
+        Assert.assertEquals(actualInstanceState.getInstance(), processExecutionInstance1);
+
+        actualInstanceState = stateStore.getExecutionInstance("external_id_2");
+        Assert.assertEquals(actualInstanceState.getInstance(), processExecutionInstance2);
+
+    }
+
 
     private void initInstanceState(InstanceState instanceState) {
         instanceState.setCurrentState(InstanceState.STATE.READY);


Mime
View raw message