falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject [2/4] falcon git commit: FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)
Date Thu, 26 Nov 2015 10:28:26 GMT
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
new file mode 100644
index 0000000..72d1aba
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.store.jdbc.EntityBean;
+import org.apache.falcon.state.store.jdbc.InstanceBean;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+/**
+ * Service that manages JPA.
+ */
+public final class FalconJPAService implements FalconService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
+    public static final String PREFIX = "falcon.statestore.";
+
+    public static final String DB_SCHEMA = PREFIX + "schema.name";
+    public static final String URL = PREFIX + "jdbc.url";
+    public static final String DRIVER = PREFIX + "jdbc.driver";
+    public static final String USERNAME = PREFIX + "jdbc.username";
+    public static final String PASSWORD = PREFIX + "jdbc.password";
+    public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
+    public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
+    public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
+    public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
+    public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
+    public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
+    public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
+
+    private EntityManagerFactory entityManagerFactory;
+    // Persistent Unit which is defined in persistence.xml
+    private String persistenceUnit;
+    private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
+
+    private FalconJPAService() {
+    }
+
+    /**
+     * @return the singleton instance of this service
+     */
+    public static FalconJPAService get() {
+        return FALCON_JPA_SERVICE;
+    }
+
+    public EntityManagerFactory getEntityManagerFactory() {
+        return entityManagerFactory;
+    }
+
+    /**
+     * Derives the persistence unit name ("falcon-&lt;vendor&gt;") from the DB type.
+     *
+     * @param dbType JDBC vendor id, e.g. "derby" or "derby:..."; only the part
+     *               before the first ':' is used
+     * @throws IllegalArgumentException if dbType is null or empty
+     */
+    public void setPersistenceUnit(String dbType) {
+        if (StringUtils.isEmpty(dbType)) {
+            throw new IllegalArgumentException("DB type cannot be null or empty");
+        }
+        dbType = dbType.split(":")[0];
+        this.persistenceUnit = "falcon-" + dbType;
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    /**
+     * Builds the EntityManagerFactory from startup properties and warms up
+     * the JPA metadata, datasource and connection pool.
+     */
+    @Override
+    public void init() throws FalconException {
+        Properties props = getPropsforStore();
+        entityManagerFactory = Persistence.
+                createEntityManagerFactory(persistenceUnit, props);
+        EntityManager entityManager = getEntityManager();
+        // Touch each bean type once so OpenJPA initializes all entity metadata.
+        entityManager.find(EntityBean.class, 1);
+        entityManager.find(InstanceBean.class, 1);
+        LOG.info("All entities initialized");
+
+        // need to use a pseudo no-op transaction so all entities, datasource
+        // and connection pool are initialized one time only
+        entityManager.getTransaction().begin();
+        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
+        // Mask the password with '***' (assumes Password is not the last entry
+        // in the connection-property string; MaxActive follows it here)
+        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
+        // SLF4J placeholders are "{}", not MessageFormat-style "{0}"
+        LOG.info("JPA configuration: {}", logMsg);
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    /**
+     * Assembles the OpenJPA connection/schema properties from startup config.
+     *
+     * @return properties to pass to {@link Persistence#createEntityManagerFactory}
+     * @throws FalconException if the JDBC URL is malformed
+     */
+    private Properties getPropsforStore() throws FalconException {
+        String dbSchema = StartupProperties.get().getProperty(DB_SCHEMA);
+        String url = StartupProperties.get().getProperty(URL);
+        String driver = StartupProperties.get().getProperty(DRIVER);
+        String user = StartupProperties.get().getProperty(USERNAME);
+        // NOTE(review): getProperty(...) may return null when a key is not
+        // configured, in which case trim() below NPEs — TODO confirm defaults.
+        String password = StartupProperties.get().getProperty(PASSWORD).trim();
+        String maxConn = StartupProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
+        String dataSource = StartupProperties.get().getProperty(CONN_DATA_SOURCE);
+        String connPropsConfig = StartupProperties.get().getProperty(CONN_PROPERTIES);
+        boolean autoSchemaCreation = Boolean.parseBoolean(StartupProperties.get().getProperty(CREATE_DB_SCHEMA,
+                "false"));
+        boolean validateDbConn = Boolean.parseBoolean(StartupProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+        String evictionInterval = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+        String evictionNum = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
+
+        if (!url.startsWith("jdbc:")) {
+            throw new FalconException("invalid JDBC URL, must start with 'jdbc:': " + url);
+        }
+        String dbType = url.substring("jdbc:".length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...': " + url);
+        }
+        setPersistenceUnit(dbType);
+        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
+        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
+        Properties props = new Properties();
+        if (autoSchemaCreation) {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
+        } else if (validateDbConn) {
+            // validation can be done only if the schema already exist, else a
+            // connection cannot be obtained to create the schema.
+            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
+            String num = "numTestsPerEvictionRun=" + evictionNum;
+            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
+            connProps += ",ValidationQuery=select count(*) from ENTITIES";
+            // NOTE(review): connProps has no remaining {n} placeholders here, so
+            // this format call is effectively a no-op — TODO confirm whether the
+            // ValidationQuery was meant to reference {0} (dbSchema).
+            connProps = MessageFormat.format(connProps, dbSchema);
+        } else {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+        }
+        if (connPropsConfig != null) {
+            connProps += "," + connPropsConfig;
+        }
+        props.setProperty("openjpa.ConnectionProperties", connProps);
+        props.setProperty("openjpa.ConnectionDriverName", dataSource);
+        return props;
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        // Guard against destroy() being invoked before (or without) init().
+        if (entityManagerFactory != null && entityManagerFactory.isOpen()) {
+            entityManagerFactory.close();
+        }
+    }
+
+
+    /**
+     * Return an EntityManager. Used by the StoreService.
+     *
+     * @return an entity manager
+     */
+    public EntityManager getEntityManager() {
+        return getEntityManagerFactory().createEntityManager();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
new file mode 100644
index 0000000..381b0b3
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.falcon.cli.CLIParser;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.StartupProperties;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command Line utility for Table Creation, Update.
+ */
+public class FalconStateStoreDBCLI {
+    public static final String HELP_CMD = "help";
+    public static final String VERSION_CMD = "version";
+    public static final String CREATE_CMD = "create";
+    public static final String SQL_FILE_OPT = "sqlfile";
+    public static final String RUN_OPT = "run";
+    public static final String UPGRADE_CMD = "upgrade";
+
+    // Represents whether DB instance exists or not.
+    private boolean instanceExists;
+    private static final String[] FALCON_HELP = {"Falcon DB initialization tool currently supports Derby DB"};
+
+    public static void main(String[] args) {
+        new FalconStateStoreDBCLI().run(args);
+    }
+
+    public FalconStateStoreDBCLI() {
+        instanceExists = false;
+    }
+
+    protected Options getOptions() {
+        Option sqlfile = new Option(SQL_FILE_OPT, true,
+                "Generate SQL script instead of creating/upgrading the DB schema");
+        Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
+        Options options = new Options();
+        options.addOption(sqlfile);
+        options.addOption(run);
+        return options;
+    }
+
+    public synchronized int run(String[] args) {
+        if (instanceExists) {
+            throw new IllegalStateException("CLI instance already used");
+        }
+        instanceExists = true;
+
+        CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
+        parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
+        parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
+        parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
+        parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
+
+        try {
+            CLIParser.Command command = parser.parse(args);
+            if (command.getName().equals(HELP_CMD)) {
+                parser.showHelp();
+            } else if (command.getName().equals(VERSION_CMD)) {
+                showVersion();
+            } else {
+                if (!command.getCommandLine().hasOption(SQL_FILE_OPT)
+                        && !command.getCommandLine().hasOption(RUN_OPT)) {
+                    throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
+                }
+                CommandLine commandLine = command.getCommandLine();
+                String sqlFile = (commandLine.hasOption(SQL_FILE_OPT))
+                        ? commandLine.getOptionValue(SQL_FILE_OPT)
+                        : File.createTempFile("falcondb-", ".sql").getAbsolutePath();
+                boolean run = commandLine.hasOption(RUN_OPT);
+                if (command.getName().equals(CREATE_CMD)) {
+                    createDB(sqlFile, run);
+                } else if (command.getName().equals(UPGRADE_CMD)) {
+                    upgradeDB(sqlFile, run);
+                }
+                System.out.println("The SQL commands have been written to: " + sqlFile);
+                if (!run) {
+                    System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
+                }
+            }
+            return 0;
+        } catch (ParseException ex) {
+            System.err.println("Invalid sub-command: " + ex.getMessage());
+            System.err.println();
+            System.err.println(parser.shortHelp());
+            return 1;
+        } catch (Exception ex) {
+            System.err.println();
+            System.err.println("Error: " + ex.getMessage());
+            System.err.println();
+            System.err.println("Stack trace for the error was (for debug purposes):");
+            System.err.println("--------------------------------------");
+            ex.printStackTrace(System.err);
+            System.err.println("--------------------------------------");
+            System.err.println();
+            return 1;
+        }
+    }
+
+    private void upgradeDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        String falconVersion = BuildProperties.get().getProperty("project.version");
+        String dbVersion = getFalconDBVersion();
+        if (dbVersion.compareTo(falconVersion) >= 0) {
+            System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
+            return;
+        }
+
+        createUpgradeDB(sqlFile, run, false);
+        upgradeFalconDBVersion(sqlFile, run, falconVersion);
+
+        // any post upgrade tasks
+        if (run) {
+            System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
+        }
+    }
+
+
+    private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
+        String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(updateDBVersion);
+        writer.close();
+        System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(updateDBVersion);
+                st.close();
+            } catch (Exception ex) {
+                throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                closeStatement(st);
+                conn.close();
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
+
+    private String getFalconDBVersion() throws Exception {
+        String version;
+        System.out.println("Get Falcon DB version");
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_DB_VERSION);
+            if (rs.next()) {
+                version = rs.getString(1);
+            } else {
+                throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
+            }
+        } catch (Exception ex) {
+            throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        System.out.println("DONE");
+        return version;
+    }
+
+
+    private Map<String, String> getJdbcConf() throws Exception {
+        Map<String, String> jdbcConf = new HashMap<String, String>();
+        jdbcConf.put("driver", StartupProperties.get().getProperty(FalconJPAService.DRIVER));
+        String url = StartupProperties.get().getProperty(FalconJPAService.URL);
+        jdbcConf.put("url", url);
+        jdbcConf.put("user", StartupProperties.get().getProperty(FalconJPAService.USERNAME));
+        jdbcConf.put("password", StartupProperties.get().getProperty(FalconJPAService.PASSWORD));
+        String dbType = url.substring("jdbc:".length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+        }
+        dbType = dbType.substring(0, dbType.indexOf(":"));
+        jdbcConf.put("dbtype", dbType);
+        return jdbcConf;
+    }
+
+    private String[] createMappingToolArguments(String sqlFile) throws Exception {
+        Map<String, String> conf = getJdbcConf();
+        List<String> args = new ArrayList<String>();
+        args.add("-schemaAction");
+        args.add("add");
+        args.add("-p");
+        args.add("persistence.xml#falcon-" + conf.get("dbtype"));
+        args.add("-connectionDriverName");
+        args.add(conf.get("driver"));
+        args.add("-connectionURL");
+        args.add(conf.get("url"));
+        args.add("-connectionUserName");
+        args.add(conf.get("user"));
+        args.add("-connectionPassword");
+        args.add(conf.get("password"));
+        if (sqlFile != null) {
+            args.add("-sqlFile");
+            args.add(sqlFile);
+        }
+        args.add("-indexes");
+        args.add("true");
+        args.add("org.apache.falcon.state.store.jdbc.EntityBean");
+        args.add("org.apache.falcon.state.store.jdbc.InstanceBean");
+        return args.toArray(new String[args.size()]);
+    }
+
+    private void createDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (checkDBExists()) {
+            return;
+        }
+
+        verifyFalconPropsTable(false);
+        createUpgradeDB(sqlFile, run, true);
+        createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version"));
+        if (run) {
+            System.out.println("Falcon DB has been created for Falcon version '"
+                    + BuildProperties.get().getProperty("project.version") + "'");
+        }
+    }
+
+    private static final String CREATE_FALCON_DB_PROPS =
+            "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
+
+    private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
+        String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(CREATE_FALCON_DB_PROPS);
+        writer.println(insertDbVerion);
+        writer.close();
+        System.out.println("Create FALCON_DB_PROPS table");
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(CREATE_FALCON_DB_PROPS);
+                st.executeUpdate(insertDbVerion);
+                st.close();
+            } catch (Exception ex) {
+                closeStatement(st);
+                throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                conn.close();
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
+
+    private boolean verifyFalconPropsTable(boolean exists) throws Exception {
+        System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
+                : "Checking FALCON_DB_PROPS table does not exist");
+        boolean tableExists;
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
+            rs.next();
+            tableExists = true;
+        } catch (Exception ex) {
+            tableExists = false;
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        if (tableExists != exists) {
+            throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
+        }
+        System.out.println("DONE");
+        return tableExists;
+    }
+
+    private void closeResultSet(ResultSet rs) {
+        try {
+            if (rs != null) {
+                rs.close();
+            }
+        } catch (Exception e) {
+            System.out.println("Unable to close ResultSet " + rs);
+        }
+    }
+
+    private void closeStatement(Statement st) throws Exception {
+        try {
+            if (st != null) {
+                st.close();
+            }
+        } catch (Exception e) {
+            System.out.println("Unable to close SQL Statement " + st);
+            throw new Exception(e);
+        }
+    }
+
+    private Connection createConnection() throws Exception {
+        Map<String, String> conf = getJdbcConf();
+        Class.forName(conf.get("driver")).newInstance();
+        return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
+    }
+
+    private void validateConnection() throws Exception {
+        System.out.println("Validating DB Connection");
+        try {
+            createConnection().close();
+            System.out.println("DONE");
+        } catch (Exception ex) {
+            throw new Exception("Could not connect to the database: " + ex.toString(), ex);
+        }
+    }
+
+    private static final String ENTITY_STATUS_QUERY =
+            "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
+    private static final String INSTANCE_STATUS_QUERY =
+            "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
+
+    private boolean checkDBExists() throws Exception {
+        boolean schemaExists;
+        Connection conn = createConnection();
+        ResultSet rs =  null;
+        Statement st = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(ENTITY_STATUS_QUERY);
+            rs.next();
+            schemaExists = true;
+        } catch (Exception ex) {
+            schemaExists = false;
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
+        return schemaExists;
+    }
+
+    private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
+        System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
+        String[] args = createMappingToolArguments(sqlFile);
+        org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+        if (run) {
+            args = createMappingToolArguments(null);
+            org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+        }
+        System.out.println("DONE");
+    }
+
+    private void showVersion() throws Exception {
+        System.out.println("Falcon Server version: "
+                + BuildProperties.get().getProperty("project.version"));
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        try {
+            verifyFalconPropsTable(true);
+        } catch (Exception ex) {
+            throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool");
+        }
+        showFalconPropsInfo();
+    }
+
+    private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
+
+    private void showFalconPropsInfo() throws Exception {
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            System.out.println("Falcon DB Version Information");
+            System.out.println("--------------------------------------");
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_PROPS_INFO);
+            while (rs.next()) {
+                System.out.println(rs.getString(1) + ": " + rs.getString(2));
+            }
+            System.out.println("--------------------------------------");
+        } catch (Exception ex) {
+            throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/scheduler/src/main/resources/META-INF/persistence.xml
new file mode 100644
index 0000000..86558de
--- /dev/null
+++ b/scheduler/src/main/resources/META-INF/persistence.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+             version="1.0">
+
+    <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
+        <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+            <!-- Types must match the <class> entries above (package ...state.store.jdbc) -->
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.state.store.jdbc.EntityBean;
+                org.apache.falcon.state.store.jdbc.InstanceBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
+            <!-- A single DBDictionary property; a duplicate entry with the same
+                 name would override the first, dropping batchLimit=50 -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+</persistence>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/falcon-buildinfo.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/falcon-buildinfo.properties b/scheduler/src/main/resources/falcon-buildinfo.properties
new file mode 100644
index 0000000..5a7cb82
--- /dev/null
+++ b/scheduler/src/main/resources/falcon-buildinfo.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######################
+*.domain=all
+
+*.build.user=${user.name}
+*.build.epoch=${timestamp}
+*.project.version=${pom.version}
+*.build.version=${pom.version}-r${buildNumber}
+*.vc.revision=${buildNumber}
+*.vc.source.url=${scm.connection}
+######################

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index bff92c9..2a9fbce 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.falcon.execution;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -36,6 +35,7 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
 import org.apache.falcon.notification.service.impl.SchedulerService;
 import org.apache.falcon.service.Services;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
 import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
@@ -43,7 +43,8 @@ import org.apache.falcon.state.ID;
 import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
@@ -59,6 +60,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -68,32 +70,38 @@ import java.util.Iterator;
 /**
  * Tests the API of FalconExecution Service and in turn the FalconExecutionService.get()s.
  */
-public class FalconExecutionServiceTest extends AbstractTestBase {
+public class FalconExecutionServiceTest extends AbstractSchedulerTestBase {
 
-    private InMemoryStateStore stateStore = null;
+    private StateStore stateStore = null;
     private AlarmService mockTimeService;
     private DataAvailabilityService mockDataService;
     private SchedulerService mockSchedulerService;
     private JobCompletionService mockCompletionService;
     private DAGEngine dagEngine;
     private int instanceCount = 0;
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
 
     @BeforeClass
     public void init() throws Exception {
         this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
         this.conf = dfsCluster.getConf();
         setupServices();
+        super.setup();
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
         setupConfigStore();
     }
 
     @AfterClass
-    public void tearDown() {
+    public void tearDown() throws FalconException, IOException {
+        super.cleanup();
         this.dfsCluster.shutdown();
+        falconJPAService.destroy();
     }
 
     // State store is set up to sync with Config Store. That gets tested too.
     public void setupConfigStore() throws Exception {
-        stateStore = (InMemoryStateStore) AbstractStateStore.get();
+        stateStore = AbstractStateStore.get();
         getStore().registerListener(stateStore);
         storeEntity(EntityType.CLUSTER, "testCluster");
         storeEntity(EntityType.FEED, "clicksFeed");
@@ -160,6 +168,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // Ensure the instance is ready for execution
+        instance = stateStore.getExecutionInstance(new InstanceID(instance.getInstance()));
         Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY);
 
         // Simulate a scheduled notification
@@ -211,12 +220,15 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in ready and one in waiting. Both should be suspended.
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
 
         FalconExecutionService.get().suspend(process);
 
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
         Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
@@ -225,7 +237,11 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
                 instance2.getInstance().getId());
         Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID);
 
+        Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), executorID);
+
         FalconExecutionService.get().resume(process);
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
 
