falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [5/6] falcon git commit: FALCON-1213 Base framework of the native scheduler
Date Tue, 20 Oct 2015 12:10:02 GMT
FALCON-1213 Base framework of the native scheduler


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4175c54a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4175c54a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4175c54a

Branch: refs/heads/master
Commit: 4175c54a158eeb9883dc192260890eb2b73ad6f1
Parents: 5a55bae
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Oct 20 17:38:26 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Oct 20 17:38:26 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/falcon/entity/EntityUtil.java    |  34 ++
 pom.xml                                         |   6 +-
 scheduler/pom.xml                               | 120 ++++
 .../falcon/exception/DAGEngineException.java    |  48 ++
 .../InvalidStateTransitionException.java        |  47 ++
 .../exception/NotificationServiceException.java |  48 ++
 .../falcon/exception/StateStoreException.java   |  47 ++
 .../apache/falcon/execution/EntityExecutor.java | 111 ++++
 .../falcon/execution/ExecutionInstance.java     | 180 ++++++
 .../execution/FalconExecutionService.java       | 214 +++++++
 .../falcon/execution/NotificationHandler.java   |  34 ++
 .../execution/ProcessExecutionInstance.java     | 277 +++++++++
 .../falcon/execution/ProcessExecutor.java       | 460 +++++++++++++++
 .../apache/falcon/execution/SchedulerUtil.java  |  54 ++
 .../service/FalconNotificationService.java      |  76 +++
 .../service/NotificationServicesRegistry.java   | 125 +++++
 .../notification/service/event/DataEvent.java   |  76 +++
 .../notification/service/event/Event.java       |  37 ++
 .../service/event/JobCompletedEvent.java        |  58 ++
 .../service/event/JobScheduledEvent.java        |  80 +++
 .../service/event/TimeElapsedEvent.java         |  62 +++
 .../notification/service/impl/AlarmService.java | 326 +++++++++++
 .../service/impl/DataAvailabilityService.java   |  94 ++++
 .../service/impl/JobCompletionService.java      | 208 +++++++
 .../service/impl/SchedulerService.java          | 399 +++++++++++++
 .../service/request/AlarmRequest.java           |  84 +++
 .../request/DataNotificationRequest.java        |  79 +++
 .../JobCompletionNotificationRequest.java       |  62 +++
 .../request/JobScheduleNotificationRequest.java |  60 ++
 .../service/request/NotificationRequest.java    |  53 ++
 .../org/apache/falcon/predicate/Predicate.java  | 220 ++++++++
 .../org/apache/falcon/state/EntityState.java    | 133 +++++
 .../falcon/state/EntityStateChangeHandler.java  |  59 ++
 .../main/java/org/apache/falcon/state/ID.java   | 200 +++++++
 .../org/apache/falcon/state/InstanceState.java  | 250 +++++++++
 .../state/InstanceStateChangeHandler.java       |  99 ++++
 .../org/apache/falcon/state/StateMachine.java   |  34 ++
 .../org/apache/falcon/state/StateService.java   | 185 ++++++
 .../falcon/state/store/AbstractStateStore.java  |  92 +++
 .../falcon/state/store/EntityStateStore.java    |  76 +++
 .../falcon/state/store/InMemoryStateStore.java  | 227 ++++++++
 .../falcon/state/store/InstanceStateStore.java  | 113 ++++
 .../apache/falcon/state/store/StateStore.java   |  27 +
 .../falcon/workflow/engine/DAGEngine.java       | 115 ++++
 .../workflow/engine/DAGEngineFactory.java       |  53 ++
 .../workflow/engine/FalconWorkflowEngine.java   | 366 ++++++++++++
 .../falcon/workflow/engine/OozieDAGEngine.java  | 401 +++++++++++++
 .../execution/FalconExecutionServiceTest.java   | 557 +++++++++++++++++++
 .../apache/falcon/execution/MockDAGEngine.java  | 122 ++++
 .../falcon/execution/SchedulerUtilTest.java     |  50 ++
 .../notification/service/AlarmServiceTest.java  |  77 +++
 .../service/SchedulerServiceTest.java           | 314 +++++++++++
 .../apache/falcon/predicate/PredicateTest.java  |  53 ++
 .../falcon/state/EntityStateServiceTest.java    | 119 ++++
 .../falcon/state/InstanceStateServiceTest.java  | 138 +++++
 .../resources/config/cluster/cluster-0.1.xml    |  43 ++
 .../src/test/resources/config/feed/feed-0.1.xml |  57 ++
 .../resources/config/process/process-0.1.xml    |  54 ++
 webapp/pom.xml                                  |   6 +
 60 files changed, 7800 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc4fdf5..a4dc1c8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
     FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) 
 
   NEW FEATURES
