falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [3/4] falcon git commit: FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)
Date Thu, 26 Nov 2015 10:28:27 GMT
FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6d313855
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6d313855
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6d313855

Branch: refs/heads/master
Commit: 6d3138559e8395ac1140f10197fcd10badf64883
Parents: 6676032
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Thu Nov 26 15:56:14 2015 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Thu Nov 26 15:56:14 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../src/main/resources/falcon/checkstyle.xml    |   6 +-
 .../main/resources/falcon/findbugs-exclude.xml  |  15 +
 common/src/main/resources/startup.properties    |  29 ++
 pom.xml                                         |   4 +
 scheduler/pom.xml                               |  84 ++++
 .../falcon/execution/ExecutionInstance.java     |  41 +-
 .../execution/FalconExecutionService.java       |  24 +-
 .../execution/ProcessExecutionInstance.java     |  60 ++-
 .../org/apache/falcon/predicate/Predicate.java  |  50 ++-
 .../org/apache/falcon/state/EntityState.java    |  30 ++
 .../org/apache/falcon/state/InstanceID.java     |  19 +
 .../org/apache/falcon/state/InstanceState.java  |  32 ++
 .../org/apache/falcon/state/StateService.java   |   1 +
 .../falcon/state/store/AbstractStateStore.java  |   2 +-
 .../falcon/state/store/EntityStateStore.java    |  14 +-
 .../falcon/state/store/InMemoryStateStore.java  |  22 +-
 .../falcon/state/store/InstanceStateStore.java  |  18 +-
 .../apache/falcon/state/store/StateStore.java   |   6 +-
 .../falcon/state/store/jdbc/BeanMapperUtil.java | 271 ++++++++++++
 .../falcon/state/store/jdbc/EntityBean.java     | 104 +++++
 .../falcon/state/store/jdbc/InstanceBean.java   | 199 +++++++++
 .../falcon/state/store/jdbc/JDBCStateStore.java | 416 ++++++++++++++++++
 .../state/store/service/FalconJPAService.java   | 171 ++++++++
 .../falcon/tools/FalconStateStoreDBCLI.java     | 435 +++++++++++++++++++
 .../src/main/resources/META-INF/persistence.xml |  50 +++
 .../main/resources/falcon-buildinfo.properties  |  28 ++
 .../execution/FalconExecutionServiceTest.java   |  53 ++-
 .../service/SchedulerServiceTest.java           |  13 +-
 .../falcon/state/AbstractSchedulerTestBase.java |  71 +++
 .../falcon/state/EntityStateServiceTest.java    |  39 +-
 .../falcon/state/InstanceStateServiceTest.java  |  24 +-
 .../state/service/TestFalconJPAService.java     |  64 +++
 .../state/service/store/TestJDBCStateStore.java | 397 +++++++++++++++++
 .../falcon/tools/TestFalconStateStoreDBCLI.java |  89 ++++
 scheduler/src/test/resources/startup.properties | 154 +++++++
 src/bin/falcon-db.sh                            |  49 +++
 src/conf/startup.properties                     |  22 +-
 src/main/assemblies/distributed-package.xml     |   5 +
 src/main/assemblies/standalone-package.xml      |   5 +
 unit/src/main/resources/startup.properties      |  18 +
 41 files changed, 3071 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 13d3439..31a2566 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao)