@@ -237,17 +253,22 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in running and the other in ready. Both should be suspended
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true);
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
 
         FalconExecutionService.get().suspend(process);
 
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
 
         FalconExecutionService.get().resume(process);
-
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
 
@@ -255,6 +276,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance());
         FalconExecutionService.get().onEvent(event);
 
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
     }
 
@@ -294,6 +316,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in ready, one in waiting and one running.
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
+        instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
@@ -329,6 +354,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
 
         FalconExecutionService.get().onEvent(dataEvent);
 
+        instanceState = stateStore.getExecutionInstance(new InstanceID(instanceState.getInstance()));
         Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT);
     }
 
@@ -390,6 +416,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in ready, one in waiting and one running.
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
+        instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
@@ -442,7 +471,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         EntityID processID = new EntityID(process);
 
         // Store couple of instances in store
-        stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+        EntityState entityState = stateStore.getEntity(processID);
+        entityState.setCurrentState(EntityState.STATE.SCHEDULED);
+        stateStore.updateEntity(entityState);
         ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
                 new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
         InstanceState instanceState1 = new InstanceState(instance1);
@@ -459,11 +490,13 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         // Simulate a scheduled notification. This should cause the reload from state store
         Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance());
         FalconExecutionService.get().onEvent(event);
+        instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
         Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING);
 
         // Simulate a Job completion notification and ensure the instance resumes from where it left
         event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance());
         FalconExecutionService.get().onEvent(event);
