falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [3/6] falcon git commit: FALCON-1213 Base framework of the native scheduler
Date Tue, 20 Oct 2015 12:10:00 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/ID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/ID.java b/scheduler/src/main/java/org/apache/falcon/state/ID.java
new file mode 100644
index 0000000..420c856
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/ID.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.datanucleus.util.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.Serializable;
+
+/**
+ * A serializable, comparable ID for an entity or an instance; equality and ordering use its toString() form.
+ */
+public final class ID implements Serializable, Comparable<ID> {
+    public static final String KEY_SEPARATOR = "/"; // separates the parts of the string form
+    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // pattern used to print instanceTime
+
+    private String entityName;
+    private EntityType entityType;
+    private String entityKey; // entityType + KEY_SEPARATOR + entityName
+    private String cluster;
+    private DateTime instanceTime;
+
+    /**
+     * Default constructor; leaves every field unset (null).
+     */
+    public ID(){}
+
+    /**
+     * Builds an entity-level ID and derives the entity key from the type and the name.
+     *
+     * @param type entity type; checked for null only by an assert
+     * @param name entity name; checked for emptiness only by an assert
+     */
+    public ID(EntityType type, String name) {
+        assert type != null : "Entity type must be present.";
+        assert !StringUtils.isEmpty(name) : "Entity name must be present.";
+        this.entityName = name;
+        this.entityType = type;
+        this.entityKey = entityType + KEY_SEPARATOR + entityName;
+    }
+
+    /**
+     * Builds an entity-level ID from the entity's type and name.
+     *
+     * @param entity entity supplying the type and the name
+     */
+    public ID(Entity entity) {
+        this(entity.getEntityType(), entity.getName());
+    }
+
+    /**
+     * Builds an ID for an entity on a given cluster.
+     *
+     * @param entity entity supplying the type and the name
+     * @param cluster cluster name; checked for emptiness only by an assert
+     */
+    public ID(Entity entity, String cluster) {
+        this(entity.getEntityType(), entity.getName());
+        assert !StringUtils.isEmpty(cluster) : "Cluster cannot be empty.";
+        this.cluster = cluster;
+    }
+
+    /**
+     * Builds an instance-level ID from the instance's entity, cluster and instance time.
+     *
+     * @param instance execution instance to identify; its instance time is checked for null only by an assert
+     */
+    public ID(ExecutionInstance instance) {
+        this(instance.getEntity(), instance.getCluster());
+        assert instance.getInstanceTime() != null : "Nominal time cannot be null.";
+        this.instanceTime = instance.getInstanceTime();
+    }
+
+    /**
+     * Builds an instance-level ID from its individual parts.
+     *
+     * @param entity entity supplying the type and the name
+     * @param cluster cluster name; checked for emptiness only by an assert
+     * @param instanceTime nominal time of the instance; checked for null only by an assert
+     */
+    public ID(Entity entity, String cluster, DateTime instanceTime) {
+        this(entity, cluster);
+        assert instanceTime != null : "Nominal time cannot be null.";
+        this.instanceTime = instanceTime;
+    }
+
+    /**
+     * @return cluster name, or null if none was set
+     */
+    public String getCluster() {
+        return cluster;
+    }
+
+    /**
+     * @param cluster cluster name; stored as given, without validation
+     */
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    /**
+     * @return nominal time, or null if none was set
+     */
+    public DateTime getInstanceTime() {
+        return instanceTime;
+    }
+
+    /**
+     * @param instanceTime nominal time; stored as given, null clears the instance part of the ID
+     */
+    public void setInstanceTime(DateTime instanceTime) {
+        this.instanceTime = instanceTime;
+    }
+
+    /**
+     * @return entity name
+     */
+    public String getEntityName() {
+        return entityName;
+    }
+
+    /**
+     * @return entity type
+     */
+    public EntityType getEntityType() {
+        return entityType;
+    }
+
+    @Override
+    public String toString() {
+        String val = entityKey; // NOTE(review): null when the default constructor was used - confirm callers
+        if (!StringUtils.isEmpty(cluster)) {
+            val = val + KEY_SEPARATOR + cluster;
+        }
+
+        if (instanceTime != null) {
+            DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+            val = val + KEY_SEPARATOR + fmt.print(instanceTime);
+        }
+        return val;
+    }
+
+    /**
+     * @return the entity key (entity type + KEY_SEPARATOR + entity name), without cluster or instance time
+     */
+    public String getEntityKey() {
+        return entityKey;
+    }
+
+    /**
+     * @return a new ID with the same entity type, entity name and cluster, but no instance time
+     */
+    public ID getEntityID() {
+        ID newID = new ID(this.entityType, this.entityName);
+        newID.setCluster(this.cluster);
+        newID.setInstanceTime(null);
+        return newID;
+    }
+
+    @Override
+    public boolean equals(Object id) {
+        if (id == null || id.getClass() != getClass()) {
+            return false;
+        }
+        return compareTo((ID)id) == 0; // equal exactly when the string forms are equal
+    }
+
+    @Override
+    public int hashCode() {
+        return this.toString().hashCode(); // hashes the string form, matching equals()
+    }
+
+    @Override
+    public int compareTo(ID id) {
+        if (id == null) {
+            return -1; // NOTE(review): the Comparable contract calls for a NullPointerException here - confirm
+        }
+        return this.toString().compareTo(id.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
new file mode 100644
index 0000000..8cf24ee
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.execution.ExecutionInstance;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents the state of an execution instance.
+ * Implements {@link org.apache.falcon.state.StateMachine} for an instance.
+ */
+public class InstanceState implements StateMachine<InstanceState.STATE, InstanceState.EVENT> {
+    private ExecutionInstance instance;
+    private STATE currentState;
+    private static final STATE INITIAL_STATE = STATE.WAITING;
+
+    /**
+     * Enumerates all the valid states of an instance and the valid transitions from that state.
+     */
+    public enum STATE implements StateMachine<InstanceState.STATE, InstanceState.EVENT> {
+        WAITING {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case SUSPEND:
+                    return SUSPENDED;
+                case KILL:
+                    return KILLED;
+                case CONDITIONS_MET:
+                    return READY;
+                case TIME_OUT:
+                    return TIMED_OUT;
+                case TRIGGER:
+                    return this; // TRIGGER on a WAITING instance keeps it in WAITING
+                default:
+                    throw new InvalidStateTransitionException("Event " + event.name() + " not valid for state, "
+                            + STATE.WAITING.name());
+                }
+            }
+        },
+        READY {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case SUSPEND:
+                    return SUSPENDED;
+                case KILL:
+                    return KILLED;
+                case SCHEDULE:
+                    return RUNNING;
+                case CONDITIONS_MET:
+                    return this; // CONDITIONS_MET on a READY instance keeps it in READY
+                default:
+                    throw new InvalidStateTransitionException("Event " + event.name()
+                            + " not valid for state, " + this.name());
+                }
+            }
+        },
+        RUNNING {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case SUSPEND:
+                    return SUSPENDED;
+                case KILL:
+                    return KILLED;
+                case SUCCEED:
+                    return SUCCEEDED;
+                case FAIL:
+                    return FAILED;
+                case SCHEDULE:
+                    return this; // SCHEDULE on a RUNNING instance keeps it in RUNNING
+                default:
+                    throw new InvalidStateTransitionException("Event " + event.name()
+                            + " not valid for state, " + this.name());
+                }
+            }
+        }, SUCCEEDED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                if (event == EVENT.SUCCEED) {
+                    return this;
+                }
+                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+                        + ". Cannot apply transitions.");
+            }
+        },
+        FAILED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                if (event == EVENT.FAIL) {
+                    return this;
+                }
+                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+                        + ". Cannot apply transitions.");
+            }
+        },
+        KILLED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                if (event == EVENT.KILL) {
+                    return this;
+                }
+                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+                        + ". Cannot apply transitions.");
+            }
+        },
+        TIMED_OUT {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                if (event == EVENT.TIME_OUT) {
+                    return this;
+                }
+                throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+                        + ". Cannot apply transitions.");
+            }
+        },
+        SUSPENDED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case RESUME_WAITING:
+                    return WAITING;
+                case RESUME_READY:
+                    return READY;
+                case RESUME_RUNNING:
+                    return RUNNING;
+                case SUSPEND:
+                    return this;
+                // The instance can complete execution on DAG engine, just after a suspend was issued.
+                // Especially with Oozie, it finishes execution of current action before suspending.
+                // Hence need to allow terminal states too.
+                case SUCCEED:
+                    return SUCCEEDED;
+                case FAIL:
+                    return FAILED;
+                case KILL:
+                    return KILLED;
+                default:
+                    throw new InvalidStateTransitionException("Event " + event.name()
+                            + " not valid for state, " + this.name());
+                }
+            }
+        }
+    }
+
+    /**
+     * Enumerates all the valid events that can cause a state transition.
+     */
+    public enum EVENT {
+        TRIGGER,
+        CONDITIONS_MET,
+        TIME_OUT,
+        SCHEDULE,
+        SUSPEND,
+        RESUME_WAITING,
+        RESUME_READY,
+        RESUME_RUNNING,
+        KILL,
+        SUCCEED,
+        FAIL
+    }
+
+    /**
+     * Creates the state holder for an instance, starting in the initial state (WAITING).
+     *
+     * @param instance execution instance whose state is tracked; stored as given
+     */
+    public InstanceState(ExecutionInstance instance) {
+        this.instance = instance;
+        currentState = INITIAL_STATE;
+    }
+
+    /**
+     * @return execution instance
+     */
+    public ExecutionInstance getInstance() {
+        return instance;
+    }
+
+    /**
+     * @return current state
+     */
+    public STATE getCurrentState() {
+        return currentState;
+    }
+
+    /**
+     * @param state state to set; assigned directly, without checking that the transition is valid
+     * @return This instance
+     */
+    public InstanceState setCurrentState(STATE state) {
+        this.currentState = state;
+        return this;
+    }
+
+    @Override
+    public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+        return currentState.nextTransition(event); // computes the next state only; currentState is not changed
+    }
+
+    /**
+     * @return "active" states of an instance: RUNNING, READY and WAITING, in a new list on every call.
+     */
+    public static List<STATE> getActiveStates() {
+        List<InstanceState.STATE> states = new ArrayList<STATE>();
+        states.add(STATE.RUNNING);
+        states.add(STATE.READY);
+        states.add(STATE.WAITING);
+        return states;
+    }
+
+    /**
+     * @return "running" states of an instance: only RUNNING, in a new list on every call.
+     */
+    public static List<STATE> getRunningStates() {
+        List<InstanceState.STATE> states = new ArrayList<STATE>();
+        states.add(STATE.RUNNING);
+        return states;
+    }
+
+    /**
+     *  @return "terminal" states: FAILED, KILLED, SUCCEEDED. NOTE(review): TIMED_OUT is not listed - confirm.
+     */
+    public static List<STATE> getTerminalStates() {
+        List<InstanceState.STATE> states = new ArrayList<STATE>();
+        states.add(STATE.FAILED);
+        states.add(STATE.KILLED);
+        states.add(STATE.SUCCEEDED);
+        return states;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
new file mode 100644
index 0000000..1f69fab
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.execution.ExecutionInstance;
+
+/**
+ * Any handler interested in handling state changes of instances must implement this interface.
+ */
+public interface InstanceStateChangeHandler {
+
+    /**
+     * Invoked when an instance is created.
+     *
+     * @param instance the instance that was created
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onTrigger(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked when all gating conditions are satisfied.
+     *
+     * @param instance the instance whose gating conditions are satisfied
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onConditionsMet(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked when an instance is scheduled on a DAG Engine.
+     *
+     * @param instance the instance that was scheduled
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onSchedule(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked on suspension of an instance.
+     *
+     * @param instance the instance that was suspended
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onSuspend(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked when an instance is resumed.
+     *
+     * @param instance the instance that was resumed
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onResume(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked when an instance is killed.
+     *
+     * @param instance the instance that was killed
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onKill(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked when an instance completes successfully.
+     *
+     * @param instance the instance that succeeded
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onSuccess(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked when execution of an instance fails.
+     *
+     * @param instance the instance that failed
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onFailure(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Invoked when an instance times out waiting for gating conditions to be satisfied.
+     *
+     * @param instance the instance that timed out
+     * @throws FalconException if the implementation fails to handle the event
+     */
+    void onTimeOut(ExecutionInstance instance) throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java b/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
new file mode 100644
index 0000000..6ca0500
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.exception.InvalidStateTransitionException;
+
+/**
+ * Interface to be implemented by a class that handles state transitions.
+ */
+public interface StateMachine<STATE extends Enum<STATE>, EVENT extends Enum<EVENT>> {
+
+    /**
+     * @param event the event to apply to the machine
+     * @return The state that the machine enters into as a result of the event.
+     * @throws InvalidStateTransitionException if the event is not valid for the machine's state
+     */
+    STATE nextTransition(EVENT event) throws InvalidStateTransitionException;
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
new file mode 100644
index 0000000..81357a4
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service that fetches state from state store, handles state transitions of entities and instances,
+ * invokes state change handler and finally persists the new state in the state store.
+ */
+public final class StateService {
+    private static final Logger LOG = LoggerFactory.getLogger(StateService.class);
+    private static final StateService LIFE_CYCLE_SERVICE = new StateService();
+    private final StateStore stateStore;
+
+    private StateService() {
+        stateStore = AbstractStateStore.get();
+    }
+
+    /**
+     * @return - Singleton instance of StateService
+     */
+    public static StateService get() {
+        return LIFE_CYCLE_SERVICE;
+    }
+
+    /**
+     * @return - Name of the service
+     */
+    public String getName() {
+        return "EntityLifeCycleService";
+    }
+
+    /**
+     * Fetches the entity from state store, applies state transitions, calls appropriate method on the handler and
+     * persists the final state in the store.
+     *
+     * @param entity entity whose state is to change
+     * @param event event to apply; an entity that is not yet in the store accepts only SUBMIT
+     * @param handler handler notified before the state is persisted; may be null, in which case it is skipped
+     * @throws FalconException if the entity is not in the store and the event is not SUBMIT, or the change fails
+     */
+    public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
+        throws FalconException {
+        ID id = new ID(entity);
+        if (!stateStore.entityExists(id)) {
+            // New entity
+            if (event == EntityState.EVENT.SUBMIT) {
+                callbackHandler(entity, EntityState.EVENT.SUBMIT, handler);
+                stateStore.putEntity(new EntityState(entity));
+                LOG.debug("Entity {} submitted due to event {}.", id, event.name());
+            } else {
+                throw new FalconException("Entity " + id + " does not exist in state store.");
+            }
+        } else {
+            if (entity.getEntityType() == EntityType.CLUSTER) {
+                throw new FalconException("Cluster entity " + entity.getName() + " can only be submitted.");
+            }
+            EntityState entityState = stateStore.getEntity(id);
+            EntityState.STATE newState = entityState.nextTransition(event);
+            callbackHandler(entity, event, handler); // runs before the store is updated
+            entityState.setCurrentState(newState);
+            stateStore.updateEntity(entityState);
+            LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id,
+                    entityState.getCurrentState(), event.name());
+        }
+    }
+
+    // Invokes the right method on the state change handler
+    private void callbackHandler(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
+        throws FalconException {
+        if (handler == null) {
+            return;
+        }
+        switch (event) {
+        case SUBMIT:
+            handler.onSubmit(entity);
+            break;
+        case SCHEDULE:
+            handler.onSchedule(entity);
+            break;
+        case SUSPEND:
+            handler.onSuspend(entity);
+            break;
+        case RESUME:
+            handler.onResume(entity);
+            break;
+        default: // Do nothing, only propagate events that originate from user
+        }
+    }
+
+    /**
+     * Fetches the instance from state store, applies state transitions, calls appropriate method on the handler and
+     * persists the final state in the store.
+     *
+     * @param instance instance whose state is to change
+     * @param event event to apply; an instance that is not yet in the store accepts only TRIGGER
+     * @param handler handler notified before the state is persisted; may be null, in which case it is skipped
+     * @throws FalconException if the instance is not in the store and the event is not TRIGGER, or the change fails
+     */
+    public void handleStateChange(ExecutionInstance instance, InstanceState.EVENT event,
+                                  InstanceStateChangeHandler handler) throws FalconException {
+        ID id = new ID(instance);
+        if (!stateStore.executionInstanceExists(id)) {
+            // New instance
+            if (event == InstanceState.EVENT.TRIGGER) {
+                callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler);
+                stateStore.putExecutionInstance(new InstanceState(instance));
+                LOG.debug("Instance {} triggered due to event {}.", id, event.name());
+            } else {
+                throw new FalconException("Instance " + id + " does not exist.");
+            }
+        } else {
+            InstanceState instanceState = stateStore.getExecutionInstance(id);
+            InstanceState.STATE newState = instanceState.nextTransition(event);
+            callbackHandler(instance, event, handler); // runs before the store is updated
+            instanceState.setCurrentState(newState);
+            stateStore.updateExecutionInstance(instanceState);
+            LOG.debug("State of instance: {} changed to: {} as a result of event: {}.", id,
+                    instanceState.getCurrentState(), event.name());
+        }
+    }
+
+    // Invokes the right method on the state change handler
+    private void callbackHandler(ExecutionInstance instance, InstanceState.EVENT event,
+                                 InstanceStateChangeHandler handler) throws FalconException {
+        if (handler == null) {
+            return;
+        }
+        switch (event) {
+        case TRIGGER:
+            handler.onTrigger(instance);
+            break;
+        case CONDITIONS_MET:
+            handler.onConditionsMet(instance);
+            break;
+        case TIME_OUT:
+            handler.onTimeOut(instance);
+            break;
+        case SCHEDULE:
+            handler.onSchedule(instance);
+            break;
+        case SUSPEND:
+            handler.onSuspend(instance);
+            break;
+        case RESUME_WAITING:
+        case RESUME_READY:
+        case RESUME_RUNNING:
+            handler.onResume(instance); // all three resume events share one callback
+            break;
+        case KILL:
+            handler.onKill(instance);
+            break;
+        case SUCCEED:
+            handler.onSuccess(instance);
+            break;
+        case FAIL:
+            handler.onFailure(instance);
+            break;
+        default: // Do nothing
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
new file mode 100644
index 0000000..67e047f
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.service.ConfigurationChangeListener;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This abstract class listens to config store changes and keeps the state store in sync with the config store.
+ */
+public abstract class AbstractStateStore implements StateStore, ConfigurationChangeListener {
+    private static StateStore stateStore; // lazily created singleton, see get()
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractStateStore.class);
+
+    @Override
+    public void onAdd(Entity entity) throws FalconException {
+        if (entity.getEntityType() != EntityType.CLUSTER) { // cluster entities are skipped by this listener
+            putEntity(new EntityState(entity));
+        }
+    }
+
+    @Override
+    public void onRemove(Entity entity) throws FalconException {
+        // deleteEntity is expected to remove the entity's instances too - TODO confirm in the store implementations.
+        if (entity.getEntityType() != EntityType.CLUSTER) {
+            deleteEntity(new ID(entity));
+        }
+    }
+
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        if (newEntity.getEntityType() != EntityType.CLUSTER) {
+            EntityState entityState = getEntity(new ID(oldEntity)); // looked up under the old entity's ID
+            if (entityState == null) {
+                onAdd(newEntity);
+            } else {
+                entityState.setEntity(newEntity);
+                updateEntity(entityState);
+            }
+        }
+    }
+
+    @Override
+    public void onReload(Entity entity) throws FalconException {
+        if (entity.getEntityType() != EntityType.CLUSTER) {
+            // To ensure the config store and state store are in sync
+            if (!entityExists(new ID(entity))) {
+                LOG.info("State store missing entity {}. Adding it.", entity.getName());
+                onAdd(entity);
+            }
+        }
+    }
+
+    /**
+     * @return Singleton store, created on first use from startup property "state.store" (default: in-memory store).
+     */
+    public static synchronized StateStore get() {
+        if (stateStore == null) {
+            String storeImpl = StartupProperties.get().getProperty("state.store",
+                    "org.apache.falcon.state.store.InMemoryStateStore");
+            try {
+                stateStore = ReflectionUtils.getInstanceByClassName(storeImpl);
+            } catch (FalconException e) {
+                throw new RuntimeException("Unable to load state store impl. : " + storeImpl, e);
+            }
+        }
+        return stateStore;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
new file mode 100644
index 0000000..4aa6fdb
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+
+import java.util.Collection;
+
+/**
+ * Interface to abstract out Entity store API.
+ * Entities are stored wrapped in an {@link EntityState} and are addressed by an {@link ID}.
+ */
+public interface EntityStateStore {
+    /**
+     * Adds an entity state to the store.
+     *
+     * @param entityState state (with its entity) to add
+     * @throws StateStoreException if the state cannot be added; for example, the in-memory
+     *                             implementation rejects a key that is already present
+     */
+    void putEntity(EntityState entityState) throws StateStoreException;
+
+    /**
+     * Looks up the stored state of an entity.
+     *
+     * @param entityId id of the entity to look up
+     * @return Entity state corresponding to the key
+     * @throws StateStoreException - If entity does not exist.
+     */
+    EntityState getEntity(ID entityId) throws StateStoreException;
+
+    /**
+     * Checks for presence without throwing, unlike {@link #getEntity(ID)}.
+     *
+     * @param entityId id of the entity to check
+     * @return true, if entity exists in store.
+     */
+    boolean entityExists(ID entityId);
+
+    /**
+     * Note that this returns the entities themselves, not their {@link EntityState} wrappers.
+     *
+     * @param state state to filter on
+     * @return Entities in a given state.
+     */
+    Collection<Entity> getEntities(EntityState.STATE state);
+
+    /**
+     * @return The states of all Entities in the store.
+     */
+    Collection<EntityState> getAllEntities();
+
+    /**
+     * Update an existing entity with the new values.
+     *
+     * @param entityState new state to store for the entity it wraps
+     * @throws StateStoreException when entity does not exist.
+     */
+    void updateEntity(EntityState entityState) throws StateStoreException;
+
+    /**
+     * Removes the entity and its instances from the store.
+     *
+     * @param entityId id of the entity to remove
+     * @throws StateStoreException if the entity cannot be removed; for example, the in-memory
+     *                             implementation throws when the entity does not exist
+     */
+    void deleteEntity(ID entityId) throws StateStoreException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
new file mode 100644
index 0000000..3822860
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import com.google.common.collect.Lists;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An in memory state store mostly intended for unit tests.
+ * Singleton.
+ * <p>
+ * Entity states are kept in a map keyed by {@code ID.getEntityKey()}; instance states are kept
+ * in a sorted map keyed by {@code ID.toString()}. The instances of an entity are found by
+ * matching the prefix of the instance key.
+ */
+public final class InMemoryStateStore extends AbstractStateStore {
+
+    private Map<String, EntityState> entityStates = new HashMap<>();
+    // Keep it sorted
+    // NOTE(review): the synchronized wrapper does not guard iteration, and the loops below walk
+    // entrySet()/keySet() without holding the map's lock - confirm this store is only used
+    // from a single thread (entityStates is a plain HashMap as well).
+    private SortedMap<String, InstanceState> instanceStates = Collections
+            .synchronizedSortedMap(new TreeMap<String, InstanceState>());
+
+    private static final StateStore STORE = new InMemoryStateStore();
+
+    private InMemoryStateStore() {
+    }
+
+    /**
+     * @return the single shared instance of this store.
+     */
+    public static StateStore get() {
+        return STORE;
+    }
+
+    /**
+     * Adds a new entity state.
+     *
+     * @param entityState state to add; its entity determines the key
+     * @throws StateStoreException if an entity with the same key is already stored
+     */
+    @Override
+    public void putEntity(EntityState entityState) throws StateStoreException {
+        String key = new ID(entityState.getEntity()).getEntityKey();
+        if (entityStates.containsKey(key)) {
+            throw new StateStoreException("Entity with key, " + key + " already exists.");
+        }
+        entityStates.put(key, entityState);
+    }
+
+    /**
+     * @param entityId id of the entity to look up
+     * @return the stored state of the entity
+     * @throws StateStoreException if no entity is stored under the id's entity key
+     */
+    @Override
+    public EntityState getEntity(ID entityId) throws StateStoreException {
+        if (!entityStates.containsKey(entityId.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
+        }
+        return entityStates.get(entityId.getEntityKey());
+    }
+
+    /**
+     * @param entityId id of the entity to check
+     * @return true if an entity is stored under the id's entity key
+     */
+    @Override
+    public boolean entityExists(ID entityId) {
+        return entityStates.containsKey(entityId.getEntityKey());
+    }
+
+    /**
+     * @param state state to filter on
+     * @return a new collection holding the entities whose current state equals {@code state}
+     */
+    @Override
+    public Collection<Entity> getEntities(EntityState.STATE state) {
+        Collection<Entity> entities = new ArrayList<>();
+        for (EntityState entityState : entityStates.values()) {
+            if (entityState.getCurrentState().equals(state)) {
+                entities.add(entityState.getEntity());
+            }
+        }
+        return entities;
+    }
+
+    /**
+     * @return the values view of the backing map (not a copy)
+     */
+    @Override
+    public Collection<EntityState> getAllEntities() {
+        return entityStates.values();
+    }
+
+    /**
+     * Replaces the stored state of an existing entity.
+     *
+     * @param entityState new state; its entity determines the key
+     * @throws StateStoreException if no entity is stored under that key
+     */
+    @Override
+    public void updateEntity(EntityState entityState) throws StateStoreException {
+        String key = new ID(entityState.getEntity()).getEntityKey();
+        if (!entityStates.containsKey(key)) {
+            throw new StateStoreException("Entity with key, " + key + " does not exist.");
+        }
+        entityStates.put(key, entityState);
+    }
+
+    /**
+     * Removes an entity together with its instances.
+     *
+     * @param entityId id of the entity to remove
+     * @throws StateStoreException if no entity is stored under the id's entity key
+     */
+    @Override
+    public void deleteEntity(ID entityId) throws StateStoreException {
+        if (!entityStates.containsKey(entityId.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
+        }
+        // Drop the instances first, then the entity itself.
+        deleteExecutionInstances(entityId);
+        entityStates.remove(entityId.getEntityKey());
+    }
+
+    /**
+     * Adds a new instance state.
+     *
+     * @param instanceState state to add; its instance determines the key
+     * @throws StateStoreException if an instance with the same key is already stored
+     */
+    @Override
+    public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        String key = new ID(instanceState.getInstance()).toString();
+        if (instanceStates.containsKey(key)) {
+            throw new StateStoreException("Instance with key, " + key + " already exists.");
+        }
+        instanceStates.put(key, instanceState);
+    }
+
+    /**
+     * @param instanceId id of the instance to look up
+     * @return the stored state of the instance
+     * @throws StateStoreException if no instance is stored under the id
+     */
+    @Override
+    public InstanceState getExecutionInstance(ID instanceId) throws StateStoreException {
+        if (!instanceStates.containsKey(instanceId.toString())) {
+            throw new StateStoreException("Instance with key, " + instanceId + " does not exist.");
+        }
+        return instanceStates.get(instanceId.toString());
+    }
+
+    /**
+     * Replaces the stored state of an existing instance.
+     *
+     * @param instanceState new state; its instance determines the key
+     * @throws StateStoreException if no instance is stored under that key
+     */
+    @Override
+    public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        String key = new ID(instanceState.getInstance()).toString();
+        if (!instanceStates.containsKey(key)) {
+            throw new StateStoreException("Instance with key, " + key + " does not exist.");
+        }
+        instanceStates.put(key, instanceState);
+    }
+
+    /**
+     * @param entity entity whose instances are wanted
+     * @param cluster cluster the instances belong to
+     * @return a new collection with every instance whose key starts with the entity/cluster id
+     * @throws StateStoreException if the entity is not in the store
+     */
+    @Override
+    public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster)
+        throws StateStoreException {
+        ID id = new ID(entity, cluster);
+        if (!entityStates.containsKey(id.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+        }
+        Collection<InstanceState> instances = new ArrayList<InstanceState>();
+        // NOTE(review): plain prefix match - an id that is a prefix of another id would match
+        // that one's instances too; confirm the key format rules this out.
+        for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+            if (instanceState.getKey().startsWith(id.toString())) {
+                instances.add(instanceState.getValue());
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * @param entity entity whose instances are wanted
+     * @param cluster cluster the instances belong to
+     * @param states states to keep
+     * @return instances of the entity/cluster whose current state is one of {@code states}
+     * @throws StateStoreException declared by the interface; not thrown by this implementation
+     */
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+            Collection<InstanceState.STATE> states) throws StateStoreException {
+        ID id = new ID(entity, cluster);
+        return getExecutionInstances(id, states);
+    }
+
+    /**
+     * Returns the instances in the given states whose actual run time overlaps the
+     * {@code [start, end]} window. Instances without an actual start time are never returned;
+     * instances without an actual end time are treated as not having ended before the window.
+     *
+     * @param entity entity whose instances are wanted
+     * @param cluster cluster the instances belong to
+     * @param states states to keep
+     * @param start start of the window
+     * @param end end of the window
+     * @return the matching instances
+     * @throws StateStoreException declared by the interface; not thrown by this implementation
+     */
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+            Collection<InstanceState.STATE> states, DateTime start, DateTime end) throws StateStoreException {
+        List<InstanceState> instancesToReturn = new ArrayList<>();
+        ID id = new ID(entity, cluster);
+        for (InstanceState state : getExecutionInstances(id, states)) {
+            ExecutionInstance instance = state.getInstance();
+            // if end time is before start time of instance
+            // or start time is after end time of instance ignore.
+            // (Both conditions must hold for a match; OR-ing them in, as was done before,
+            // returned instances that had already ended before the window started.)
+            boolean startedByWindowEnd = instance.getActualStart() != null
+                    && !end.isBefore(instance.getActualStart());
+            boolean endedBeforeWindowStart = instance.getActualEnd() != null
+                    && start.isAfter(instance.getActualEnd());
+            if (startedByWindowEnd && !endedBeforeWindowStart) {
+                instancesToReturn.add(state);
+            }
+        }
+        return instancesToReturn;
+    }
+
+    /**
+     * @param entityId id (including the cluster) whose string form prefixes the instance keys
+     * @param states states to keep
+     * @return a new collection with the matching instances in one of {@code states}
+     * @throws StateStoreException declared by the interface; not thrown by this implementation
+     */
+    @Override
+    public Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        Collection<InstanceState> instances = new ArrayList<InstanceState>();
+        for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+            if (instanceState.getKey().startsWith(entityId.toString())
+                    && states.contains(instanceState.getValue().getCurrentState())) {
+                instances.add(instanceState.getValue());
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * @param entity entity whose last instance is wanted
+     * @param cluster cluster the instance belongs to
+     * @return the matching instance with the greatest key in sort order, or null if there is none
+     * @throws StateStoreException if the entity is not in the store
+     */
+    @Override
+    public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
+        ID id = new ID(entity, cluster);
+        if (!entityStates.containsKey(id.getEntityKey())) {
+            throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+        }
+        InstanceState latestState = null;
+        // TODO : Very crude. Iterating over all entries and getting the last one.
+        for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+            if (instanceState.getKey().startsWith(id.toString())) {
+                latestState = instanceState.getValue();
+            }
+        }
+        return latestState;
+    }
+
+    /**
+     * @param instanceId id of the instance to check
+     * @return true if an instance is stored under the id
+     */
+    @Override
+    public boolean executionInstanceExists(ID instanceId) {
+        return instanceStates.containsKey(instanceId.toString());
+    }
+
+    /**
+     * Removes every instance whose key starts with the entity key of {@code entityId}.
+     *
+     * @param entityId id of the entity whose instances are removed
+     */
+    @Override
+    public void deleteExecutionInstances(ID entityId) {
+        // Iterate over a copy of the keys so entries can be removed from the map while looping.
+        for (String instanceKey : Lists.newArrayList(instanceStates.keySet())) {
+            if (instanceKey.startsWith(entityId.getEntityKey())) {
+                instanceStates.remove(instanceKey);
+            }
+        }
+    }
+
+    /**
+     * Removes all entity and instance states from the store.
+     */
+    public void clear() {
+        entityStates.clear();
+        instanceStates.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
new file mode 100644
index 0000000..d6a4b49
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.util.Collection;
+
+/**
+ * Interface to abstract out instance store API.
+ * Instances are stored wrapped in an {@link InstanceState} and are addressed by an {@link ID}.
+ */
+// TODO : Add order and limit capabilities to the API
+public interface InstanceStateStore {
+    /**
+     * Adds an execution instance to the store.
+     *
+     * @param instanceState state (with its instance) to add
+     * @throws StateStoreException if the state cannot be added; for example, the in-memory
+     *                             implementation rejects a key that is already present
+     */
+    void putExecutionInstance(InstanceState instanceState) throws StateStoreException;
+
+    /**
+     * @param instanceId id of the instance to look up
+     * @return State of the execution instance corresponding to the id.
+     * @throws StateStoreException - When instance does not exist
+     */
+    InstanceState getExecutionInstance(ID instanceId) throws StateStoreException;
+
+    /**
+     * Updates an execution instance in the store.
+     *
+     * @param instanceState new state to store for the instance it wraps
+     * @throws StateStoreException - if the instance does not exist.
+     */
+    void updateExecutionInstance(InstanceState instanceState) throws StateStoreException;
+
+    /**
+     * @param entity entity whose instances are wanted
+     * @param cluster name of the cluster the instances belong to
+     * @return - All execution instances for the given entity and cluster.
+     * @throws StateStoreException if the lookup fails; for example, the in-memory
+     *                             implementation throws when the entity is not in the store
+     */
+    Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster) throws StateStoreException;
+
+    /**
+     * @param entity entity whose instances are wanted
+     * @param cluster name of the cluster the instances belong to
+     * @param states states an instance must currently be in to be returned
+     * @return - All execution instances for the given entity and cluster and states
+     * @throws StateStoreException if the lookup fails
+     */
+    Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                    Collection<InstanceState.STATE> states) throws StateStoreException;
+
+    /**
+     * Same as the three-argument variant, further restricted to a time window.
+     *
+     * @param entity entity whose instances are wanted
+     * @param cluster name of the cluster the instances belong to
+     * @param states states an instance must currently be in to be returned
+     * @param start start of the time window
+     * @param end end of the time window
+     * @return - All execution instances for the given entity and cluster and states
+     *           that fall in the window between start and end
+     * @throws StateStoreException if the lookup fails
+     */
+    Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                    Collection<InstanceState.STATE> states,
+                                                    DateTime start, DateTime end) throws StateStoreException;
+
+    /**
+     * @param entityId id of the entity, including the cluster name
+     * @param states states an instance must currently be in to be returned
+     * @return - All execution instance for an given entityKey (that includes the cluster name)
+     * @throws StateStoreException if the lookup fails
+     */
+    Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
+        throws StateStoreException;
+    /**
+     * @param entity entity whose latest instance is wanted
+     * @param cluster name of the cluster the instance belongs to
+     * @return - The latest execution instance (the in-memory implementation returns null
+     *           when the entity has no instances)
+     * @throws StateStoreException if the lookup fails; for example, the in-memory
+     *                             implementation throws when the entity is not in the store
+     */
+    InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException;
+
+    /**
+     * @param instanceId id of the instance to check
+     * @return true, if instance exists.
+     */
+    boolean executionInstanceExists(ID instanceId);
+
+    /**
+     * Delete instances of a given entity.
+     *
+     * @param entityId id of the entity whose instances are removed
+     */
+    void deleteExecutionInstances(ID entityId);
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
new file mode 100644
index 0000000..f595c26
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.service.ConfigurationChangeListener;
+
+/**
+ * Interface that combines the entity store API, the instance store API and the
+ * configuration change listener callbacks.
+ */
+public interface StateStore extends ConfigurationChangeListener, EntityStateStore, InstanceStateStore {
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
new file mode 100644
index 0000000..ebc05ec
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.resource.InstancesResult;
+
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Interface for an implementation that executes a DAG.
+ * Most operations take an {@link ExecutionInstance}; the query methods take the external ID
+ * of a job as a string.
+ */
+public interface DAGEngine {
+
+    /**
+     * Run an instance for execution.
+     *
+     * @param instance instance to run
+     * @return presumably the external ID of the started job, as taken by
+     *         {@link #info(String)} - TODO confirm against an implementation
+     * @throws DAGEngineException if the instance cannot be run
+     */
+    String run(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * @param instance instance to check
+     * @return true if an instance is scheduled for execution.
+     * @throws DAGEngineException if the check fails
+     */
+    boolean isScheduled(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Suspend a running instance.
+     *
+     * @param instance instance to suspend
+     * @throws DAGEngineException if the instance cannot be suspended
+     */
+    void suspend(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Resume a suspended instance.
+     *
+     * @param instance instance to resume
+     * @throws DAGEngineException if the instance cannot be resumed
+     */
+    void resume(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Kill a running instance.
+     *
+     * @param instance instance to kill
+     * @throws DAGEngineException if the instance cannot be killed
+     */
+    void kill(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Re-run a terminated instance.
+     *
+     * @param instance instance to run again
+     * @throws DAGEngineException if the instance cannot be re-run
+     */
+    void reRun(ExecutionInstance instance) throws DAGEngineException;
+
+    /**
+     * Perform dryrun of an instance.
+     * NOTE(review): the parameter is an entity, not an instance - confirm whether the dry run
+     * is meant to apply to the entity as a whole.
+     *
+     * @param entity entity to submit
+     * @throws DAGEngineException if the submission fails
+     */
+    void submit(Entity entity) throws DAGEngineException;
+
+    /**
+     * Returns info about the Job.
+     * @param externalID external ID of the job
+     * @return instance information for the job
+     * @throws DAGEngineException if the information cannot be retrieved
+     */
+    InstancesResult.Instance info(String externalID) throws DAGEngineException;
+
+    /**
+     * @param externalID external ID of the job
+     * @return status of each individual node in the DAG.
+     * @throws DAGEngineException if the details cannot be retrieved
+     */
+    List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException;
+
+    /**
+     * @return true if DAG Engine is up and running.
+     * @throws DAGEngineException if the liveness check itself fails
+     */
+    boolean isAlive() throws DAGEngineException;
+
+    /**
+     * @param externalID external ID of the job
+     * @return Configuration used to run the job.
+     * @throws DAGEngineException if the configuration cannot be retrieved
+     */
+    Properties getConfiguration(String externalID) throws DAGEngineException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
new file mode 100644
index 0000000..e400326
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *  Factory for providing appropriate dag engine to the Falcon services.
+ */
+public final class DAGEngineFactory {
+    private static final String DAG_ENGINE = "dag.engine.impl";
+
+    // Cache the DAGEngines, to avoid overhead of creation.
+    private static final Map<String, DAGEngine> DAG_ENGINES = new HashMap<>();
+
+    private DAGEngineFactory() {
+    }
+
+    public static DAGEngine getDAGEngine(Cluster cluster) throws FalconException {
+        return getDAGEngine(cluster.getName());
+    }
+
+    public static DAGEngine getDAGEngine(String clusterName) throws FalconException {
+        // Cache the DAGEngines for every cluster.
+        if (!DAG_ENGINES.containsKey(clusterName)) {
+            DAG_ENGINES.put(clusterName,
+                    (DAGEngine) ReflectionUtils.getInstance(DAG_ENGINE, String.class, clusterName));
+        }
+
+        return DAG_ENGINES.get(clusterName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
new file mode 100644
index 0000000..8dcf3a5
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow.engine;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.execution.EntityExecutor;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.FalconExecutionService;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Workflow engine which uses Falcon's native scheduler.
+ *
+ * <p>Entity level operations are delegated to the {@link FalconExecutionService}, entity and
+ * instance state is read from the {@link StateStore}, and job level information is fetched
+ * from the {@link DAGEngine} supplied by {@link DAGEngineFactory} for the relevant cluster.
+ * Operations that are not supported yet throw a {@link FalconException}.
+ */
+public class FalconWorkflowEngine extends AbstractWorkflowEngine {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconWorkflowEngine.class);
+    private static final FalconExecutionService EXECUTION_SERVICE = FalconExecutionService.get();
+    private static final StateStore STATE_STORE = AbstractStateStore.get();
+    private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+    // Property holding a comma separated list of clusters an instance action is restricted to.
+    private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
+
+    /**
+     * Instance level actions. Only KILL, SUSPEND, RESUME, STATUS and PARAMS are currently
+     * accepted by {@link #doJobAction}; the others are rejected there.
+     */
+    private enum JobAction {
+        KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
+    }
+
+    public FalconWorkflowEngine() {
+        // Registering As it cleans up staging paths and not entirely Oozie Specific.
+        registerListener(new OozieHouseKeepingService());
+    }
+
+    /**
+     * Reports whether the DAG engine of the given cluster is alive.
+     *
+     * @param cluster cluster whose DAG engine is probed
+     * @return the value reported by the cluster's DAG engine
+     * @throws FalconException if the engine cannot be obtained or queried
+     */
+    @Override
+    public boolean isAlive(Cluster cluster) throws FalconException {
+        return DAGEngineFactory.getDAGEngine(cluster).isAlive();
+    }
+
+    /**
+     * Schedules the entity through the execution service.
+     * {@code skipDryRun} and {@code properties} are not used by this implementation.
+     */
+    @Override
+    public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) throws FalconException {
+        EXECUTION_SERVICE.schedule(entity);
+    }
+
+    /**
+     * Submits the entity to the DAG engine of the named cluster.
+     * {@code skipDryRun} is not consulted by this implementation.
+     */
+    @Override
+    public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
+        DAGEngineFactory.getDAGEngine(clusterName).submit(entity);
+    }
+
+    /**
+     * An entity counts as active when its state in the state store is anything other than SUBMITTED.
+     * NOTE(review): the state store result is dereferenced directly; confirm that
+     * {@code getEntity} never returns null for an entity unknown to the store.
+     */
+    @Override
+    public boolean isActive(Entity entity) throws FalconException {
+        return STATE_STORE.getEntity(new ID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED;
+    }
+
+    /**
+     * Returns true when the entity's state in the state store is SUSPENDED.
+     */
+    @Override
+    public boolean isSuspended(Entity entity) throws FalconException {
+        return STATE_STORE.getEntity(new ID(entity))
+                .getCurrentState().equals(EntityState.STATE.SUSPENDED);
+    }
+
+    /**
+     * Not supported yet; always throws.
+     */
+    @Override
+    public boolean isCompleted(Entity entity) throws FalconException {
+        throw new FalconException("Not yet implemented.");
+    }
+
+    /**
+     * Suspends the entity through the execution service.
+     *
+     * @return the literal {@code "SUCCESS"} when no exception is raised
+     */
+    @Override
+    public String suspend(Entity entity) throws FalconException {
+        EXECUTION_SERVICE.suspend(entity);
+        return "SUCCESS";
+    }
+
+    /**
+     * Resumes the entity through the execution service.
+     *
+     * @return the literal {@code "SUCCESS"} when no exception is raised
+     */
+    @Override
+    public String resume(Entity entity) throws FalconException {
+        EXECUTION_SERVICE.resume(entity);
+        return "SUCCESS";
+    }
+
+    /**
+     * Deletes the entity: an active entity is first deleted from the execution service, then the
+     * entity is removed from the configuration store regardless of whether it was active.
+     *
+     * @return the literal {@code "SUCCESS"} when no exception is raised
+     */
+    @Override
+    public String delete(Entity entity) throws FalconException {
+        if (isActive(entity)) {
+            EXECUTION_SERVICE.delete(entity);
+        }
+        // This should remove it from state store too as state store listens to config store changes.
+        CONFIG_STORE.remove(entity.getEntityType(), entity.getName());
+        return "SUCCESS";
+    }
+
+    /**
+     * Kills everything handled by the entity's executor on the given cluster.
+     *
+     * @return the literal {@code "SUCCESS"} when no exception is raised
+     */
+    @Override
+    public String delete(Entity entity, String cluster) throws FalconException {
+        EXECUTION_SERVICE.getEntityExecutor(entity, cluster).killAll();
+        return "SUCCESS";
+    }
+
+    /**
+     * Lists the instances of the entity that the state store reports in a running state, across
+     * all clusters returned by {@link EntityUtil#getClustersDefinedInColos}.
+     * {@code lifeCycles} is not used by this implementation.
+     *
+     * @return a SUCCEEDED result whose instances all carry the RUNNING status
+     */
+    @Override
+    public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException {
+        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+        List<InstancesResult.Instance> runInstances = new ArrayList<>();
+
+        for (String cluster : clusters) {
+            Collection<InstanceState> instances =
+                    STATE_STORE.getExecutionInstances(entity, cluster, InstanceState.getRunningStates());
+            for (InstanceState state : instances) {
+                String instanceTimeStr = state.getInstance().getInstanceTime().toString();
+                InstancesResult.Instance instance = new InstancesResult.Instance(cluster, instanceTimeStr,
+                        InstancesResult.WorkflowStatus.RUNNING);
+                instance.startTime = state.getInstance().getActualStart().toDate();
+                runInstances.add(instance);
+            }
+        }
+        InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances");
+        result.setInstances(runInstances.toArray(new InstancesResult.Instance[runInstances.size()]));
+        return result;
+    }
+
+    /**
+     * Applies an instance level action to every matching instance of the entity.
+     *
+     * <p>Instances are looked up per cluster in the state store, restricted to the states that
+     * are applicable for the action and to the {@code start}..{@code end} window. When
+     * {@code props} names clusters under {@code falcon.instance.action.clusters}, only those
+     * clusters are considered. A failure on one instance is recorded as an ERROR entry in the
+     * result instead of aborting the remaining instances.
+     *
+     * @param action     action to perform
+     * @param entity     entity whose instances are acted upon
+     * @param start      start of the instance window
+     * @param end        end of the instance window
+     * @param props      optional properties; may be null
+     * @param lifeCycles not used by this implementation
+     * @return result whose status is SUCCEEDED when every instance succeeded, PARTIAL when some
+     *         failed, and FAILED when fewer than two instances were processed and one failed
+     * @throws FalconException          if the instances cannot be retrieved
+     * @throws IllegalArgumentException if the action is not supported
+     */
+    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
+                                        Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+        List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
+        APIResult.Status overallStatus = APIResult.Status.SUCCEEDED;
+        Collection<InstanceState.STATE> states = getApplicableStates(action);
+
+        List<ExecutionInstance> instancesToActOn = new ArrayList<>();
+        for (String cluster : clusters) {
+            // An empty include list means "all clusters".
+            if (!clusterList.isEmpty() && !clusterList.contains(cluster)) {
+                continue;
+            }
+            LOG.debug("Retrieving instances for cluster : {} for action {}" , cluster, action);
+            Collection<InstanceState> instances =
+                    STATE_STORE.getExecutionInstances(entity, cluster, states, new DateTime(start), new DateTime(end));
+            for (InstanceState state : instances) {
+                instancesToActOn.add(state.getInstance());
+            }
+        }
+
+        List<InstancesResult.Instance> instances = new ArrayList<>();
+        for (ExecutionInstance ins : instancesToActOn) {
+            String instanceTimeStr = SchemaHelper.formatDateUTC(ins.getInstanceTime().toDate());
+
+            InstancesResult.Instance instance = null;
+            try {
+                instance = performAction(ins.getCluster(), entity, action, ins);
+                instance.instance = instanceTimeStr;
+            } catch (FalconException e) {
+                // Record the failure for this instance and carry on with the others.
+                LOG.warn("Unable to perform action {} on cluster", action, e);
+                instance = new InstancesResult.Instance(ins.getCluster(), instanceTimeStr, null);
+                instance.status = InstancesResult.WorkflowStatus.ERROR;
+                instance.details = e.getMessage();
+                overallStatus = APIResult.Status.PARTIAL;
+            }
+            instances.add(instance);
+        }
+        // With a single instance there is nothing "partial" about a failure.
+        if (instancesToActOn.size() < 2 && overallStatus == APIResult.Status.PARTIAL) {
+            overallStatus = APIResult.Status.FAILED;
+        }
+        InstancesResult instancesResult = new InstancesResult(overallStatus, action.name());
+        instancesResult.setInstances(instances.toArray(new InstancesResult.Instance[instances.size()]));
+        return instancesResult;
+    }
+
+    /**
+     * Determines which instance states an action may be applied to.
+     *
+     * @param action action about to be performed
+     * @return the states to query the state store with
+     * @throws IllegalArgumentException if the action is not supported
+     */
+    private Collection<InstanceState.STATE> getApplicableStates(JobAction action) {
+        Collection<InstanceState.STATE> states;
+        switch(action) {
+        case KILL:
+        case SUSPEND:
+            states = InstanceState.getActiveStates();
+            break;
+        case RESUME:
+            states = new ArrayList<>();
+            states.add(InstanceState.STATE.SUSPENDED);
+            break;
+        case STATUS:
+        case PARAMS:
+            // Applicable only for running and finished jobs.
+            // Work on a copy so the collection handed out by InstanceState is never modified here.
+            states = new ArrayList<InstanceState.STATE>(InstanceState.getRunningStates());
+            states.addAll(InstanceState.getTerminalStates());
+            states.add(InstanceState.STATE.SUSPENDED);
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled action " + action);
+        }
+        return states;
+    }
+
+    /**
+     * Parses a comma separated cluster list out of the given properties.
+     *
+     * @param props        properties to read from; may be null
+     * @param clustersType name of the property holding the list
+     * @return the trimmed, non-empty cluster names; empty when the property is absent or blank
+     */
+    private List<String> getIncludedClusters(Properties props, String clustersType) {
+        String clusters = props == null ? "" : props.getProperty(clustersType, "");
+        List<String> clusterList = new ArrayList<>();
+        for (String cluster : clusters.split(",")) {
+            // Trim before testing so that whitespace-only tokens do not end up as "" entries,
+            // which would make the include list non-empty and filter out every real cluster.
+            String trimmed = cluster.trim();
+            if (StringUtils.isNotEmpty(trimmed)) {
+                clusterList.add(trimmed);
+            }
+        }
+        return clusterList;
+    }
+
+    /**
+     * Performs a single action on a single instance and describes the outcome.
+     *
+     * <p>When the instance has an external ID, the description starts from what the cluster's
+     * DAG engine reports for that ID; otherwise it starts from an empty description.
+     *
+     * @param cluster  cluster the instance belongs to
+     * @param entity   entity the instance belongs to
+     * @param action   action to perform
+     * @param instance instance to act upon
+     * @return description of the instance after the action
+     * @throws FalconException          if a delegated call fails
+     * @throws IllegalArgumentException if the action is not supported
+     */
+    private InstancesResult.Instance performAction(String cluster, Entity entity, JobAction action,
+                                                   ExecutionInstance instance) throws FalconException {
+        EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(entity, cluster);
+        InstancesResult.Instance instanceInfo = null;
+        LOG.debug("Retrieving information for {} for action {}", instance.getId(), action);
+        if (StringUtils.isNotEmpty(instance.getExternalID())) {
+            instanceInfo = DAGEngineFactory.getDAGEngine(cluster).info(instance.getExternalID());
+        } else {
+            instanceInfo = new InstancesResult.Instance();
+        }
+        switch(action) {
+        case KILL:
+            executor.kill(instance);
+            instanceInfo.status = InstancesResult.WorkflowStatus.KILLED;
+            break;
+        case SUSPEND:
+            executor.suspend(instance);
+            instanceInfo.status = InstancesResult.WorkflowStatus.SUSPENDED;
+            break;
+        case RESUME:
+            executor.resume(instance);
+            // Report whatever state the instance is in after the resume, as recorded in the state store.
+            instanceInfo.status =
+                    InstancesResult.WorkflowStatus.valueOf(STATE_STORE
+                            .getExecutionInstance(instance.getId()).getCurrentState().name());
+            break;
+        case RERUN:
+            break;
+        case STATUS:
+            if (StringUtils.isNotEmpty(instance.getExternalID())) {
+                List<InstancesResult.InstanceAction> instanceActions =
+                        DAGEngineFactory.getDAGEngine(cluster).getJobDetails(instance.getExternalID());
+                instanceInfo.actions = instanceActions
+                        .toArray(new InstancesResult.InstanceAction[instanceActions.size()]);
+            }
+            break;
+
+        case PARAMS:
+            // Mask details, log
+            instanceInfo.details = null;
+            instanceInfo.logFile = null;
+            Properties props = DAGEngineFactory.getDAGEngine(cluster).getConfiguration(instance.getExternalID());
+            // Size the array from the same key set that is iterated, so the two can never disagree.
+            Set<String> names = props.stringPropertyNames();
+            InstancesResult.KeyValuePair[] keyValuePairs = new InstancesResult.KeyValuePair[names.size()];
+            int i = 0;
+            for (String name : names) {
+                keyValuePairs[i++] = new InstancesResult.KeyValuePair(name, props.getProperty(name));
+            }
+            // Attach the collected parameters to the result; without this they were silently dropped.
+            instanceInfo.wfParams = keyValuePairs;
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled action " + action);
+        }
+        return instanceInfo;
+    }
+
+    /**
+     * Kills the active instances of the entity that fall in the given window.
+     */
+    @Override
+    public InstancesResult killInstances(Entity entity, Date start, Date end,
+                                         Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.KILL, entity, start, end, props, lifeCycles);
+    }
+
+    /**
+     * Not supported yet; always throws.
+     */
+    @Override
+    public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
+                                          List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    /**
+     * Suspends the active instances of the entity that fall in the given window.
+     */
+    @Override
+    public InstancesResult suspendInstances(Entity entity, Date start, Date end,
+                                            Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.SUSPEND, entity, start, end, props, lifeCycles);
+    }
+
+    /**
+     * Resumes the suspended instances of the entity that fall in the given window.
+     */
+    @Override
+    public InstancesResult resumeInstances(Entity entity, Date start, Date end,
+                                           Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.RESUME, entity, start, end, props, lifeCycles);
+    }
+
+    /**
+     * Reports the status of running, terminal and suspended instances in the given window,
+     * across all clusters of the entity.
+     */
+    @Override
+    public InstancesResult getStatus(Entity entity, Date start, Date end,
+                                     List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.STATUS, entity, start, end, null, lifeCycles);
+    }
+
+    /**
+     * Not supported yet; always throws.
+     */
+    @Override
+    public InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
+                                             List<LifeCycle> lifeCycles) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    /**
+     * Reports the configuration parameters of running, terminal and suspended instances in the
+     * given window, across all clusters of the entity.
+     */
+    @Override
+    public InstancesResult getInstanceParams(Entity entity, Date start, Date end,
+                                             List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
+    }
+
+    /**
+     * Always returns true in this implementation.
+     */
+    @Override
+    public boolean isNotificationEnabled(String cluster, String jobID) throws FalconException {
+        return true;
+    }
+
+    /**
+     * Not supported yet; always throws.
+     */
+    @Override
+    public String update(Entity oldEntity, Entity newEntity, String cluster, Boolean skipDryRun)
+        throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    /**
+     * Not supported yet; always throws.
+     */
+    @Override
+    public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    /**
+     * Not supported yet; always throws.
+     */
+    @Override
+    public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
+        throw new FalconException("Not yet Implemented");
+    }
+
+    /**
+     * Returns the name of the status the cluster's DAG engine reports for the job.
+     */
+    @Override
+    public String getWorkflowStatus(String cluster, String jobId) throws FalconException {
+        return DAGEngineFactory.getDAGEngine(cluster).info(jobId).getStatus().name();
+    }
+
+    /**
+     * Returns the configuration the cluster's DAG engine reports for the job.
+     */
+    @Override
+    public Properties getWorkflowProperties(String cluster, String jobId) throws FalconException {
+        return DAGEngineFactory.getDAGEngine(cluster).getConfiguration(jobId);
+    }
+
+    /**
+     * Wraps the DAG engine's description of a single job in a SUCCEEDED result.
+     */
+    @Override
+    public InstancesResult getJobDetails(String cluster, String jobId) throws FalconException {
+        InstancesResult.Instance[] instances = new InstancesResult.Instance[1];
+        InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED,
+                "Instance for workflow id:" + jobId);
+        instances[0] = DAGEngineFactory.getDAGEngine(cluster).info(jobId);
+        result.setInstances(instances);
+        return result;
+    }
+}
+


Mime
View raw message