+
     FALCON-1588 Add ability to provide the path for recipe files in command line(Peeyush Bishnoi via Ajay Yadava)
 
     FALCON-1573 Supply user-defined properties to Oozie workflows during schedule(Daniel Del Castillo via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/src/main/resources/falcon/checkstyle.xml b/checkstyle/src/main/resources/falcon/checkstyle.xml
index 2130e73..292a0a3 100644
--- a/checkstyle/src/main/resources/falcon/checkstyle.xml
+++ b/checkstyle/src/main/resources/falcon/checkstyle.xml
@@ -230,9 +230,9 @@
 
     <!-- allow warnings to be suppressed -->
     <module name="SuppressionCommentFilter">
-        <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
-        <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
-        <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
+        <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
+        <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
+        <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
     </module>
 
 </module>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
index 0a7580d..e1a5a2e 100644
--- a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
+++ b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
@@ -31,4 +31,19 @@
     <Match>
         <Bug pattern="DM_DEFAULT_ENCODING" />
     </Match>
+
+    <Match>
+        <Class name="org.apache.falcon.tools.FalconStateStoreDBCLI" />
+        <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
+    </Match>
+
+    <Match>
+        <Class name="org.apache.falcon.state.store.jdbc.EntityBean" />
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
+    </Match>
+
+    <Match>
+        <Class name="org.apache.falcon.state.store.jdbc.InstanceBean" />
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
+    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index cc5212a..3f1ed03 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -43,6 +43,14 @@
                         org.apache.falcon.service.LogCleanupService,\
                         org.apache.falcon.service.GroupsService,\
                         org.apache.falcon.service.ProxyUserService
+## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
+#                        org.apache.falcon.notification.service.impl.JobCompletionService,\
+#                        org.apache.falcon.notification.service.impl.SchedulerService,\
+#                        org.apache.falcon.notification.service.impl.AlarmService,\
+#                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+#                        org.apache.falcon.execution.FalconExecutionService,\
+#                        org.apache.falcon.state.store.service.FalconJPAService
+
 
 # List of Lifecycle policies configured.
 *.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
@@ -55,6 +63,8 @@
                         org.apache.falcon.entity.store.FeedLocationStore,\
                         org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
+## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
+#                       org.apache.falcon.state.store.jdbc.JdbcStateStore
 
 ##### JMS MQ Broker Implementation class #####
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
@@ -247,3 +257,22 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 # Setting monitoring plugin, if SMTP parameters is defined
 #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
 #                     org.apache.falcon.plugin.EmailNotificationPlugin
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
+## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
+#*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fad8902..678c87c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,10 @@
         <quartz.version>2.2.1</quartz.version>
         <joda.version>2.8.2</joda.version>
         <mockito.version>1.9.5</mockito.version>
+        <openjpa.version>2.4.0</openjpa.version>
+        <javax-validation.version>1.0.0.GA</javax-validation.version>
+        <derby.version>10.10.1.1</derby.version>
+        <commons-dbcp.version>1.4</commons-dbcp.version>
         <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
         <excluded.test.groups>exhaustive</excluded.test.groups>
     </properties>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index 20a91d2..336997d 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -88,6 +88,33 @@
 	</dependency>
 
         <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-persistence</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-persistence-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+            <version>${javax-validation.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
@@ -98,11 +125,18 @@
             <version>${mockito.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
             <version>${joda.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>10.10.1.1</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -115,6 +149,56 @@
                     <target>1.7</target>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <phase>process-classes</phase>
+                        <configuration>
+                            <tasks>
+                                <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
+                                <openjpac>
+                                    <classpath refid="maven.compile.classpath"/>
+                                </openjpac>
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <version>2.8</version>
+            <executions>
+                <execution>
+                    <id>copy-dependencies</id>
+                    <goals>
+                        <goal>copy</goal>
+                    </goals>
+                    <configuration>
+                        <artifactItems>
+                            <artifactItem>
+                                <groupId>org.apache.derby</groupId>
+                                <artifactId>derby</artifactId>
+                            </artifactItem>
+                            <artifactItem>
+                                <groupId>commons-dbcp</groupId>
+                                <artifactId>commons-dbcp</artifactId>
+                                <version>${commons-dbcp.version}</version>
+                            </artifactItem>
+                        </artifactItems>
+                        <outputDirectory>${project.build.directory}/dependency</outputDirectory>
+                    </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
index 2d6b67d..5f96d3f 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -26,7 +26,6 @@ import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
 import java.util.List;
-import java.util.TimeZone;
 
 /**
  * Represents an execution instance of an entity.
@@ -38,20 +37,31 @@ public abstract class ExecutionInstance implements NotificationHandler {
     // External ID is the ID used to identify the Job submitted to the DAG Engine, as returned by the DAG Engine.
     // For example, for Oozie this would be the workflow Id.
     private String externalID;
+    // Time at which instance has to be run.
     private final DateTime instanceTime;
+    // Time at which instance is created.
     private final DateTime creationTime;
     private DateTime actualStart;
     private DateTime actualEnd;
-    private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+    protected static final DateTimeZone UTC = DateTimeZone.UTC;
 
     /**
-     * @param instanceTime
+     * @param instanceTime Time at which instance has to be run.
      * @param cluster
+     * @param creationTime Time at which instance is created to run.
      */
-    public ExecutionInstance(DateTime instanceTime, String cluster) {
+    public ExecutionInstance(DateTime instanceTime, String cluster, DateTime creationTime) {
         this.instanceTime = new DateTime(instanceTime, UTC);
         this.cluster = cluster;
-        this.creationTime = DateTime.now(UTC);
+        this.creationTime = new DateTime(creationTime, UTC);
+    }
+
+    /**
+     * @param instanceTime
+     * @param cluster
+     */
+    public ExecutionInstance(DateTime instanceTime, String cluster) {
+        this(instanceTime, cluster, DateTime.now());
     }
 
     /**
@@ -92,7 +102,7 @@ public abstract class ExecutionInstance implements NotificationHandler {
     public abstract Entity getEntity();
 
     /**
-     * @return - The nominal time of the instance.
+     * @return - The instance time of the instance.
      */
     public DateTime getInstanceTime() {
         return instanceTime;
@@ -138,16 +148,31 @@ public abstract class ExecutionInstance implements NotificationHandler {
         this.actualEnd = actualEnd;
     }
 
-
+    /**
+     * Creation time of an instance.
+     * @return
+     */
     public DateTime getCreationTime() {
         return creationTime;
     }
 
     /**
+     * Set the gating conditions on which this instance is waiting before it is scheduled for execution.
+     * @param predicates
+     */
+    public abstract void setAwaitingPredicates(List<Predicate> predicates);
+
+    /**
      * @return - The gating conditions on which this instance is waiting before it is scheduled for execution.
      * @throws FalconException
      */
-    public abstract List<Predicate> getAwaitingPredicates() throws FalconException;
+    public abstract List<Predicate> getAwaitingPredicates();
+
+    /**
+     * set the sequential numerical id of the instance.
+     */
+    public abstract void setInstanceSequence(int sequence);
+
 
     /**
      * Suspends the instance if it is in one of the active states, waiting, ready or running.

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index b48a65b..b6741a4 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.service.FalconService;
 import org.apache.falcon.state.EntityClusterID;
@@ -58,17 +59,22 @@ public final class FalconExecutionService implements FalconService, EntityStateC
     public void init() {
         LOG.debug("State store instance being used : {}", AbstractStateStore.get());
         // Initialize all executors from store
-        for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
-            try {
-                for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
-                    EntityExecutor executor = createEntityExecutor(entity, cluster);
-                    executors.put(new EntityClusterID(entity, cluster), executor);
-                    executor.schedule();
+        try {
+            for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
+                try {
+                    for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+                        EntityExecutor executor = createEntityExecutor(entity, cluster);
+                        executors.put(new EntityClusterID(entity, cluster), executor);
+                        executor.schedule();
+                    }
+                } catch (FalconException e) {
+                    LOG.error("Unable to load entity : " + entity.getName(), e);
+                    throw new RuntimeException(e);
                 }
-            } catch (FalconException e) {
-                LOG.error("Unable to load entity : " + entity.getName(), e);
-                throw new RuntimeException(e);
             }
+        } catch (StateStoreException e) {
+            LOG.error("Unable to get Entities from State Store ", e);
+            throw new RuntimeException(e);
         }
         // TODO : During migration, the state store itself may not have been completely bootstrapped.
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 434f168..cff4a73 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+
 /**
  * Represents an execution instance of a process.
  * Responsible for user actions such as suspend, resume, kill on individual instances.
@@ -57,7 +58,7 @@ import java.util.List;
 public class ProcessExecutionInstance extends ExecutionInstance {
     private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
     private final Process process;
-    private List<Predicate> awaitedPredicates = new ArrayList<Predicate>();
+    private List<Predicate> awaitedPredicates = new ArrayList<>();
     private DAGEngine dagEngine = null;
     private boolean hasTimedOut = false;
     private InstanceID id;
@@ -72,8 +73,9 @@ public class ProcessExecutionInstance extends ExecutionInstance {
      * @param cluster
      * @throws FalconException
      */
-    public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
-        super(instanceTime, cluster);
+    public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster,
+                                    DateTime creationTime) throws FalconException {
+        super(instanceTime, cluster, creationTime);
         this.process = process;
         this.id = new InstanceID(process, cluster, getInstanceTime());
         computeInstanceSequence();
@@ -81,7 +83,18 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         registerForNotifications(false);
     }
 
-    // Computes the instance number based on the nominal time.
+    /**
+     *
+     * @param process
+     * @param instanceTime
+     * @param cluster
+     * @throws FalconException
+     */
+    public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
+        this(process, instanceTime, cluster, DateTime.now(UTC));
+    }
+
+    // Computes the instance number based on the instance Time.
     // Method can be extended to assign instance numbers for non-time based instances.
     private void computeInstanceSequence() {
         for (Cluster processCluster : process.getClusters().getClusters()) {
@@ -225,11 +238,21 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     }
 
     @Override
-    public List<Predicate> getAwaitingPredicates() throws FalconException {
+    public void setAwaitingPredicates(List<Predicate> predicates) {
+        this.awaitedPredicates = predicates;
+    }
+
+    @Override
+    public List<Predicate> getAwaitingPredicates() {
         return awaitedPredicates;
     }
 
     @Override
+    public void setInstanceSequence(int sequence) {
+        this.instanceSequence = sequence;
+    }
+
+    @Override
     public void suspend() throws FalconException {
         if (getExternalID() != null) {
             dagEngine.suspend(this);
@@ -242,7 +265,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         // Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended
         if (getExternalID() != null) {
             dagEngine.resume(this);
-        } else if (awaitedPredicates.size() != 0) {
+        } else if (awaitedPredicates != null && !awaitedPredicates.isEmpty()) {
             // Evaluate any remaining predicates
             registerForNotifications(true);
         }
@@ -271,6 +294,31 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || !o.getClass().equals(this.getClass())) {
+            return false;
+        }
+
+        ProcessExecutionInstance processExecutionInstance = (ProcessExecutionInstance) o;
+
+        return  this.getId().equals(processExecutionInstance.getId())
+                && Predicate.isEqualAwaitingPredicates(this.getAwaitingPredicates(),
+                    processExecutionInstance.getAwaitingPredicates())
+                && this.getInstanceSequence() == (processExecutionInstance.getInstanceSequence());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = id != null ? id.hashCode() : 0;
+        result = 31 * result + (awaitedPredicates != null ? awaitedPredicates.hashCode() : 0);
+        result = 31 * result + instanceSequence;
+        return result;
+    }
+
+    @Override
     public void destroy() throws FalconException {
         NotificationServicesRegistry.unregister(executionService, getId());
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index fb4c8c9..164fb0e 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -27,14 +27,19 @@ import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.state.ID;
 
 import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 /**
  * Represents the gating condition for which an instance is waiting before it is scheduled.
  * This will be serialized and stored in state store.
  */
 public class Predicate implements Serializable {
+
     /**
      * Type of predicate, currently data and time are supported.
      */
@@ -47,7 +52,10 @@ public class Predicate implements Serializable {
     private final TYPE type;
 
     // A key-value pair of clauses that need make this predicate.
-    private Map<String, Comparable> clauses = new HashMap<String, Comparable>();
+    private Map<String, Comparable> clauses = new TreeMap<>();
+
+    // Id for a predicate used for comparison.
+    private String id;
 
     // A generic "any" object that can be used when a particular key is allowed to have any value.
     public static final Comparable<? extends Serializable> ANY = new Any();
@@ -59,6 +67,10 @@ public class Predicate implements Serializable {
         return type;
     }
 
+    public String getId() {
+        return id;
+    }
+
     /**
      * @param key
      * @return the value corresponding to the key
@@ -106,6 +118,7 @@ public class Predicate implements Serializable {
      */
     public Predicate(TYPE type) {
         this.type = type;
+        this.id = this.type + String.valueOf(System.currentTimeMillis());
     }
 
     /**
@@ -120,7 +133,7 @@ public class Predicate implements Serializable {
      * @param rhs - The value in the key-value pair of a clause
      * @return This instance
      */
-    public Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
+    Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
         clauses.put(lhs, rhs);
         return this;
     }
@@ -217,4 +230,35 @@ public class Predicate implements Serializable {
             return super.hashCode();
         }
     }
+
+    public static boolean isEqualAwaitingPredicates(List<Predicate> thisAwaitingPredicates,
+                                              List<Predicate> otherAwaitingPredicates) {
+        if (thisAwaitingPredicates == null && otherAwaitingPredicates == null) {
+            return true;
+        } else if (thisAwaitingPredicates != null && otherAwaitingPredicates != null) {
+            if (thisAwaitingPredicates.size() != otherAwaitingPredicates.size()) {
+                return false;
+            }
+            Collections.sort(thisAwaitingPredicates, new PredicateComparator());
+            Collections.sort(otherAwaitingPredicates, new PredicateComparator());
+
+            Iterator<Predicate> thisIterator = thisAwaitingPredicates.iterator();
+            Iterator<Predicate> otherIterator = otherAwaitingPredicates.iterator();
+
+            while (thisIterator.hasNext()) {
+                if (!thisIterator.next().evaluate(otherIterator.next())) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    static class PredicateComparator implements Serializable, Comparator<Predicate> {
+        @Override
+        public int compare(Predicate o1, Predicate o2) {
+            return o1.getId().compareTo(o2.getId());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index 15aea9a..f44f174 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -130,4 +130,34 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
     public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
         return currentState.nextTransition(event);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        EntityState other = (EntityState) o;
+
+        if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState())
+                : other.getCurrentState() != null) {
+            return false;
+        }
+
+        if (this.getEntity() != null ? !this.getEntity().equals(other.getEntity())
+                : other.getEntity() != null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = currentState != null ? currentState.hashCode() : 0;
+        result = 31 * result + (entity != null ? entity.hashCode() : 0);
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
index a722be9..72cfc33 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
@@ -80,4 +80,23 @@ public class InstanceID extends ID {
     public EntityClusterID getEntityClusterID() {
         return new EntityClusterID(entityType, entityName, clusterName);
     }
+
+    public static EntityType getEntityType(String id) {
+        if (id == null) {
+            return null;
+        }
+        String[] values = id.split(KEY_SEPARATOR);
+        String entityType = values[0];
+        return EntityType.valueOf(entityType);
+    }
+
+    public static String getEntityName(String id) {
+        if (id == null) {
+            return null;
+        }
+        String[] values = id.split(KEY_SEPARATOR);
+        String entityName = values[1];
+        return entityName;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index ada9d2b..7f2bda9 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -252,4 +252,36 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
     public String toString() {
         return instance.getId().toString() + "STATE: " + currentState.toString();
     }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        InstanceState other = (InstanceState) o;
+
+        if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState())
+                : other.getCurrentState() != null) {
+            return false;
+        }
+
+        if (this.getInstance() != null ? !this.getInstance().equals(other.getInstance())
+                : other.getInstance() != null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = currentState != null ? currentState.hashCode() : 0;
+        result = 31 * result + (instance != null ? instance.hashCode() : 0);
+        return result;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index c1671ac..c702cc3 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -136,6 +136,7 @@ public final class StateService {
             InstanceState instanceState = stateStore.getExecutionInstance(id);
             InstanceState.STATE newState = instanceState.nextTransition(event);
             callbackHandler(instance, event, handler);
+            instanceState = new InstanceState(instance);
             instanceState.setCurrentState(newState);
             stateStore.updateExecutionInstance(instanceState);
             LOG.debug("State of instance: {} changed to: {} as a result of event: {}.", id,

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
index e36f85c..2d576e5 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -79,7 +79,7 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
      */
     public static synchronized StateStore get() {
         if (stateStore == null) {
-            String storeImpl = StartupProperties.get().getProperty("state.store.impl",
+            String storeImpl = StartupProperties.get().getProperty("falcon.state.store.impl",
                     "org.apache.falcon.state.store.InMemoryStateStore");
             try {
                 stateStore = ReflectionUtils.getInstanceByClassName(storeImpl);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
index 113f4c5..75a315f 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -45,18 +45,18 @@ public interface EntityStateStore {
      * @param entityId
      * @return true, if entity exists in store.
      */
-    boolean entityExists(EntityID entityId);
+    boolean entityExists(EntityID entityId) throws StateStoreException;;
 
     /**
      * @param state
      * @return Entities in a given state.
      */
-    Collection<Entity> getEntities(EntityState.STATE state);
+    Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException;
 
     /**
      * @return All Entities in the store.
      */
-    Collection<EntityState> getAllEntities();
+    Collection<EntityState> getAllEntities() throws StateStoreException;
 
     /**
      * Update an existing entity with the new values.
@@ -73,4 +73,12 @@ public interface EntityStateStore {
      * @throws StateStoreException
      */
     void deleteEntity(EntityID entityId) throws StateStoreException;
+
+
+    /**
+     * Removes all entities and its instances from the store.
+     *
+     * @throws StateStoreException
+     */
+    void deleteEntities() throws StateStoreException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index 52b3bb8..7ab996a 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -50,8 +50,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
 
     private static final StateStore STORE = new InMemoryStateStore();
 
-    private InMemoryStateStore() {
-    }
+    private InMemoryStateStore() {}
 
     public static StateStore get() {
         return STORE;
@@ -114,6 +113,11 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
+    public void deleteEntities() throws StateStoreException {
+        entityStates.clear();
+    }
+
+    @Override
     public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
         String key = new InstanceID(instanceState.getInstance()).getKey();
         if (instanceStates.containsKey(key)) {
@@ -223,6 +227,20 @@ public final class InMemoryStateStore extends AbstractStateStore {
         }
     }
 
+    @Override
+    public void deleteExecutionInstances() {
+        instanceStates.clear();
+    }
+
+    @Override
+    public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException {
+        if (!instanceStates.containsKey(instanceID.toString())) {
+            throw new StateStoreException("Instance with key, " + instanceID.toString() + " does not exist.");
+        }
+        instanceStates.remove(instanceID.toString());
+    }
+
+    @Override
     public void clear() {
         entityStates.clear();
         instanceStates.clear();

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index 483d9e6..f1d1931 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -104,12 +104,26 @@ public interface InstanceStateStore {
      * @param instanceId
      * @return true, if instance exists.
      */
-    boolean executionInstanceExists(InstanceID instanceId);
+    boolean executionInstanceExists(InstanceID instanceId) throws StateStoreException;
 
     /**
      * Delete instances of a given entity.
      *
      * @param entityId
      */
-    void deleteExecutionInstances(EntityID entityId);
+    void deleteExecutionInstances(EntityID entityId) throws StateStoreException;
+
+
+    /**
+     * Delete an instance based on ID.
+     *
+     * @param instanceID
+     * @throws StateStoreException
+     */
+    void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException;
+
+    /**
+     * Delete all instances.
+     */
+    void deleteExecutionInstances();
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
index f595c26..592e1fb 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
@@ -17,11 +17,15 @@
  */
 package org.apache.falcon.state.store;
 
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.service.ConfigurationChangeListener;
 
 /**
  * Interface that combines entity, instance store APIs and also config change listener's.
  */
 public interface StateStore extends ConfigurationChangeListener, EntityStateStore, InstanceStateStore {
-
+    /**
+     * Deletes all entities and instances.
+     */
+    void clear() throws StateStoreException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
new file mode 100644
index 0000000..4bee269
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Mapping util for Persistent Store.
+ */
+public final class BeanMapperUtil {
+    private BeanMapperUtil() {
+    }
+
+    /**
+     * Converts Entity object to EntityBean which will be stored in DB.
+     * @param entityState
+     * @return
+     */
+    public static EntityBean convertToEntityBean(EntityState entityState) {
+        EntityBean entityBean = new EntityBean();
+        Entity entity = entityState.getEntity();
+        String id = new EntityID(entity).getKey();
+        entityBean.setId(id);
+        entityBean.setName(entity.getName());
+        entityBean.setState(entityState.getCurrentState().toString());
+        entityBean.setType(entity.getEntityType().toString());
+        return entityBean;
+    }
+
+    /**
+     * Converts EntityBean of Data Base to EntityState.
+     * @param entityBean
+     * @return
+     * @throws StateStoreException
+     */
+    public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException {
+        try {
+            Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName());
+            EntityState entityState = new EntityState(entity);
+            entityState.setCurrentState(EntityState.STATE.valueOf(entityBean.getState()));
+            return entityState;
+        } catch (FalconException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Converts list of EntityBeans of Data Base to EntityStates.
+     * @param entityBeans
+     * @return
+     * @throws StateStoreException
+     */
+    public static Collection<EntityState> convertToEntityState(Collection<EntityBean> entityBeans)
+        throws StateStoreException {
+        List<EntityState> entityStates = new ArrayList<>();
+        if (entityBeans != null && !entityBeans.isEmpty()) {
+            for (EntityBean entityBean : entityBeans) {
+                entityStates.add(convertToEntityState(entityBean));
+            }
+        }
+        return entityStates;
+    }
+
+    /**
+     * Converts list of EntityBeans of Data Base to Entities.
+     * @param entityBeans
+     * @return
+     * @throws StateStoreException
+     */
+    public static Collection<Entity> convertToEntities(Collection<EntityBean> entityBeans) throws StateStoreException {
+        List<Entity> entities = new ArrayList<>();
+        try {
+            if (entityBeans != null && !entityBeans.isEmpty()) {
+                for (EntityBean entityBean : entityBeans) {
+                    Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName());
+                    entities.add(entity);
+                }
+            }
+            return entities;
+        } catch (FalconException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Convert instance of Entity's instance to InstanceBean of DB.
+     * @param instanceState
+     * @return
+     * @throws StateStoreException
+     * @throws IOException
+     */
+    public static InstanceBean convertToInstanceBean(InstanceState instanceState) throws StateStoreException,
+            IOException {
+        InstanceBean instanceBean = new InstanceBean();
+        ExecutionInstance instance = instanceState.getInstance();
+        if (instance.getActualEnd() != null) {
+            instanceBean.setActualEndTime(new Timestamp(instance.getActualEnd().getMillis()));
+        }
+        if (instance.getActualStart() != null) {
+            instanceBean.setActualStartTime(new Timestamp(instance.getActualStart().getMillis()));
+        }
+        if (instanceState.getCurrentState() != null) {
+            instanceBean.setCurrentState(instanceState.getCurrentState().toString());
+        }
+        if (instance.getExternalID() != null) {
+            instanceBean.setExternalID(instanceState.getInstance().getExternalID());
+        }
+
+        instanceBean.setCluster(instance.getCluster());
+        instanceBean.setCreationTime(new Timestamp(instance.getCreationTime().getMillis()));
+        instanceBean.setId(instance.getId().toString());
+        instanceBean.setInstanceTime(new Timestamp(instance.getInstanceTime().getMillis()));
+        instanceBean.setEntityId(new InstanceID(instance).getEntityID().toString());
+
+        instanceBean.setInstanceSequence(instance.getInstanceSequence());
+        if (instance.getAwaitingPredicates() != null && !instance.getAwaitingPredicates().isEmpty()) {
+            ObjectOutputStream out = null;
+            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            try {
+                out = new ObjectOutputStream(byteArrayOutputStream);
+                out.writeInt(instance.getAwaitingPredicates().size());
+                for (Predicate predicate : instance.getAwaitingPredicates()) {
+                    out.writeObject(predicate);
+                }
+                instanceBean.setAwaitedPredicates(byteArrayOutputStream.toByteArray());
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+        }
+        return instanceBean;
+    }
+
+    /**
+     * Converts instance entry of DB to instance of ExecutionInstance.
+     * @param instanceBean
+     * @return
+     * @throws StateStoreException
+     * @throws IOException
+     */
+    public static InstanceState convertToInstanceState(InstanceBean instanceBean) throws StateStoreException,
+            IOException {
+        EntityType entityType = InstanceID.getEntityType(instanceBean.getId());
+        ExecutionInstance executionInstance = getExecutionInstance(entityType, instanceBean);
+        if (instanceBean.getActualEndTime() != null) {
+            executionInstance.setActualEnd(new DateTime(instanceBean.getActualEndTime().getTime()));
+        }
+        if (instanceBean.getActualStartTime() != null) {
+            executionInstance.setActualStart(new DateTime(instanceBean.getActualStartTime().getTime()));
+        }
+        executionInstance.setExternalID(instanceBean.getExternalID());
+        executionInstance.setInstanceSequence(instanceBean.getInstanceSequence());
+
+        byte[] result = instanceBean.getAwaitedPredicates();
+        List<Predicate> predicates = new ArrayList<>();
+        if (result != null && result.length != 0) {
+            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result);
+            ObjectInputStream in = null;
+            try {
+                in = new ObjectInputStream(byteArrayInputStream);
+                int length = in.readInt();
+                for (int i = 0; i < length; i++) {
+                    Predicate predicate = (Predicate) in.readObject();
+                    predicates.add(predicate);
+                }
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            } finally {
+                IOUtils.closeQuietly(in);
+            }
+        }
+        executionInstance.setAwaitingPredicates(predicates);
+        InstanceState instanceState = new InstanceState(executionInstance);
+        instanceState.setCurrentState(InstanceState.STATE.valueOf(instanceBean.getCurrentState()));
+        return instanceState;
+    }
+
+    /**
+     * Converting list of instance entries of DB to instance of ExecutionInstance.
+     * @param instanceBeanList
+     * @return
+     * @throws StateStoreException
+     * @throws IOException
+     */
+    public static Collection<InstanceState> convertToInstanceState(List<InstanceBean> instanceBeanList)
+        throws StateStoreException, IOException {
+        List<InstanceState> instanceStates = new ArrayList<>();
+        for (InstanceBean instanceBean : instanceBeanList) {
+            instanceStates.add(convertToInstanceState(instanceBean));
+        }
+        return instanceStates;
+    }
+
+    private static ExecutionInstance getExecutionInstance(EntityType entityType,
+                                                          InstanceBean instanceBean) throws StateStoreException {
+        try {
+            Entity entity = EntityUtil.getEntity(entityType, InstanceID.getEntityName(instanceBean.getId()));
+            return getExecutionInstance(entityType, entity, instanceBean.getInstanceTime().getTime(),
+                    instanceBean.getCluster(), instanceBean.getCreationTime().getTime());
+        } catch (FalconException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    public static ExecutionInstance getExecutionInstance(EntityType entityType, Entity entity, long instanceTime,
+                                                         String cluster, long creationTime) throws StateStoreException {
+        if (entityType == EntityType.PROCESS) {
+            try {
+                return new ProcessExecutionInstance((org.apache.falcon.entity.v0.process.Process) entity,
+                        new DateTime(instanceTime), cluster, new DateTime(creationTime));
+            } catch (FalconException e) {
+                throw new StateStoreException("Entity not found");
+            }
+        } else {
+            throw new UnsupportedOperationException("Not supported for entity type " + entityType.toString());
+        }
+    }
+
+
+    public static byte[] getAwaitedPredicates(InstanceState instanceState) throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        ObjectOutputStream out = null;
+        try {
+            out = new ObjectOutputStream(byteArrayOutputStream);
+            out.writeInt(instanceState.getInstance().getAwaitingPredicates().size());
+            for (Predicate predicate : instanceState.getInstance().getAwaitingPredicates()) {
+                out.writeObject(predicate);
+            }
+            return byteArrayOutputStream.toByteArray();
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
new file mode 100644
index 0000000..03ada39
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+//SUSPEND CHECKSTYLE CHECK  LineLengthCheck
+/**
+ * Entity object which will be stored in Data Base.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
+        @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
+        @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
+        @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "ENTITIES")
+public class EntityBean {
+    @NotNull
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "name")
+    private String name;
+
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "type")
+    private String type;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String state;
+
+    public EntityBean() {
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
new file mode 100644
index 0000000..0e3dfa9
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.sql.Timestamp;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Instance State which will be stored in DB.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
+        @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates where a.id = :id"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+        @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
+        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
+})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "INSTANCES")
+public class InstanceBean {
+
+    @Id
+    @NotNull
+    private String id;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "entity_id")
+    private String entityId;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "cluster")
+    private String cluster;
+
+    @Basic
+    @Index
+    @Column(name = "external_id")
+    private String externalID;
+
+    @Basic
+    @Index
+    @Column(name = "instance_time")
+    private Timestamp instanceTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "creation_time")
+    private Timestamp creationTime;
+
+    @Basic
+    @Column(name = "actual_start_time")
+    private Timestamp actualStartTime;
+
+    @Basic
+    @Column(name = "actual_end_time")
+    private Timestamp actualEndTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String currentState;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "instance_sequence")
+    private Integer instanceSequence;
+
+
+    @Column(name = "awaited_predicates", columnDefinition = "BLOB")
+    @Lob
+    private byte[] awaitedPredicates;
+
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    public String getExternalID() {
+        return externalID;
+    }
+
+    public void setExternalID(String externalID) {
+        this.externalID = externalID;
+    }
+
+    public Timestamp getInstanceTime() {
+        return instanceTime;
+    }
+
+    public void setInstanceTime(Timestamp instanceTime) {
+        this.instanceTime = instanceTime;
+    }
+
+    public Timestamp getCreationTime() {
+        return creationTime;
+    }
+
+    public void setCreationTime(Timestamp creationTime) {
+        this.creationTime = creationTime;
+    }
+
+    public Timestamp getActualStartTime() {
+        return actualStartTime;
+    }
+
+    public void setActualStartTime(Timestamp actualStartTime) {
+        this.actualStartTime = actualStartTime;
+    }
+
+    public Timestamp getActualEndTime() {
+        return actualEndTime;
+    }
+
+    public void setActualEndTime(Timestamp actualEndTime) {
+        this.actualEndTime = actualEndTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public byte[] getAwaitedPredicates() {
+        return awaitedPredicates;
+    }
+
+    public void setAwaitedPredicates(byte[] awaitedPredicates) {
+        this.awaitedPredicates = awaitedPredicates;
+    }
+
+    public Integer getInstanceSequence() {
+        return instanceSequence;
+    }
+
+    public void setInstanceSequence(Integer instanceSequence) {
+        this.instanceSequence = instanceSequence;
+    }
+
+    public String getEntityId() {
+        return entityId;
+    }
+
+    public void setEntityId(String entityId) {
+        this.entityId = entityId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
new file mode 100644
index 0000000..ca65b94
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.StartupProperties;
+import org.joda.time.DateTime;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Persistent Data Store for Entities and Instances.
+ */
+public final class JDBCStateStore extends AbstractStateStore {
+
+    private static final StateStore STORE = new JDBCStateStore();
+    private static final String DEBUG = "debug";
+
+    private JDBCStateStore() {}
+
+    public static StateStore get() {
+        return STORE;
+    }
+
+    @Override
+    public void clear() throws StateStoreException {
+        if (!isModeDebug()) {
+            throw new UnsupportedOperationException("Clear Method not supported");
+        }
+        deleteExecutionInstances();
+        deleteEntities();
+    }
+
+    @Override
+    public void putEntity(EntityState entityState) throws StateStoreException {
+        EntityID entityID = new EntityID(entityState.getEntity());
+        String key = entityID.getKey();
+        if (entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + key + " already exists.");
+        }
+        EntityBean entityBean = BeanMapperUtil.convertToEntityBean(entityState);
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        entityManager.persist(entityBean);
+        commitAndCloseTransaction(entityManager);
+    }
+
+
+    @Override
+    public EntityState getEntity(EntityID entityID) throws StateStoreException {
+        EntityState entityState = getEntityByKey(entityID);
+        if (entityState == null) {
+            throw new StateStoreException("Entity with key, " + entityID + " does not exist.");
+        }
+        return entityState;
+    }
+
+    private EntityState getEntityByKey(EntityID id) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_ENTITY");
+        q.setParameter("id", id.getKey());
+        List result = q.getResultList();
+        if (result.isEmpty()) {
+            return null;
+        }
+        entityManager.close();
+        return BeanMapperUtil.convertToEntityState((EntityBean) result.get(0));
+    }
+
+    @Override
+    public boolean entityExists(EntityID entityID) throws StateStoreException {
+        return getEntityByKey(entityID) == null ? false : true;
+    }
+
+    @Override
+    public Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_ENTITY_FOR_STATE");
+        q.setParameter("state", state.toString());
+        List result = q.getResultList();
+        entityManager.close();
+        return BeanMapperUtil.convertToEntities(result);
+    }
+
+    @Override
+    public Collection<EntityState> getAllEntities() throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_ENTITIES");
+        List result = q.getResultList();
+        entityManager.close();
+        return BeanMapperUtil.convertToEntityState(result);
+    }
+
+    @Override
+    public void updateEntity(EntityState entityState) throws StateStoreException {
+        EntityID entityID = new EntityID(entityState.getEntity());
+        if (!entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + entityID + " doesn't exists.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("UPDATE_ENTITY");
+        q.setParameter("id", entityID.getKey());
+        if (entityState.getCurrentState() != null) {
+            q.setParameter("state", entityState.getCurrentState().toString());
+        }
+        q.setParameter("type", entityState.getEntity().getEntityType().toString());
+        q.setParameter("name", entityState.getEntity().getName());
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteEntity(EntityID entityID) throws StateStoreException {
+        if (!entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + entityID.getKey() + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_ENTITY");
+        q.setParameter("id", entityID.getKey());
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteEntities() throws StateStoreException {
+        if (!isModeDebug()) {
+            throw new UnsupportedOperationException("Delete Entities Table not supported");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_ENTITIES");
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        InstanceID instanceID = new InstanceID(instanceState.getInstance());
+        if (executionInstanceExists(instanceID)) {
+            throw new StateStoreException("Instance with key, " + instanceID + " already exists.");
+        }
+        try {
+            InstanceBean instanceBean = BeanMapperUtil.convertToInstanceBean(instanceState);
+            EntityManager entityManager = getEntityManager();
+            beginTransaction(entityManager);
+            entityManager.persist(instanceBean);
+            commitAndCloseTransaction(entityManager);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException {
+        InstanceState instanceState = getExecutionInstanceByKey(instanceId);
+        if (instanceState == null) {
+            throw new StateStoreException("Instance with key, " + instanceId.toString() + " does not exist.");
+        }
+        return instanceState;
+    }
+
+    private InstanceState getExecutionInstanceByKey(ID instanceKey) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCE");
+        q.setParameter("id", instanceKey.toString());
+        List result = q.getResultList();
+        entityManager.close();
+        if (result.isEmpty()) {
+            return null;
+        }
+        try {
+            InstanceBean instanceBean = (InstanceBean)(result.get(0));
+            return BeanMapperUtil.convertToInstanceState(instanceBean);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        InstanceID id = new InstanceID(instanceState.getInstance());
+        String key = id.toString();
+        if (!executionInstanceExists(id)) {
+            throw new StateStoreException("Instance with key, " + key + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("UPDATE_INSTANCE");
+        ExecutionInstance instance = instanceState.getInstance();
+        q.setParameter("id", key);
+        q.setParameter("cluster", instance.getCluster());
+        q.setParameter("externalID", instance.getExternalID());
+        q.setParameter("instanceTime", new Timestamp(instance.getInstanceTime().getMillis()));
+        q.setParameter("creationTime", new Timestamp(instance.getCreationTime().getMillis()));
+        if (instance.getActualEnd() != null) {
+            q.setParameter("actualEndTime", new Timestamp(instance.getActualEnd().getMillis()));
+        }
+        q.setParameter("currentState", instanceState.getCurrentState().toString());
+        if (instance.getActualStart() != null) {
+            q.setParameter("actualStartTime", new Timestamp(instance.getActualStart().getMillis()));
+        }
+        q.setParameter("instanceSequence", instance.getInstanceSequence());
+        if (instanceState.getInstance().getAwaitingPredicates() != null
+                && !instanceState.getInstance().getAwaitingPredicates().isEmpty()) {
+            try {
+                q.setParameter("awaitedPredicates", BeanMapperUtil.getAwaitedPredicates(instanceState));
+            } catch (IOException e) {
+                throw new StateStoreException(e);
+            }
+        }
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster)
+        throws StateStoreException {
+        EntityClusterID id = new EntityClusterID(entity, cluster);
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER");
+        q.setParameter("entityId", id.getEntityID().getKey());
+        q.setParameter("cluster", cluster);
+        List result  = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                           Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        EntityClusterID entityClusterID = new EntityClusterID(entity, cluster);
+        String entityKey = entityClusterID.getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES");
+        q.setParameter("entityId", entityKey);
+        q.setParameter("cluster", cluster);
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        q.setParameter("currentState", instanceStates);
+        List result  = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(EntityClusterID id,
+                                                           Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        String entityKey = id.getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES");
+        q.setParameter("entityId", entityKey);
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        q.setParameter("currentState", instanceStates);
+        List result  = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                           Collection<InstanceState.STATE> states, DateTime start,
+                                                           DateTime end) throws StateStoreException {
+        String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE");
+        q.setParameter("entityId", entityKey);
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        q.setParameter("currentState", instanceStates);
+        q.setParameter("startTime", new Timestamp(start.getMillis()));
+        q.setParameter("endTime", new Timestamp(end.getMillis()));
+        List result  = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
+        String key = new EntityClusterID(entity, cluster).getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER");
+        q.setParameter("entityId", key);
+        q.setParameter("cluster", cluster);
+        q.setMaxResults(1);
+        List result = q.getResultList();
+        entityManager.close();
+        if (!result.isEmpty()) {
+            try {
+                return BeanMapperUtil.convertToInstanceState((InstanceBean) result.get(0));
+            } catch (IOException e) {
+                throw new StateStoreException(e);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean executionInstanceExists(InstanceID instanceKey) throws StateStoreException {
+        return getExecutionInstanceByKey(instanceKey) == null ? false : true;
+    }
+
+    @Override
+    public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException {
+        String instanceKey = instanceID.toString();
+        if (!executionInstanceExists(instanceID)) {
+            throw new StateStoreException("Instance with key, " + instanceKey + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_INSTANCE");
+        q.setParameter("id", instanceKey);
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteExecutionInstances(EntityID entityID) {
+        String entityKey = entityID.getKey();
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_INSTANCE_FOR_ENTITY");
+        q.setParameter("entityId", entityKey);
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteExecutionInstances() {
+        if (!isModeDebug()) {
+            throw new UnsupportedOperationException("Delete Instances Table not supported");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_INSTANCES_TABLE");
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    // Debug enabled for test cases
+    private boolean isModeDebug() {
+        return DEBUG.equals(StartupProperties.get().getProperty("domain")) ? true : false;
+    }
+
+    private void commitAndCloseTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    private void beginTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().begin();
+    }
+
+    private EntityManager getEntityManager() {
+        return FalconJPAService.get().getEntityManager();
+    }
+
+}


Mime
View raw message