+        instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
         Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
     }
 
@@ -500,6 +533,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
             Assert.fail("Exception expected.");
         } catch (Exception e) {
             // One instance must fail and the other not
+            instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
+            instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
             Assert.assertEquals(instanceState2.getCurrentState(), state);
             Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING);
         }
@@ -508,6 +543,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         ((MockDAGEngine)dagEngine).removeFailInstance(instance1);
         m.invoke(FalconExecutionService.get(), process);
 
+        instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
+        instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
         // Both instances must be in expected state.
         Assert.assertEquals(instanceState2.getCurrentState(), state);
         Assert.assertEquals(instanceState1.getCurrentState(), state);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index 001f466..c43ccf0 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -40,7 +40,7 @@ import org.apache.falcon.state.ID;
 import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.state.store.StateStore;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
@@ -63,10 +63,10 @@ import static org.apache.falcon.state.InstanceState.STATE;
  */
 public class SchedulerServiceTest extends AbstractTestBase {
 
-    private SchedulerService scheduler = Mockito.spy(new SchedulerService());
+    private SchedulerService scheduler;
     private NotificationHandler handler;
     private static String cluster = "testCluster";
-    private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get();
+    private static StateStore stateStore;
     private static DAGEngine mockDagEngine;
     private static Process process;
     private volatile boolean failed = false;
@@ -79,6 +79,10 @@ public class SchedulerServiceTest extends AbstractTestBase {
 
     @BeforeClass
     public void init() throws Exception {
+        StartupProperties.get().setProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.InMemoryStateStore");
+        stateStore = AbstractStateStore.get();
+        scheduler = Mockito.spy(new SchedulerService());
         this.dfsCluster = EmbeddedCluster.newCluster(cluster);
         this.conf = dfsCluster.getConf();
         setupConfigStore();
@@ -97,6 +101,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
         scheduler.init();
         StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
         mockDagEngine =  DAGEngineFactory.getDAGEngine("testCluster");
+
     }
 
     @AfterClass
@@ -199,7 +204,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
                 WorkflowJob.Status.SUCCEEDED, DateTime.now()));
         // Dependency now satisfied. Now, the first instance should get scheduled after retry delay.
         Thread.sleep(100);