+    FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
+
     FALCON-1315 Update falcon ui for HiveDR, secure clusters and bug fixes(Armando Reyna/Venkat Ranganathan via Sowmya Ramesh)
 
     FALCON-1102 Gather data transfer details of filesystem replication(Peeyush Bishnoi via Sowmya Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 3ab9339..ceefb17 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -85,10 +85,32 @@ public final class EntityUtil {
     private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
     private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
     private static final long ONE_MS = 1;
+    public static final String MR_JOB_PRIORITY = "jobPriority";
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
     private static final String STAGING_DIR_NAME_SEPARATOR = "_";
 
+    /**
+     * Priority with which the DAG will be scheduled.
+     * Matches the five priorities of Hadoop jobs.
+     */
+    public enum JOBPRIORITY {
+        VERY_HIGH((short) 1),
+        HIGH((short) 2),
+        NORMAL((short) 3),
+        LOW((short) 4),
+        VERY_LOW((short) 5);
+
+        // Assigned exactly once per constant, hence final.
+        private final short priority;
+
+        JOBPRIORITY(short priority) {
+            this.priority = priority;
+        }
+
+        /**
+         * @return the numeric value of this constant, 1 for VERY_HIGH through 5 for VERY_LOW
+         */
+        public short getPriority() {
+            return priority;
+        }
+    }
+
     private EntityUtil() {}
 
     public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {
@@ -1015,4 +1037,16 @@ public final class EntityUtil {
         }
         return props;
     }
+
+    /**
+     * Looks up the scheduling priority configured on a process.
+     * The first property named {@link #MR_JOB_PRIORITY} decides the result; its value must be
+     * the exact name of a {@link JOBPRIORITY} constant.
+     *
+     * @param process process whose properties are inspected
+     * @return the configured priority, or {@link JOBPRIORITY#NORMAL} when the process has no
+     *         properties or none of them is named {@link #MR_JOB_PRIORITY}
+     * @throws IllegalArgumentException if the configured value is not the name of a JOBPRIORITY constant
+     */
+    public static JOBPRIORITY getPriority(Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProps = process.getProperties();
+        if (processProps == null) {
+            return JOBPRIORITY.NORMAL;
+        }
+        for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) {
+            // Constant-first comparison: a property without a name is skipped instead of causing an NPE.
+            if (MR_JOB_PRIORITY.equals(prop.getName())) {
+                return JOBPRIORITY.valueOf(prop.getValue());
+            }
+        }
+        return JOBPRIORITY.NORMAL;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 54e6cd1..8cd3c3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,9 @@
         <hive.version>0.13.1</hive.version>
         <jetty.version>6.1.26</jetty.version>
         <jersey.version>1.9</jersey.version>
+        <quartz.version>2.2.1</quartz.version>
+        <joda.version>2.8.2</joda.version>
+        <mockito.version>1.9.5</mockito.version>
         <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
         <excluded.test.groups>exhaustive</excluded.test.groups>
     </properties>
@@ -427,6 +430,7 @@
         <module>messaging</module>
         <module>oozie-el-extensions</module>
         <module>oozie</module>
+        <module>scheduler</module>
         <module>acquisition</module>
         <module>replication</module>
         <module>retention</module>
@@ -680,7 +684,7 @@
             <dependency>
                 <groupId>org.mockito</groupId>
                 <artifactId>mockito-all</artifactId>
-                <version>1.8.5</version>
+                <version>${mockito.version}</version>
                 <scope>provided</scope>
             </dependency>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
new file mode 100644
index 0000000..dddfcce
--- /dev/null
+++ b/scheduler/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.falcon</groupId>
+        <artifactId>falcon-main</artifactId>
+        <version>0.8-SNAPSHOT</version>
+    </parent>
+    <artifactId>falcon-scheduler</artifactId>
+    <description>Apache Falcon Scheduler Module</description>
+    <name>Apache Falcon Scheduler</name>
+    <packaging>jar</packaging>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-oozie-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-messaging</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-test-util</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-client</artifactId>
+        </dependency>
+
+	<dependency>
+	    <groupId>org.quartz-scheduler</groupId>
+	    <artifactId>quartz</artifactId>
+	    <version>${quartz.version}</version>
+	</dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>${joda.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java b/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java
new file mode 100644
index 0000000..8b5bb64
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception raised by the DAG execution engine.
+ */
+public class DAGEngineException extends FalconException {
+
+    /**
+     * Creates an exception that carries only a message.
+     *
+     * @param message - custom message
+     */
+    public DAGEngineException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param e - the cause being wrapped
+     */
+    public DAGEngineException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with both a message and an underlying cause.
+     *
+     * @param message - custom message
+     * @param e - the cause being wrapped
+     */
+    public DAGEngineException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java b/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java
new file mode 100644
index 0000000..19284a5
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception raised during state transition of entities and instances.
+ */
+public class InvalidStateTransitionException extends FalconException {
+
+    /**
+     * Creates an exception that carries only a message.
+     *
+     * @param message - custom exception message
+     */
+    public InvalidStateTransitionException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param e - the cause being wrapped
+     */
+    public InvalidStateTransitionException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with both a message and an underlying cause.
+     *
+     * @param message - custom exception message
+     * @param e - the cause being wrapped
+     */
+    public InvalidStateTransitionException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java b/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java
new file mode 100644
index 0000000..b7f84df
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception raised by notification services.
+ */
+public class NotificationServiceException extends FalconException {
+
+    /**
+     * Creates an exception that carries only a message.
+     *
+     * @param message - custom message
+     */
+    public NotificationServiceException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param e - the cause being wrapped
+     */
+    public NotificationServiceException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with both a message and an underlying cause.
+     *
+     * @param message - custom message
+     * @param e - the cause being wrapped
+     */
+    public NotificationServiceException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java b/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java
new file mode 100644
index 0000000..93bdad3
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception raised by the State store API.
+ */
+public class StateStoreException extends FalconException {
+
+    /**
+     * Creates an exception that carries only a message.
+     *
+     * @param message - custom message
+     */
+    public StateStoreException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param e - the cause being wrapped
+     */
+    public StateStoreException(Throwable e) {
+        super(e);
+    }
+
+    /**
+     * Creates an exception with both a message and an underlying cause.
+     *
+     * @param message - custom message
+     * @param e - the cause being wrapped
+     */
+    public StateStoreException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
new file mode 100644
index 0000000..9b07b9e
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceStateChangeHandler;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+
+/**
+ * Creates the execution instances of a given entity and handles their state transitions.
+ * An execution instance is created upon receipt of a "trigger event".
+ * User interrupts for the entity, such as suspend and kill, are handled by this class as well.
+ */
+public abstract class EntityExecutor implements NotificationHandler, InstanceStateChangeHandler {
+    // The number of execution instances to be cached by default
+    public static final String DEFAULT_CACHE_SIZE = "20";
+    protected static final ConfigurationStore STORE = ConfigurationStore.get();
+    protected static final StateStore STATE_STORE = AbstractStateStore.get();
+
+    protected String cluster;
+    protected ID id;
+
+    /**
+     * @return ID of the entity
+     */
+    public ID getId() {
+        return id;
+    }
+
+    /**
+     * @return The entity
+     */
+    public abstract Entity getEntity();
+
+    /**
+     * Schedules execution instances for the entity. Idempotent operation.
+     *
+     * @throws FalconException
+     */
+    public abstract void schedule() throws FalconException;
+
+    /**
+     * Suspends every "active" execution instance of the entity. Idempotent operation.
+     * The operation may fail on some instances, in which case it is only partially successful.
+     *
+     * @throws FalconException - When the operation on an instance fails
+     */
+    public abstract void suspendAll() throws FalconException;
+
+    /**
+     * Resumes every suspended execution instance of the entity. Idempotent operation.
+     * The operation may fail on some instances, in which case it is only partially successful.
+     *
+     * @throws FalconException - When the operation on an instance fails
+     */
+    public abstract void resumeAll() throws FalconException;
+
+    /**
+     * Deletes every execution instance of the entity, even from the store. Idempotent operation.
+     * The operation may fail on some instances, in which case it is only partially successful.
+     *
+     * @throws FalconException - When the operation on an instance fails
+     */
+    public abstract void killAll() throws FalconException;
+
+    /**
+     * Suspends the given execution instance. Idempotent operation.
+     *
+     * @param instance - the execution instance to suspend
+     * @throws FalconException
+     */
+    public abstract void suspend(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Resumes the given execution instance. Idempotent operation.
+     *
+     * @param instance - the execution instance to resume
+     * @throws FalconException
+     */
+    public abstract void resume(ExecutionInstance instance) throws FalconException;
+
+    /**
+     * Kills the given execution instance. Idempotent operation.
+     *
+     * @param instance - the execution instance to kill
+     * @throws FalconException
+     */
+    public abstract void kill(ExecutionInstance instance) throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
new file mode 100644
index 0000000..3869ff2
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * Represents an execution instance of an entity.
+ * The instance time and the creation time are both held in UTC.
+ */
+public abstract class ExecutionInstance implements NotificationHandler {
+
+    // TODO : Add more fields
+    private final String cluster;
+    // External ID is the ID used to identify the Job submitted to the DAG Engine, as returned by the DAG Engine.
+    // For example, for Oozie this would be the workflow Id.
+    private String externalID;
+    private final DateTime instanceTime;
+    private final DateTime creationTime;
+    private DateTime actualStart;
+    private DateTime actualEnd;
+    private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+
+    /**
+     * Creates an instance for the given instance time. The time is converted to UTC.
+     *
+     * @param instanceTime - the nominal time of the instance
+     * @param cluster - the name of the cluster the instance belongs to
+     */
+    public ExecutionInstance(DateTime instanceTime, String cluster) {
+        this.instanceTime = new DateTime(instanceTime, UTC);
+        this.cluster = cluster;
+        this.creationTime = DateTime.now(UTC);
+    }
+
+    /**
+     * For a-periodic instances. The current time is used as the instance time.
+     *
+     * @param cluster - the name of the cluster the instance belongs to
+     */
+    public ExecutionInstance(String cluster) {
+        // Taken in UTC so that instanceTime is in the same zone whichever constructor is used.
+        this.instanceTime = DateTime.now(UTC);
+        this.cluster = cluster;
+        this.creationTime = DateTime.now(UTC);
+    }
+
+    /**
+     * @return - The external id corresponding to this instance.
+     * If the instance is executed on Oozie, externalID will be the Oozie workflow ID.
+     */
+    public String getExternalID() {
+        return externalID;
+    }
+
+    /**
+     * Setter for external ID, Oozie workflow ID, for example.
+     *
+     * @param jobID - the ID of the job as returned by the DAG Engine
+     */
+    public void setExternalID(String jobID) {
+        this.externalID = jobID;
+    }
+
+    /**
+     * @return The unique ID of this instance. The instance is referred using this ID inside the system.
+     */
+    public abstract ID getId();
+
+    /**
+     * @return - The entity to which this instance belongs.
+     */
+    public abstract Entity getEntity();
+
+    /**
+     * @return - The nominal time of the instance.
+     */
+    public DateTime getInstanceTime() {
+        return instanceTime;
+    }
+
+    /**
+     * @return - The name of the cluster on which this instance is running
+     */
+    public String getCluster() {
+        return cluster;
+    }
+
+    /**
+     * @return - The sequential numerical id of the instance
+     */
+    public abstract int getInstanceSequence();
+
+    /**
+     * @return - Actual start time of instance.
+     */
+    public DateTime getActualStart() {
+        return actualStart;
+    }
+
+    /**
+     * @param actualStart - the time at which the instance actually started
+     */
+    public void setActualStart(DateTime actualStart) {
+        this.actualStart = actualStart;
+    }
+
+    /**
+     * @return - Completion time of the instance
+     */
+    public DateTime getActualEnd() {
+        return actualEnd;
+    }
+
+    /**
+     * @param actualEnd - the time at which the instance completed
+     */
+    public void setActualEnd(DateTime actualEnd) {
+        this.actualEnd = actualEnd;
+    }
+
+    /**
+     * @return - The time, in UTC, at which this object was created
+     */
+    public DateTime getCreationTime() {
+        return creationTime;
+    }
+
+    /**
+     * @return - The gating conditions on which this instance is waiting before it is scheduled for execution.
+     * @throws FalconException
+     */
+    public abstract List<Predicate> getAwaitingPredicates() throws FalconException;
+
+    /**
+     * Suspends the instance if it is in one of the active states, waiting, ready or running.
+     *
+     * @throws FalconException
+     */
+    public abstract void suspend() throws FalconException;
+
+    /**
+     * Resumes a previously suspended instance.
+     *
+     * @throws FalconException
+     */
+    public abstract void resume() throws FalconException;
+
+    /**
+     * Kills an instance if it is in one of the active states, waiting, ready or running.
+     *
+     * @throws FalconException
+     */
+    public abstract void kill() throws FalconException;
+
+    /**
+     * Handles any clean up and de-registration of notification subscriptions.
+     * Invoked when the instance reaches one of its terminal states.
+     *
+     * @throws FalconException
+     */
+    public abstract void destroy() throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
new file mode 100644
index 0000000..b959320
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.EntityStateChangeHandler;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.StateService;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This singleton is the entry point for all callbacks from the notification services.
+ * The execution service handles any system level events that apply to all entities.
+ * It is responsible for creation of entity executors one per entity, per cluster.
+ */
+public final class FalconExecutionService implements FalconService, EntityStateChangeHandler, NotificationHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconExecutionService.class);
+
+    // Stores all entity executors in memory, keyed by the ID built from (entity, cluster).
+    private final ConcurrentMap<ID, EntityExecutor> executors = new ConcurrentHashMap<ID, EntityExecutor>();
+
+    // The singleton; created when the class is initialized and never replaced.
+    private static final FalconExecutionService INSTANCE = new FalconExecutionService();
+
+    /**
+     * @return - The name of this service, "FalconExecutionService"
+     */
+    @Override
+    public String getName() {
+        return "FalconExecutionService";
+    }
+
+    /**
+     * Re-creates and schedules an executor, per cluster, for every entity the state store reports as SCHEDULED.
+     *
+     * @throws RuntimeException wrapping the FalconException raised while creating or scheduling an executor
+     */
+    public void init() {
+        LOG.debug("State store instance being used : {}", AbstractStateStore.get());
+        // Initialize all executors from store
+        for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
+            try {
+                for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+                    EntityExecutor executor = createEntityExecutor(entity, cluster);
+                    // Registered before schedule() so events raised for this entity can find their target.
+                    executors.put(new ID(entity, cluster), executor);
+                    executor.schedule();
+                }
+            } catch (FalconException e) {
+                LOG.error("Unable to load entity : " + entity.getName(), e);
+                throw new RuntimeException(e);
+            }
+        }
+        // TODO : During migration, the state store itself may not have been completely bootstrapped.
+    }
+
+    /**
+     * Returns an EntityExecutor implementation based on the entity type.
+     *
+     * @param entity - The entity for which an executor is needed; only processes are supported
+     * @param cluster - Name of the cluster the executor is bound to
+     * @return - A new {@link ProcessExecutor} when the entity is a process
+     * @throws FalconException if the executor cannot be constructed
+     * @throws UnsupportedOperationException if the entity is a feed
+     * @throws IllegalArgumentException for any other entity type
+     */
+    private EntityExecutor createEntityExecutor(Entity entity, String cluster) throws FalconException {
+        switch (entity.getEntityType()) {
+        case FEED:
+            throw new UnsupportedOperationException("No support yet for feed.");
+        case PROCESS:
+            return new ProcessExecutor(((Process)entity), cluster);
+        default:
+            throw new IllegalArgumentException("Unhandled type " + entity.getEntityType().name());
+        }
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        // Nothing to release.
+    }
+
+    /**
+     * @return - An instance(singleton) of FalconExecutionService
+     */
+    public static FalconExecutionService get() {
+        return INSTANCE;
+    }
+
+    // Instances are handed out only through get().
+    private FalconExecutionService() {}
+
+    /**
+     * Routes a notification to the executor registered for the event's target entity.
+     *
+     * @param event - The notification to deliver
+     * @throws FalconException if no executor is registered for the target, or the executor fails to handle it
+     */
+    @Override
+    public void onEvent(Event event) throws FalconException {
+        // Currently, simply passes along the event to the appropriate executor
+        EntityExecutor executor = executors.get(event.getTarget().getEntityID());
+        if (executor == null) {
+            // The executor has gone away, throw an exception so the notification service knows
+            throw new FalconException("Target executor for " + event.getTarget().getEntityID() + " does not exist.");
+        }
+        executor.onEvent(event);
+    }
+
+    @Override
+    public void onSubmit(Entity entity) throws FalconException {
+        // Do nothing
+    }
+
+    /**
+     * Creates, registers and schedules one executor per cluster of the entity.
+     *
+     * @param entity - The entity being scheduled
+     * @throws FalconException if an executor cannot be created or scheduled
+     */
+    @Override
+    public void onSchedule(Entity entity) throws FalconException {
+        for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+            EntityExecutor executor = createEntityExecutor(entity, cluster);
+            ID id = new ID(entity, cluster);
+            executors.put(id, executor);
+            LOG.info("Scheduling entity {}.", id);
+            executor.schedule();
+        }
+    }
+
+    /**
+     * Suspends all instances of the entity on each of its clusters through the registered executors.
+     *
+     * @param entity - The entity being suspended
+     * @throws FalconException if an executor is missing or the suspension fails
+     */
+    @Override
+    public void onSuspend(Entity entity) throws FalconException {
+        for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+            EntityExecutor executor = getEntityExecutor(entity, cluster);
+            executor.suspendAll();
+        }
+    }
+
+    /**
+     * Creates a fresh executor per cluster, registers it (replacing any executor already stored under the
+     * same ID) and resumes all instances through it.
+     *
+     * @param entity - The entity being resumed
+     * @throws FalconException if an executor cannot be created or the resumption fails
+     */
+    @Override
+    public void onResume(Entity entity) throws FalconException {
+        for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+            EntityExecutor executor = createEntityExecutor(entity, cluster);
+            executors.put(new ID(entity, cluster), executor);
+            LOG.info("Resuming entity, {} of type {} on cluster {}.", entity.getName(),
+                    entity.getEntityType(), cluster);
+            executor.resumeAll();
+        }
+    }
+
+    /**
+     * Schedules an entity.
+     *
+     * @param entity - The entity to schedule
+     * @throws FalconException if the state service rejects or fails to apply the SCHEDULE event
+     */
+    public void schedule(Entity entity) throws FalconException {
+        StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this);
+    }
+
+    /**
+     * Suspends an entity.
+     *
+     * @param entity - The entity to suspend
+     * @throws FalconException if the state service rejects or fails to apply the SUSPEND event
+     */
+    public void suspend(Entity entity) throws FalconException {
+        StateService.get().handleStateChange(entity, EntityState.EVENT.SUSPEND, this);
+    }
+
+    /**
+     * Resumes an entity.
+     *
+     * @param entity - The entity to resume
+     * @throws FalconException if the state service rejects or fails to apply the RESUME event
+     */
+    public void resume(Entity entity) throws FalconException {
+        StateService.get().handleStateChange(entity, EntityState.EVENT.RESUME, this);
+    }
+
+    /**
+     * Deletes an entity from the execution service.
+     * For each cluster, kills all instances through the registered executor and then drops the executor.
+     *
+     * @param entity - The entity to delete
+     * @throws FalconException if an executor is missing or killing the instances fails
+     */
+    public void delete(Entity entity) throws FalconException {
+        for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+            EntityExecutor executor = getEntityExecutor(entity, cluster);
+            executor.killAll();
+            executors.remove(executor.getId());
+        }
+    }
+
+    /**
+     * Returns the instance of {@link EntityExecutor} for a given entity and cluster.
+     *
+     * @param entity - The entity whose executor is requested
+     * @param cluster - Name of the cluster the executor is bound to
+     * @return - The registered executor; never null
+     * @throws FalconException if no executor is registered for the entity on the cluster
+     */
+    public EntityExecutor getEntityExecutor(Entity entity, String cluster) throws FalconException {
+        ID id = new ID(entity, cluster);
+        // Single lookup: a containsKey()/get() pair could see the entry removed in between by another
+        // thread and hand back null instead of raising the documented exception.
+        EntityExecutor executor = executors.get(id);
+        if (executor == null) {
+            throw new FalconException("Entity executor for entity : " + id.getEntityKey() + " does not exist.");
+        }
+        return executor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
new file mode 100644
index 0000000..b071f5f
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.notification.service.event.Event;
+
+/**
+ * An interface that every class that handles notifications from notification services must implement.
+ */
+public interface NotificationHandler {
+    /**
+     * The method a notification service calls to deliver an event to this handler.
+     *
+     * @param event - The event being delivered
+     * @throws FalconException if the handler fails to process the event
+     */
+    void onEvent(Event event) throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
new file mode 100644
index 0000000..19089c4
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Represents an execution instance of a process.
+ * Responsible for user actions such as suspend, resume, kill on individual instances.
+ */
+
+public class ProcessExecutionInstance extends ExecutionInstance {
+    private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
+    private final Process process;
+    private List<Predicate> awaitedPredicates = new ArrayList<Predicate>();
+    private DAGEngine dagEngine = null;
+    private boolean hasTimedOut = false;
+    private ID id;
+    private int instanceSequence;
+    private final FalconExecutionService executionService = FalconExecutionService.get();
+
+    /**
+     * Constructor.
+     *
+     * @param process
+     * @param instanceTime
+     * @param cluster
+     * @throws FalconException
+     */
+    public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
+        super(instanceTime, cluster);
+        this.process = process;
+        this.id = new ID(process, cluster, getInstanceTime());
+        computeInstanceSequence();
+        dagEngine = DAGEngineFactory.getDAGEngine(cluster);
+        registerForNotifications(false);
+    }
+
+    // Computes the instance number based on the nominal time.
+    // Method can be extended to assign instance numbers for non-time based instances.
+    private void computeInstanceSequence() {
+        for (Cluster processCluster : process.getClusters().getClusters()) {
+            if (processCluster.getName().equals(getCluster())) {
+                Date start = processCluster.getValidity().getStart();
+                instanceSequence = EntityUtil.getInstanceSequence(start, process.getFrequency(),
+                        process.getTimezone(), getInstanceTime().toDate());
+                break;
+            }
+        }
+    }
+
+    // Currently, registers for only data notifications to ensure gating conditions are met.
+    // Can be extended to register for other notifications.
+    private void registerForNotifications(boolean isResume) throws FalconException {
+        if (process.getInputs() == null) {
+            return;
+        }
+        for (Input input : process.getInputs().getInputs()) {
+            // Register for notification for every required input
+            if (input.isOptional()) {
+                continue;
+            }
+            Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                List<Location> locations = FeedHelper.getLocations(cluster, feed);
+                for (Location loc : locations) {
+                    if (loc.getType() != LocationType.DATA) {
+                        continue;
+                    }
+
+                    Predicate predicate = Predicate.createDataPredicate(loc);
+                    // To ensure we evaluate only predicates not evaluated before when an instance is resumed.
+                    if (isResume && !awaitedPredicates.contains(predicate)) {
+                        continue;
+                    }
+                    // TODO : Revisit this once the Data Availability Service has been built
+                    DataAvailabilityService.DataRequestBuilder requestBuilder =
+                            (DataAvailabilityService.DataRequestBuilder)
+                            NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
+                                    .createRequestBuilder(executionService, getId());
+                    requestBuilder.setDataLocation(new Path(loc.getPath()));
+                    NotificationServicesRegistry.register(requestBuilder.build());
+                    LOG.info("Registered for a data notification for process {} for data location {}",
+                            process.getName(), loc.getPath());
+                    awaitedPredicates.add(predicate);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onEvent(Event event) throws FalconException {
+        switch (event.getSource()) {
+        case JOB_SCHEDULE:
+            JobScheduledEvent jobScheduleEvent = (JobScheduledEvent) event;
+            setExternalID(jobScheduleEvent.getExternalID());
+            setActualStart(jobScheduleEvent.getStartTime());
+            break;
+        case JOB_COMPLETION:
+            setActualEnd(((JobCompletedEvent)event).getEndTime());
+            break;
+        case DATA:
+            // Data has not become available and the wait time has passed
+            if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
+                if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) {
+                    hasTimedOut = true;
+                }
+            } else {
+                // If the event matches any of the awaited predicates, remove the predicate of the awaited list
+                Predicate toRemove = null;
+                for (Predicate predicate : awaitedPredicates) {
+                    if (predicate.evaluate(Predicate.getPredicate(event))) {
+                        toRemove = predicate;
+                        break;
+                    }
+                }
+                if (toRemove != null) {
+                    awaitedPredicates.remove(toRemove);
+                }
+            }
+            break;
+        default:
+        }
+    }
+
+    /**
+     * Is the instance ready to be scheduled?
+     *
+     * @return true when it is not already scheduled or is gated on some conditions.
+     */
+    public boolean isReady() {
+        if (getExternalID() != null) {
+            return false;
+        }
+        if (awaitedPredicates.isEmpty()) {
+            return true;
+        } else {
+            // If it is waiting to be scheduled, it is in ready.
+            for (Predicate predicate : awaitedPredicates) {
+                if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Is the instance scheduled for execution?
+     *
+     * @return - true if it is scheduled and has not yet completed.
+     * @throws FalconException
+     */
+    public boolean isScheduled() throws FalconException {
+        return getExternalID() != null && dagEngine.isScheduled(this);
+    }
+
+    /**
+     * Has the instance timed out waiting for gating conditions to be met?
+     *
+     * @return
+     */
+    public boolean hasTimedout() {
+        return hasTimedOut || (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis()));
+    }
+
+    @Override
+    public ID getId() {
+        return id;
+    }
+
+    @Override
+    public Entity getEntity() {
+        return process;
+    }
+
+    @Override
+    public int getInstanceSequence() {
+        return instanceSequence;
+    }
+
+    @Override
+    public List<Predicate> getAwaitingPredicates() throws FalconException {
+        return awaitedPredicates;
+    }
+
+    @Override
+    public void suspend() throws FalconException {
+        if (getExternalID() != null) {
+            dagEngine.suspend(this);
+        }
+        destroy();
+    }
+
+    @Override
+    public void resume() throws FalconException {
+        // Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended
+        if (getExternalID() != null) {
+            dagEngine.resume(this);
+        } else if (awaitedPredicates.size() != 0) {
+            // Evaluate any remaining predicates
+            registerForNotifications(true);
+        }
+    }
+
+    @Override
+    public void kill() throws FalconException {
+        if (getExternalID() != null) {
+            dagEngine.kill(this);
+        }
+        destroy();
+    }
+
+    // If timeout specified in process, uses it.
+    // Else, defaults to frequency of the entity * timeoutFactor
+    private long getTimeOutInMillis() {
+        if (process.getTimeout() == null) {
+            // Default timeout is the frequency of the entity
+            int timeoutFactor = Integer.parseInt(RuntimeProperties.get().getProperty("instance.timeout.factor",
+                    "1"));
+            return SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getFrequency()) * timeoutFactor;
+        } else {
+            // TODO : Should timeout = 0 have a special meaning or should it be disallowed?
+            return SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getTimeout());
+        }
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        NotificationServicesRegistry.unregister(executionService, getId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
new file mode 100644
index 0000000..68c34e7
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.StateService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * This class is responsible for managing execution instances of a process.
+ * It caches the active process instances in memory and handles notification events.
+ * It intercepts all the notification events intended for its instances and passes them along to the instance after
+ * acting on it, where applicable.
+ */
+public class ProcessExecutor extends EntityExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class);
+    protected LoadingCache<ID, ProcessExecutionInstance> instances;
+    private Predicate triggerPredicate;
+    private final Process process;
+    private final StateService stateService = StateService.get();
+    private final FalconExecutionService executionService = FalconExecutionService.get();
+
+    /**
+     * Creates the executor for one process on one cluster.
+     * The instance cache is not built here; it is created on first use.
+     *
+     * @param proc - The process whose instances this executor manages
+     * @param clusterName - Name of the cluster this executor is bound to
+     * @throws FalconException
+     */
+    public ProcessExecutor(Process proc, String clusterName) throws FalconException {
+        this.process = proc;
+        this.cluster = clusterName;
+        this.id = new ID(proc, clusterName);
+    }
+
+    @Override
+    public void schedule() throws FalconException {
+        // Lazy instantiation
+        if (instances == null) {
+            initInstances();
+        }
+        // Check to handle restart and restoration from state store.
+        if (STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SCHEDULED) {
+            dryRun();
+        } else {
+            LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster);
+            LOG.info("Loading instances for process {} from state store.", process.getName());
+            reloadInstances();
+        }
+        registerForNotifications();
+    }
+
+    // Submits the process to the DAG engine of this executor's cluster.
+    // NOTE(review): presumably the submission acts as a validation run, going by the method name - confirm.
+    private void dryRun() throws FalconException {
+        DAGEngineFactory.getDAGEngine(cluster).submit(process);
+    }
+
+    // Builds the bounded cache of execution instances. On a miss, the instance is loaded from the state store.
+    // The maximum size comes from the "scheduler.instance.cache.size" startup property.
+    private void initInstances() throws FalconException {
+        String configuredSize = StartupProperties.get().getProperty("scheduler.instance.cache.size",
+                DEFAULT_CACHE_SIZE);
+        int cacheSize = Integer.parseInt(configuredSize);
+
+        CacheLoader<ID, ProcessExecutionInstance> stateStoreLoader =
+                new CacheLoader<ID, ProcessExecutionInstance>() {
+                    @Override
+                    public ProcessExecutionInstance load(ID instanceId) throws Exception {
+                        return (ProcessExecutionInstance) STATE_STORE.getExecutionInstance(instanceId).getInstance();
+                    }
+                };
+
+        instances = CacheBuilder.newBuilder()
+                .maximumSize(cacheSize)
+                .build(stateStoreLoader);
+    }
+
+    // Restores every active instance of this process and cluster from the state store: replays the step that
+    // matches its stored state, then places it in the cache.
+    private void reloadInstances() throws FalconException {
+        for (InstanceState storedState : STATE_STORE.getExecutionInstances(process, cluster,
+                InstanceState.getActiveStates())) {
+            ExecutionInstance restored = storedState.getInstance();
+            LOG.debug("Loading instance {} from state.", restored.getId());
+            switch (storedState.getCurrentState()) {
+            case WAITING:
+                restored.resume();
+                break;
+            case READY:
+                onConditionsMet(restored);
+                break;
+            case RUNNING:
+                onSchedule(restored);
+                break;
+            default: // skip
+            }
+            instances.put(restored.getId(), (ProcessExecutionInstance) restored);
+        }
+    }
+
+    /**
+     * Stops listening for notifications for this entity, then suspends the cached instances followed by
+     * the instances the state store still reports as active. A failure on one instance does not stop the
+     * others; all failures are reported together at the end.
+     *
+     * @throws FalconException listing every instance that failed to suspend
+     */
+    @Override
+    public void suspendAll() throws FalconException {
+        NotificationServicesRegistry.unregister(executionService, getId());
+        StringBuilder errMsg = new StringBuilder();
+        // Only active instances are in memory. Suspend them first.
+        // The cache is created lazily, so it may not exist if this executor was never scheduled or resumed.
+        if (instances != null) {
+            for (ExecutionInstance instance : instances.asMap().values()) {
+                try {
+                    suspend(instance);
+                } catch (FalconException e) {
+                    // Proceed with next
+                    errMsg.append("Instance suspend failed for : ").append(instance.getId())
+                            .append(" due to ").append(e.getMessage()).append("; ");
+                    LOG.error("Instance suspend failed for : " + instance.getId(), e);
+                }
+            }
+        }
+        // Then whatever the state store still reports as active.
+        for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
+                InstanceState.getActiveStates())) {
+            ExecutionInstance instance = instanceState.getInstance();
+            try {
+                suspend(instance);
+            } catch (FalconException e) {
+                errMsg.append("Instance suspend failed for : ").append(instance.getId())
+                        .append(" due to ").append(e.getMessage()).append("; ");
+                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+            }
+        }
+        // Some errors
+        if (errMsg.length() != 0) {
+            throw new FalconException("Some instances failed to suspend : " + errMsg.toString());
+        }
+    }
+
+    @Override
+    public void resumeAll() throws FalconException {
+        if (instances == null) {
+            initInstances();
+        }
+        StringBuffer errMsg = new StringBuffer();
+        ArrayList<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
+        // TODO : Distinguish between individually suspended instance versus suspended entity?
+        states.add(InstanceState.STATE.SUSPENDED);
+        // Load cache with suspended instances
+        for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, states)) {
+            ExecutionInstance instance = instanceState.getInstance();
+            try {
+                resume(instance);
+            } catch (FalconException e) {
+                errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
+                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+            }
+        }
+        registerForNotifications();
+        // Some errors
+        if (errMsg.length() != 0) {
+            throw new FalconException("Some instances failed to resume : " + errMsg.toString());
+        }
+    }
+
+    /**
+     * Stops listening for notifications for this entity, then kills the cached instances followed by
+     * the instances the state store still reports as active. A failure on one instance does not stop the
+     * others; all failures are reported together at the end.
+     *
+     * @throws FalconException listing every instance that failed to be killed
+     */
+    @Override
+    public void killAll() throws FalconException {
+        NotificationServicesRegistry.unregister(executionService, getId());
+        StringBuilder errMsg = new StringBuilder();
+        // Only active instances are in memory. Kill them first.
+        // The cache is created lazily, so it may not exist if this executor was never scheduled or resumed.
+        if (instances != null) {
+            for (ExecutionInstance instance : instances.asMap().values()) {
+                try {
+                    kill(instance);
+                } catch (FalconException e) {
+                    // Proceed with next
+                    errMsg.append("Instance kill failed for : ").append(instance.getId())
+                            .append(" due to ").append(e.getMessage()).append("; ");
+                    LOG.error("Instance kill failed for : " + instance.getId(), e);
+                }
+            }
+        }
+        // Then whatever the state store still reports as active.
+        for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
+                InstanceState.getActiveStates())) {
+            ExecutionInstance instance = instanceState.getInstance();
+            try {
+                kill(instance);
+            } catch (FalconException e) {
+                errMsg.append("Instance kill failed for : ").append(instance.getId())
+                        .append(" due to ").append(e.getMessage()).append("; ");
+                LOG.error("Instance kill failed for : " + instance.getId(), e);
+            }
+        }
+        // Some errors
+        if (errMsg.length() != 0) {
+            throw new FalconException("Some instances failed to kill : " + errMsg.toString());
+        }
+    }
+
+    /**
+     * Suspends a single instance and applies the SUSPEND state transition to it.
+     *
+     * @param instance the instance to suspend
+     * @throws FalconException if suspending the instance or the state change fails;
+     *                         the original failure is attached as the cause
+     */
+    @Override
+    public void suspend(ExecutionInstance instance) throws FalconException {
+        try {
+            instance.suspend();
+            stateService.handleStateChange(instance, InstanceState.EVENT.SUSPEND, this);
+        } catch (Exception e) {
+            // Any failure (checked or unchecked) is logged and surfaced as a FalconException.
+            LOG.error("Suspend failed for instance id : " + instance.getId(), e);
+            throw new FalconException("Suspend failed for instance : " + instance.getId(), e);
+        }
+
+    }
+
+    @Override
+    public void resume(ExecutionInstance instance) throws FalconException {
+
+        try {
+            instance.resume();
+            if (((ProcessExecutionInstance) instance).isScheduled()) {
+                stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_RUNNING, this);
+                onSchedule(instance);
+            } else if (((ProcessExecutionInstance) instance).isReady()) {
+                stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_READY, this);
+                onConditionsMet(instance);
+            } else {
+                stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_WAITING, this);
+            }
+        } catch (Exception e) {
+            LOG.error("Resume failed for instance id : " + instance.getId(), e);
+            throw new FalconException("Resume failed for instance : " + instance.getId(), e);
+        }
+    }
+
+    /**
+     * Kills a single instance and applies the KILL state transition to it.
+     *
+     * @param instance the instance to kill
+     * @throws FalconException if killing the instance or the state change fails;
+     *                         the original failure is attached as the cause
+     */
+    @Override
+    public void kill(ExecutionInstance instance) throws FalconException {
+        try {
+            // Kill will de-register from notification services
+            instance.kill();
+            stateService.handleStateChange(instance, InstanceState.EVENT.KILL, this);
+        } catch (Exception e) {
+            // Any failure (checked or unchecked) is logged and surfaced as a FalconException.
+            LOG.error("Kill failed for instance id : " + instance.getId(), e);
+            throw new FalconException("Kill failed for instance : " + instance.getId(), e);
+        }
+    }
+
+    /**
+     * @return the process entity this executor works on
+     */
+    @Override
+    public Entity getEntity() {
+        return process;
+    }
+
+    /**
+     * Creates a new execution instance of the process for the given triggering event.
+     * A time triggered instance takes its nominal time from the event; any other
+     * event yields an instance stamped with the current time.
+     *
+     * @param event the event that triggered the instance
+     * @return the newly created instance
+     * @throws FalconException if the instance cannot be created
+     */
+    private ProcessExecutionInstance buildInstance(Event event) throws FalconException {
+        if (event.getSource() != NotificationServicesRegistry.SERVICE.TIME) {
+            return new ProcessExecutionInstance(process, DateTime.now(), cluster);
+        }
+        // If a time triggered instance, use nominal time from event
+        TimeElapsedEvent elapsed = (TimeElapsedEvent) event;
+        LOG.debug("Creating a new process instance for nominal time {}.", elapsed.getInstanceTime());
+        return new ProcessExecutionInstance(process, elapsed.getInstanceTime(), cluster);
+    }
+
+    /**
+     * Entry point for notifications delivered to this executor.
+     * Events the executor is responsible for (see {@code shouldHandleEvent}) are handled here;
+     * every other event is forwarded to the instance it targets, after which the instance is
+     * moved on if it became ready or timed out. Events targeting an instance that is not
+     * found in {@code instances} are dropped.
+     *
+     * @param event the notification to process
+     * @throws FalconException if handling the event fails; the original failure is the cause
+     */
+    @Override
+    public void onEvent(Event event) throws FalconException {
+        try {
+            // Handle event if applicable
+            if (shouldHandleEvent(event)) {
+                handleEvent(event);
+                return;
+            }
+            // Else, pass it along to the execution instance
+            ProcessExecutionInstance target = instances.get(event.getTarget());
+            if (target == null) {
+                return;
+            }
+            target.onEvent(event);
+            if (target.isReady()) {
+                stateService.handleStateChange(target, InstanceState.EVENT.CONDITIONS_MET, this);
+            } else if (target.hasTimedout()) {
+                stateService.handleStateChange(target, InstanceState.EVENT.TIME_OUT, this);
+            }
+        } catch (Exception failure) {
+            throw new FalconException("Unable to handle event with source : " + event.getSource() + " with target:"
+                    + event.getTarget(), failure);
+        }
+    }
+
+    /**
+     * Handles an event this executor is responsible for, dispatching on the event's source.
+     * <ul>
+     *   <li>JOB_SCHEDULE: forwards the event to the targeted instance and applies SCHEDULE.</li>
+     *   <li>JOB_COMPLETION: forwards the event to the targeted instance and applies the
+     *       transition matching the reported job status.</li>
+     *   <li>Anything else: if the event satisfies the trigger predicate, a new instance is
+     *       built and triggered.</li>
+     * </ul>
+     *
+     * @param event the event to handle
+     * @throws FalconException if any step fails; the original failure is attached as the cause
+     */
+    private void handleEvent(Event event) throws FalconException {
+        ProcessExecutionInstance instance;
+        try {
+            switch (event.getSource()) {
+            // TODO : Handle cases where scheduling fails.
+            case JOB_SCHEDULE:
+                instance = instances.get(event.getTarget());
+                instance.onEvent(event);
+                stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this);
+                break;
+            case JOB_COMPLETION:
+                instance = instances.get(event.getTarget());
+                instance.onEvent(event);
+                // Map the reported job status onto the instance state machine.
+                switch (((JobCompletedEvent) event).getStatus()) {
+                case SUCCEEDED:
+                    stateService.handleStateChange(instance, InstanceState.EVENT.SUCCEED, this);
+                    break;
+                case FAILED:
+                    stateService.handleStateChange(instance, InstanceState.EVENT.FAIL, this);
+                    break;
+                case KILLED:
+                    stateService.handleStateChange(instance, InstanceState.EVENT.KILL, this);
+                    break;
+                case SUSPENDED:
+                    stateService.handleStateChange(instance, InstanceState.EVENT.SUSPEND, this);
+                    break;
+                default:
+                    // Any other status is not one this executor could have caused.
+                    throw new InvalidStateTransitionException(
+                            "Job seems to be have been managed outside Falcon.");
+                }
+                break;
+            default:
+                if (isTriggerEvent(event)) {
+                    instance = buildInstance(event);
+                    stateService.handleStateChange(instance, InstanceState.EVENT.TRIGGER, this);
+                    // This happens when there are no conditions the instance is waiting on
+                    // (for example, no data inputs).
+                    if (instance.isReady()) {
+                        stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
+                    }
+                }
+            }
+        } catch (Exception ee) {
+            // NOTE(review): every failure in this method, including the invalid-status case above,
+            // is reported with this cache-specific message - confirm that is intended.
+            throw new FalconException("Unable to cache execution instance", ee);
+        }
+    }
+
+    /**
+     * Evaluates the trigger predicate against the current event, to determine if a new
+     * instance needs to be triggered.
+     *
+     * @param event the event to test
+     * @return {@code true} if the predicate derived from the event satisfies the trigger predicate;
+     *         {@code false} if it does not, or if the predicate could not be derived or evaluated
+     */
+    private boolean isTriggerEvent(Event event) {
+        try {
+            return triggerPredicate.evaluate(Predicate.getPredicate(event));
+        } catch (FalconException e) {
+            // The event is still treated as "not a trigger", but the failure is no longer
+            // dropped silently: without a trace, a missed instance would be impossible to diagnose.
+            LOG.warn("Unable to evaluate trigger predicate for event with source : "
+                    + event.getSource(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Registers for all notifications that should trigger an instance.
+     * Currently, only time based triggers are handled: an alarm request is registered with the
+     * TIME service and {@code triggerPredicate} is set to a time predicate over the same window.
+     * The window starts at the process validity start when the state store has no instance of
+     * this process, otherwise at the time derived from the last materialized instance, and
+     * ends at the process validity end.
+     *
+     * NOTE(review): the next instance time is computed with the process's time zone while the
+     * alarm request is given UTC - confirm this is intended.
+     *
+     * @throws FalconException if the lookup of the required data or the registration fails
+     */
+    protected void registerForNotifications() throws FalconException {
+        AlarmService.AlarmRequestBuilder alarmRequest = (AlarmService.AlarmRequestBuilder)
+                NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME)
+                        .createRequestBuilder(executionService, getId());
+        Cluster clusterDefinition = ProcessHelper.getCluster(process, cluster);
+
+        InstanceState lastMaterialized = STATE_STORE.getLastExecutionInstance(process, cluster);
+        // If there are no instances, use process's start, else, use last materialized instance's nominal time
+        Date startTime;
+        if (lastMaterialized == null) {
+            startTime = clusterDefinition.getValidity().getStart();
+        } else {
+            startTime = EntityUtil.getNextInstanceTime(
+                    lastMaterialized.getInstance().getInstanceTime().toDate(),
+                    EntityUtil.getFrequency(process), EntityUtil.getTimeZone(process), 1);
+        }
+        Date endTime = clusterDefinition.getValidity().getEnd();
+        // TODO : Handle cron based and calendar based time triggers
+        // TODO : Set execution order details.
+        alarmRequest.setFrequency(process.getFrequency())
+                .setStartTime(new DateTime(startTime))
+                .setEndTime(new DateTime(endTime))
+                .setTimeZone(TimeZone.getTimeZone("UTC"));
+        NotificationServicesRegistry.register(alarmRequest.build());
+        LOG.info("Registered for a time based notification for process {}  with frequency: {}, "
+                + "start time: {}, end time: {}", process.getName(), process.getFrequency(), startTime, endTime);
+        triggerPredicate = Predicate.createTimePredicate(startTime.getTime(), endTime.getTime(), -1);
+    }
+
+    /**
+     * @return the ID of this executor; it is the callback ID used when registering with
+     *         the notification services
+     */
+    @Override
+    public ID getId() {
+        return id;
+    }
+
+    /**
+     * Tells whether this executor, rather than one of its instances, must process the event.
+     * That is the case for any event intended for the executor itself, and for job schedule
+     * and job completion notifications, so that the executor can handle the instance's
+     * state transition.
+     *
+     * @param event the incoming event
+     * @return {@code true} if the executor must handle the event itself
+     */
+    private boolean shouldHandleEvent(Event event) {
+        if (event.getTarget().equals(id)) {
+            return true;
+        }
+        NotificationServicesRegistry.SERVICE source = event.getSource();
+        return source == NotificationServicesRegistry.SERVICE.JOB_COMPLETION
+                || source == NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+    }
+
+    /**
+     * Callback for a newly triggered instance: stores it in {@code instances} under an ID
+     * built from the instance.
+     *
+     * @param instance the triggered instance; must be a {@link ProcessExecutionInstance}
+     */
+    @Override
+    public void onTrigger(ExecutionInstance instance) throws FalconException {
+        instances.put(new ID(instance), (ProcessExecutionInstance) instance);
+    }
+
+    /**
+     * Callback for an instance whose conditions are met: builds a request for the instance
+     * with the JOB_SCHEDULE service's request builder and registers it.
+     *
+     * @param instance the instance that is ready to be scheduled
+     * @throws FalconException if the registration fails
+     */
+    @Override
+    public void onConditionsMet(ExecutionInstance instance) throws FalconException {
+        // Put process in run queue and register for notification
+        SchedulerService.JobScheduleRequestBuilder requestBuilder = (SchedulerService.JobScheduleRequestBuilder)
+                NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE)
+                        .createRequestBuilder(executionService, getId());
+        requestBuilder.setInstance(instance);
+        NotificationServicesRegistry.register(requestBuilder.build());
+    }
+
+    /**
+     * Callback for a scheduled instance: registers with the JOB_COMPLETION service, using
+     * the instance's external ID and cluster, so that the job's completion is notified.
+     *
+     * @param instance the instance that has been scheduled
+     * @throws FalconException if the registration fails
+     */
+    @Override
+    public void onSchedule(ExecutionInstance instance) throws FalconException {
+        JobCompletionService.JobCompletionRequestBuilder completionRequestBuilder =
+                (JobCompletionService.JobCompletionRequestBuilder)
+                NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION)
+                        .createRequestBuilder(executionService, getId());
+        completionRequestBuilder.setExternalId(instance.getExternalID());
+        completionRequestBuilder.setCluster(instance.getCluster());
+        NotificationServicesRegistry.register(completionRequestBuilder.build());
+    }
+
+    /**
+     * Callback for a suspended instance: drops its entry from {@code instances}.
+     *
+     * @param instance the suspended instance
+     */
+    @Override
+    public void onSuspend(ExecutionInstance instance) throws FalconException {
+        instances.invalidate(instance.getId());
+    }
+
+    /**
+     * Callback for a resumed instance: puts it back into {@code instances} under its ID.
+     *
+     * @param instance the resumed instance; must be a {@link ProcessExecutionInstance}
+     */
+    @Override
+    public void onResume(ExecutionInstance instance) throws FalconException {
+        instances.put(instance.getId(), (ProcessExecutionInstance) instance);
+    }
+
+    /**
+     * Callback for a killed instance: drops its entry from {@code instances}.
+     *
+     * @param instance the killed instance
+     */
+    @Override
+    public void onKill(ExecutionInstance instance) throws FalconException {
+        instances.invalidate(instance.getId());
+    }
+
+    /**
+     * Callback for a successfully completed instance: destroys the instance and drops its
+     * entry from {@code instances}.
+     *
+     * @param instance the instance that succeeded
+     */
+    @Override
+    public void onSuccess(ExecutionInstance instance) throws FalconException {
+        instance.destroy();
+        instances.invalidate(instance.getId());
+    }
+
+    /**
+     * Callback for a failed instance: destroys the instance and drops its entry
+     * from {@code instances}.
+     *
+     * @param instance the instance that failed
+     */
+    @Override
+    public void onFailure(ExecutionInstance instance) throws FalconException {
+        instance.destroy();
+        instances.invalidate(instance.getId());
+    }
+
+    /**
+     * Callback for a timed out instance: destroys the instance and drops its entry
+     * from {@code instances}.
+     *
+     * @param instance the instance that timed out
+     */
+    @Override
+    public void onTimeOut(ExecutionInstance instance) throws FalconException {
+        instance.destroy();
+        instances.invalidate(instance.getId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
new file mode 100644
index 0000000..3e7fc9b
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.joda.time.DateTime;
+
+/**
+ * Time related helper methods used by the scheduler.
+ */
+public final class SchedulerUtil {
+
+    private static final long MINUTE_IN_MS = 60 * 1000L;
+    private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
+
+    private SchedulerUtil() {
+    }
+
+    /**
+     * Returns the length, in milliseconds, of one frequency period starting at the given time.
+     * Minutes and hours have a fixed length. Days and months are measured on the calendar
+     * from the reference time, so their length can vary with the reference.
+     *
+     * @param referenceTime the time from which a day or month based period is measured
+     * @param frequency the frequency (count and time unit) to convert
+     * @return the period length in milliseconds
+     * @throws IllegalArgumentException if the frequency's time unit is not one of
+     *                                  minutes, hours, days or months
+     */
+    public static long getFrequencyInMillis(DateTime referenceTime, Frequency frequency) {
+        final long periodInMillis;
+        switch (frequency.getTimeUnit()) {
+        case minutes:
+            periodInMillis = MINUTE_IN_MS * frequency.getFrequencyAsInt();
+            break;
+        case hours:
+            periodInMillis = HOUR_IN_MS * frequency.getFrequencyAsInt();
+            break;
+        case days:
+            periodInMillis = referenceTime.plusDays(frequency.getFrequencyAsInt()).getMillis()
+                    - referenceTime.getMillis();
+            break;
+        case months:
+            periodInMillis = referenceTime.plusMonths(frequency.getFrequencyAsInt()).getMillis()
+                    - referenceTime.getMillis();
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name());
+        }
+        return periodInMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java
new file mode 100644
index 0000000..41d20a8
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.ID;
+
+/**
+ * An interface that every notification service must implement.
+ * It lets a handler register for and de-register from notifications, and supplies the
+ * builder used to create requests for the service.
+ */
+public interface FalconNotificationService extends FalconService {
+
+    /**
+     * Register for a notification.
+     *
+     * @param notifRequest the request describing the notification to register for
+     * @throws NotificationServiceException if the registration fails
+     */
+    void register(NotificationRequest notifRequest) throws NotificationServiceException;
+
+    /**
+     * De-register from receiving notifications.
+     * @param handler - The notification handler that needs to be de-registered.
+     * @param callbackID - The callback ID that was used for the registration.
+     * @throws NotificationServiceException if the de-registration fails
+     */
+    void unregister(NotificationHandler handler, ID callbackID) throws NotificationServiceException;
+
+    /**
+     * Creates and returns an implementation of
+     * {@link RequestBuilder} that is applicable to the service.
+     * @param handler - The notification handler the built request is made on behalf of.
+     * @param callbackID - The callback ID to associate with the built request.
+     * @return a request builder applicable to this service
+     */
+    RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID);
+
+    /**
+     * Builder to build appropriate {@link NotificationRequest}.
+     * @param <T> the type of request the builder produces
+     */
+    abstract class RequestBuilder<T extends NotificationRequest> {
+
+        // Handler and callback ID supplied at construction, available to concrete builders.
+        protected NotificationHandler handler;
+        protected ID callbackId;
+
+        /**
+         * @param notificationHandler the handler the request is built for; must not be null
+         * @param callbackID the callback ID to associate with the request
+         * @throws IllegalArgumentException if {@code notificationHandler} is null
+         */
+        public RequestBuilder(NotificationHandler notificationHandler, ID callbackID) {
+            if (notificationHandler == null) {
+                throw new IllegalArgumentException("Handler cannot be null.");
+            }
+            this.handler = notificationHandler;
+            this.callbackId = callbackID;
+        }
+
+        /**
+         * @return Corresponding {@link NotificationRequest}.
+         */
+        public abstract T build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java b/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java
new file mode 100644
index 0000000..3ffb489
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service registry that manages the notification services.
+ * This class is also responsible for routing any register and unregister calls to the appropriate service.
+ */
+public final class NotificationServicesRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationServicesRegistry.class);
+
+    /**
+     * A list of notifiation service that the scheduler framework uses.
+     */
+    public enum SERVICE {
+        TIME("AlarmService"),
+        DATA("DataAvailabilityService"),
+        JOB_COMPLETION("JobCompletionService"),
+        JOB_SCHEDULE("JobSchedulerService");
+
+        private final String name;
+
+        private SERVICE(String name) {
+            this.name = name;
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private NotificationServicesRegistry() {
+    }
+
+    /**
+     * Routes the notification request to appropriate service based on the request.
+     *
+     * @param notifRequest
+     */
+    public static void register(NotificationRequest notifRequest) throws NotificationServiceException {
+        FalconNotificationService service = getService(notifRequest.getService());
+        service.register(notifRequest);
+    }
+
+    /**
+     * De-registers the listener from all services.
+     *
+     * @param listenerID
+     */
+    public static void unregister(NotificationHandler handler, ID listenerID)
+        throws NotificationServiceException {
+        for (SERVICE service : SERVICE.values()) {
+            unregisterForNotification(handler, listenerID, service);
+        }
+    }
+
+    /**
+     * @param serviceType - Type of service requested
+     * @return An instance of {@link org.apache.falcon.notification.service.FalconNotificationService}
+     */
+    public static FalconNotificationService getService(SERVICE serviceType) {
+        FalconNotificationService service = Services.get().getService(serviceType.toString());
+        if (service == null) {
+            LOG.error("Unable to find service type : {} . Service not registered.", serviceType.toString());
+            throw new RuntimeException("Unable to find service : " + serviceType.toString()
+                    + " . Service not registered.");
+        }
+        return service;
+    }
+
+    /**
+     * @param serviceName - Name of service requested
+     * @return - An instance of {@link org.apache.falcon.notification.service.FalconNotificationService}
+     * @throws NotificationServiceException
+     */
+    public static FalconNotificationService getService(String serviceName) throws NotificationServiceException {
+        SERVICE serviceType = null;
+        for (SERVICE type : SERVICE.values()) {
+            if (type.toString().equals(serviceName)) {
+                serviceType = type;
+            }
+        }
+        if (serviceType == null) {
+            LOG.error("Unable to find service : {}. Not a valid service.", serviceName);
+            throw new NotificationServiceException("Unable to find service : " + serviceName
+                    + " . Not a valid service.");
+        }
+        return getService(serviceType);
+    }
+
+    /**
+     * Routes the unregister request to the mentioned service.
+     * @param handler
+     * @param listenerID
+     * @param service
+     */
+    public static void unregisterForNotification(NotificationHandler handler, ID listenerID, SERVICE service)
+        throws NotificationServiceException {
+        FalconNotificationService falconNotificationService = getService(service);
+        falconNotificationService.unregister(handler, listenerID);
+    }
+}


Mime
View raw message