-        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
new file mode 100644
index 0000000..48c1426
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * TestBase for tests in scheduler.
+ */
+public class AbstractSchedulerTestBase extends AbstractTestBase {
+    private static final String DB_BASE_DIR = "target/test-data/falcondb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected static final String DB_UPDATE_SQL_FILE = DB_BASE_DIR + File.separator + "update.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    public void setup() throws Exception {
+        StartupProperties.get();
+        StartupProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+    }
+
+    public void cleanup() throws IOException {
+        cleanupDB();
+    }
+
+    private void cleanupDB() throws IOException {
+        fs.delete(new Path(DB_BASE_DIR), true);
+    }
+
+    protected void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(0, result);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
index 2f32b43..6676754 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
@@ -18,55 +18,72 @@
 package org.apache.falcon.state;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
 import org.mockito.Mockito;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
  * Tests to ensure entity state changes happen correctly.
  */
-public class EntityStateServiceTest {
+public class EntityStateServiceTest extends AbstractSchedulerTestBase{
 
     private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class);
 
-    @BeforeMethod
-    public void setUp() {
-        ((InMemoryStateStore) AbstractStateStore.get()).clear();
+    @BeforeClass
+    public void setup() throws Exception {
+        StartupProperties.get().setProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.InMemoryStateStore");
+        super.setup();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+    }
+
+    @AfterMethod
+    public void setUp() throws StateStoreException {
+        AbstractStateStore.get().clear();
     }
 
     // Tests a schedulable entity's lifecycle : Submit -> run -> suspend -> resume
     @Test
-    public void testLifeCycle() throws FalconException {
+    public void testLifeCycle() throws Exception {
         Process mockEntity = new Process();
         mockEntity.setName("test");
-
+        storeEntity(EntityType.PROCESS, "test");
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
         EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Mockito.verify(listener).onSubmit(mockEntity);
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUBMITTED));
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
         Mockito.verify(listener).onSchedule(mockEntity);
+        entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
         Mockito.verify(listener).onSuspend(mockEntity);
+        entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUSPENDED));
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
         Mockito.verify(listener).onResume(mockEntity);
+        entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
     }
 
     @Test
-    public void testInvalidTransitions() throws FalconException {
+    public void testInvalidTransitions() throws Exception {
         Feed mockEntity = new Feed();
         mockEntity.setName("test");
+        storeEntity(EntityType.FEED, "test");
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
         // Attempt suspending a submitted entity
         try {
@@ -99,10 +116,10 @@ public class EntityStateServiceTest {
 
     @Test(dataProvider = "state_and_events")
     public void testIdempotency(EntityState.STATE state, EntityState.EVENT event)
-        throws InvalidStateTransitionException {
+        throws Exception {
         Process mockEntity = new Process();
         mockEntity.setName("test");
-
+        storeEntity(EntityType.PROCESS, "test");
         EntityState entityState = new EntityState(mockEntity).setCurrentState(state);
         entityState.nextTransition(event);
         Assert.assertEquals(entityState.getCurrentState(), state);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
index 43c3c54..f0ae7b2 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
@@ -23,11 +23,12 @@ import org.apache.falcon.exception.InvalidStateTransitionException;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ProcessExecutionInstance;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
 import org.joda.time.DateTime;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -40,6 +41,12 @@ public class InstanceStateServiceTest {
     private InstanceStateChangeHandler listener = Mockito.mock(InstanceStateChangeHandler.class);
     private ProcessExecutionInstance mockInstance;
 
+    @BeforeClass
+    public void init() {
+        StartupProperties.get().setProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.InMemoryStateStore");
+    }
+
     @BeforeMethod
     public void setup() {
         Process testProcess = new Process();
@@ -47,13 +54,14 @@ public class InstanceStateServiceTest {
         // Setup new mocks so we can verify the no. of invocations
         mockInstance = Mockito.mock(ProcessExecutionInstance.class);
         Mockito.when(mockInstance.getEntity()).thenReturn(testProcess);
+        Mockito.when(mockInstance.getCreationTime()).thenReturn(DateTime.now());
         Mockito.when(mockInstance.getInstanceTime()).thenReturn(DateTime.now());
         Mockito.when(mockInstance.getCluster()).thenReturn("testCluster");
     }
 
     @AfterMethod
-    public void tearDown() {
-        ((InMemoryStateStore) AbstractStateStore.get()).clear();
+    public void tearDown() throws StateStoreException {
+        AbstractStateStore.get().clear();
     }
 
     // Tests an entity instance's lifecycle : Trigger -> waiting -> ready -> running
@@ -67,18 +75,28 @@ public class InstanceStateServiceTest {
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener);
         Mockito.verify(listener).onConditionsMet(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.READY));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE, listener);
         Mockito.verify(listener).onSchedule(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUSPEND, listener);
         Mockito.verify(listener).onSuspend(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUSPENDED));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_RUNNING, listener);
         Mockito.verify(listener).onResume(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUCCEED, listener);
         Mockito.verify(listener).onSuccess(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUCCEEDED));
         Assert.assertEquals(AbstractStateStore.get().getAllEntities().size(), 0);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
new file mode 100644
index 0000000..ecd5293
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.persistence.EntityManager;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test cases for FalconJPAService.
+ */
+public class TestFalconJPAService extends AbstractSchedulerTestBase {
+
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        super.setup();
+        createDB(DB_SQL_FILE);
+    }
+
+    @Test
+    public void testService() throws FalconException {
+        // initialize it
+        falconJPAService.init();
+        EntityManager entityManager = falconJPAService.getEntityManager();
+        Map<String, Object> props = entityManager.getProperties();
+        Assert.assertNotNull(props);
+        entityManager.close();
+    }
+
+    @AfterClass
+    public void tearDown() throws FalconException, IOException {
+        falconJPAService.destroy();
+        super.cleanup();
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
new file mode 100644
index 0000000..6d5bd49
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.service.store;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.FalconExecutionService;
+import org.apache.falcon.execution.MockDAGEngine;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.jdbc.BeanMapperUtil;
+import org.apache.falcon.state.store.jdbc.JDBCStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.falcon.workflow.engine.OozieDAGEngine;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Test cases for JDBCStateStore.
+ */
+public class TestJDBCStateStore extends AbstractSchedulerTestBase {
+    private static StateStore stateStore = JDBCStateStore.get();
+    private static Random randomValGenerator = new Random();
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+    private AlarmService mockTimeService;
+    private DataAvailabilityService mockDataService;
+    private SchedulerService mockSchedulerService;
+    private JobCompletionService mockCompletionService;
+    private DAGEngine dagEngine;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.setup();
+        // Create the state-store schema and start the JPA service before any test runs.
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        registerServices();
+    }
+
+    /**
+     * Registers mocked notification services so the state transitions exercised
+     * by these persistence tests do not trigger real scheduling work.
+     */
+    private void registerServices() throws FalconException {
+        mockTimeService = Mockito.mock(AlarmService.class);
+        Mockito.when(mockTimeService.getName()).thenReturn("AlarmService");
+        Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        mockDataService = Mockito.mock(DataAvailabilityService.class);
+        Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+        Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        mockSchedulerService = Mockito.mock(SchedulerService.class);
+        Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService");
+        StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+        StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName());
+        // Spy on the DAG engine configured above (dag.engine.impl = MockDAGEngine).
+        // NOTE(review): a plain OozieDAGEngine mock previously assigned here was
+        // immediately overwritten by this spy, so that dead assignment was removed.
+        dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster"));
+        Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        mockCompletionService = Mockito.mock(JobCompletionService.class);
+        Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+        Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        Services.get().register(mockTimeService);
+        Services.get().register(mockDataService);
+        Services.get().register(mockSchedulerService);
+        Services.get().register(mockCompletionService);
+    }
+
+    /**
+     * Verifies entity CRUD: insert, duplicate-insert failure, update, delete,
+     * and failure of get/update/delete against a missing entity.
+     */
+    @Test
+    public void testInsertRetrieveAndUpdate() throws Exception {
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process");
+        stateStore.putEntity(entityState);
+        EntityID entityID = new EntityID(entityState.getEntity());
+        EntityState actualEntityState = stateStore.getEntity(entityID);
+        Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity());
+        Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState());
+        // Inserting the same entity twice must fail.
+        try {
+            stateStore.putEntity(entityState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            //no op
+        }
+
+        entityState.setCurrentState(EntityState.STATE.SCHEDULED);
+        stateStore.updateEntity(entityState);
+        actualEntityState = stateStore.getEntity(entityID);
+        Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity());
+        Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState());
+
+        stateStore.deleteEntity(entityID);
+        Assert.assertFalse(stateStore.entityExists(entityID));
+
+        // Every operation on a deleted entity must fail.
+        try {
+            stateStore.getEntity(entityID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        try {
+            stateStore.updateEntity(entityState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        try {
+            stateStore.deleteEntity(entityID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+    }
+
+    /**
+     * Verifies bulk retrieval of entities, overall and filtered by state.
+     */
+    @Test
+    public void testGetEntities() throws Exception {
+        EntityState entityState1 = getEntityState(EntityType.PROCESS, "process1");
+        EntityState entityState2 = getEntityState(EntityType.PROCESS, "process2");
+        EntityState entityState3 = getEntityState(EntityType.FEED, "feed1");
+
+        // Store starts empty (cleanUpTables runs after every test).
+        Collection<EntityState> result = stateStore.getAllEntities();
+        Assert.assertEquals(result.size(), 0);
+
+        stateStore.putEntity(entityState1);
+        stateStore.putEntity(entityState2);
+        stateStore.putEntity(entityState3);
+
+        result = stateStore.getAllEntities();
+        Assert.assertEquals(result.size(), 3);
+
+        // All three were stored in the default SUBMITTED state.
+        Collection<Entity> entities = stateStore.getEntities(EntityState.STATE.SUBMITTED);
+        Assert.assertEquals(entities.size(), 3);
+    }
+
+    /**
+     * Verifies instance CRUD: insert, retrieve, update (state and predicates),
+     * duplicate-insert failure, delete, and failures on a missing instance.
+     */
+    @Test
+    public void testInstanceInsertionAndUpdate() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process");
+        ExecutionInstance executionInstance = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis(), "cluster", System.currentTimeMillis());
+        InstanceState instanceState = new InstanceState(executionInstance);
+        initInstanceState(instanceState);
+        stateStore.putExecutionInstance(instanceState);
+        InstanceID instanceID = new InstanceID(instanceState.getInstance());
+        InstanceState actualInstanceState = stateStore.getExecutionInstance(instanceID);
+        Assert.assertEquals(actualInstanceState, instanceState);
+
+        // Update state and pending predicates; the change must round-trip.
+        instanceState.setCurrentState(InstanceState.STATE.RUNNING);
+        Predicate predicate = new Predicate(Predicate.TYPE.DATA);
+        instanceState.getInstance().getAwaitingPredicates().add(predicate);
+
+        stateStore.updateExecutionInstance(instanceState);
+        actualInstanceState = stateStore.getExecutionInstance(instanceID);
+        Assert.assertEquals(actualInstanceState, instanceState);
+
+        // Inserting the same instance twice must fail.
+        try {
+            stateStore.putExecutionInstance(instanceState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        stateStore.deleteExecutionInstance(instanceID);
+
+        // Every operation on a deleted instance must fail.
+        try {
+            stateStore.getExecutionInstance(instanceID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        try {
+            stateStore.deleteExecutionInstance(instanceID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        try {
+            stateStore.updateExecutionInstance(instanceState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+    }
+
+    /**
+     * Verifies bulk instance queries: by cluster, by state list, by entity-cluster
+     * id, last-instance lookup, and bulk deletion per entity.
+     */
+    @Test
+    public void testBulkInstanceOperations() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process1");
+        // Instance 1: older, READY, on cluster1.
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis() - 60000, "cluster1", System.currentTimeMillis() - 60000);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.READY);
+
+        // Instance 2: newer, RUNNING, on cluster1.
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis(), "cluster1", System.currentTimeMillis());
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+        // Instance 3: READY, on cluster2.
+        ExecutionInstance processExecutionInstance3 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis(), "cluster2", System.currentTimeMillis());
+        InstanceState instanceState3 = new InstanceState(processExecutionInstance3);
+        instanceState3.setCurrentState(InstanceState.STATE.READY);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+        stateStore.putExecutionInstance(instanceState3);
+
+        Collection<InstanceState> actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(),
+                "cluster1");
+        Assert.assertEquals(actualInstances.size(), 2);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+        Assert.assertEquals(actualInstances.toArray()[1], instanceState2);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(),
+                "cluster2");
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState3);
+
+        // Filter by a single state on one cluster.
+        List<InstanceState.STATE> states = new ArrayList<>();
+        states.add(InstanceState.STATE.READY);
+
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states);
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+
+        // Filter by state across all clusters of the entity.
+        EntityClusterID entityClusterID = new EntityClusterID(entityState.getEntity(), "testCluster");
+        actualInstances = stateStore.getExecutionInstances(entityClusterID, states);
+        Assert.assertEquals(actualInstances.size(), 2);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+        Assert.assertEquals(actualInstances.toArray()[1], instanceState3);
+
+        states.add(InstanceState.STATE.RUNNING);
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states);
+        Assert.assertEquals(actualInstances.size(), 2);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+        Assert.assertEquals(actualInstances.toArray()[1], instanceState2);
+
+        // The most recent instance on cluster1 is instance 2.
+        InstanceState lastInstanceState = stateStore.getLastExecutionInstance(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(lastInstanceState, instanceState2);
+
+        InstanceID instanceKey = new InstanceID(instanceState3.getInstance());
+        stateStore.deleteExecutionInstance(instanceKey);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+        Assert.assertEquals(actualInstances.size(), 0);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(actualInstances.size(), 2);
+
+        stateStore.putExecutionInstance(instanceState3);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+        Assert.assertEquals(actualInstances.size(), 1);
+
+        // Deleting by entity id must remove instances on all clusters.
+        stateStore.deleteExecutionInstances(entityClusterID.getEntityID());
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(actualInstances.size(), 0);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+        Assert.assertEquals(actualInstances.size(), 0);
+    }
+
+    /**
+     * Verifies time-range instance queries: each window must contain exactly the
+     * instance whose time falls inside it.
+     */
+    @Test
+    public void testGetExecutionInstancesWithRange() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+
+        // Two instances three minutes apart, so one-minute windows separate them.
+        long instance1Time = System.currentTimeMillis() - 180000;
+        long instance2Time = System.currentTimeMillis();
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process1");
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance1Time, "cluster1", instance1Time);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster1", instance2Time);
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+
+        List<InstanceState.STATE> states = new ArrayList<>();
+        states.add(InstanceState.STATE.RUNNING);
+
+        // TestNG's assertEquals takes (actual, expected) - kept consistent with the rest of this class.
+        Collection<InstanceState> actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster1", states, new DateTime(instance1Time), new DateTime(instance1Time + 60000));
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster1", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState2);
+    }
+
+    /**
+     * Seeds an instance with a READY state, random external id and sequence,
+     * actual start/end times, and one pending job-completion predicate.
+     */
+    private void initInstanceState(InstanceState instanceState) {
+        instanceState.setCurrentState(InstanceState.STATE.READY);
+        instanceState.getInstance().setExternalID(RandomStringUtils.randomNumeric(6));
+        instanceState.getInstance().setInstanceSequence(randomValGenerator.nextInt());
+        instanceState.getInstance().setActualStart(new DateTime(System.currentTimeMillis()));
+        instanceState.getInstance().setActualEnd(new DateTime(System.currentTimeMillis()));
+        List<Predicate> predicates = new ArrayList<>();
+        Predicate predicate = new Predicate(Predicate.TYPE.JOB_COMPLETION);
+        predicates.add(predicate);
+        instanceState.getInstance().setAwaitingPredicates(predicates);
+    }
+
+    /**
+     * Stores an entity of the given type/name in the config store and wraps it
+     * in a fresh EntityState.
+     */
+    private EntityState getEntityState(EntityType entityType, String name) throws Exception {
+        storeEntity(entityType, name);
+        Entity entity = getStore().get(entityType, name);
+        Assert.assertNotNull(entity);
+        return new EntityState(entity);
+    }
+
+    @AfterTest
+    public void cleanUpTables() throws StateStoreException {
+        // Wipe entity and instance tables so tests stay independent.
+        stateStore.deleteEntities();
+        stateStore.deleteExecutionInstances();
+    }
+
+    @AfterClass
+    public void cleanup() throws IOException {
+        super.cleanup();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
new file mode 100644
index 0000000..8a42830
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.util.BuildProperties;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Tests for DB operations tool.
+ */
+public class TestFalconStateStoreDBCLI extends AbstractSchedulerTestBase {
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @AfterClass
+    public void cleanup() throws IOException {
+        super.cleanup();
+    }
+
+    /**
+     * Exercises the falcondb CLI end to end: schema creation, the version and
+     * help commands, rejection of an unknown command, and schema upgrade.
+     */
+    @Test
+    public void testFalconDBCLI() throws Exception {
+        File sqlFile = new File(DB_SQL_FILE);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        // TestNG's assertEquals takes (actual, expected); keep that order for sane failure messages.
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(result, 0);
+        Assert.assertTrue(sqlFile.exists());
+
+        ByteArrayOutputStream data = new ByteArrayOutputStream();
+        PrintStream oldOut = System.out;
+        try {
+            // "version" must report the DB version from the build properties.
+            System.setOut(new PrintStream(data));
+            String[] argsVersion = { "version" };
+            Assert.assertEquals(execDBCLICommands(argsVersion), 0);
+            Assert.assertTrue(data.toString().contains("db.version: "
+                    + BuildProperties.get().getProperty("project.version")));
+            // "help" must list the supported sub-commands.
+            data.reset();
+            String[] argsHelp = { "help" };
+            Assert.assertEquals(execDBCLICommands(argsHelp), 0);
+            Assert.assertTrue(data.toString().contains("falcondb create <OPTIONS> : Create Falcon DB schema"));
+            Assert.assertTrue(data.toString().contains("falcondb upgrade <OPTIONS> : Upgrade Falcon DB schema"));
+            // An unknown command must fail with a non-zero status.
+            data.reset();
+            String[] argsInvalidCommand = { "invalidCommand" };
+            Assert.assertEquals(execDBCLICommands(argsInvalidCommand), 1);
+        } finally {
+            // Always restore stdout, even when an assertion fails.
+            System.setOut(oldOut);
+        }
+        // Bump the build version so "upgrade" generates an upgrade script; restore
+        // it afterwards so the global BuildProperties state does not leak into
+        // other tests running in the same JVM.
+        File update = new File(DB_UPDATE_SQL_FILE);
+        String[] argsUpgrade = { "upgrade", "-sqlfile", update.getAbsolutePath(), "-run" };
+        String oldVersion = BuildProperties.get().getProperty("project.version");
+        BuildProperties.get().setProperty("project.version", "99999-SNAPSHOT");
+        try {
+            Assert.assertEquals(execDBCLICommands(argsUpgrade), 0);
+            Assert.assertTrue(update.exists());
+        } finally {
+            BuildProperties.get().setProperty("project.version", oldVersion);
+        }
+    }
+
+}


Mime
View raw message