falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sowmya...@apache.org
Subject [1/3] falcon git commit: FALCON-1459 Ability to import from database. Contributed by Venkat Ramachandran
Date Thu, 29 Oct 2015 01:08:38 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 35006fe32 -> 89040a296


http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml b/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml
new file mode 100644
index 0000000..34424c9
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/import-sqoop-database-action.xml
@@ -0,0 +1,47 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * -->
+
+<action name="db-import-sqoop" xmlns='uri:oozie:workflow:0.3'>
+    <sqoop xmlns="uri:oozie:sqoop-action:0.3">
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <property>
+                <name>mapred.compress.map.output</name>
+                <value>true</value>
+            </property>
+            <!-- Assuming the connectors are in oozie share lib -->
+            <property>
+            <!-- Will enable using sharelib -->
+                <name>oozie.use.system.libpath</name>
+                <value>true</value>
+            </property>
+        </configuration>
+        <command>${sqoopCommand}</command>
+    </sqoop>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index df0d286..fa5d804 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -88,6 +88,8 @@
         <arg>${falconInputFeeds}</arg>
         <arg>-falconInPaths</arg>
         <arg>${falconInPaths}</arg>
+        <arg>-datasource</arg>
+        <arg>${datasource == 'NA' ? 'IGNORE' : datasource}</arg>
     </java>
     <ok to="end"/>
     <error to="fail"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 87c55e3..6f2c480 100644
--- a/pom.xml
+++ b/pom.xml
@@ -307,6 +307,9 @@
                                 <exclude>**/maven-eclipse.xml</exclude>
                                 <exclude>**/.externalToolBuilders/**</exclude>
                                 <exclude>html5-ui/**</exclude>
+                                <exclude>**/db1.log</exclude>
+                                <exclude>**/db1.properties</exclude>
+                                <exclude>**/db1.script</exclude>
                             </excludes>
                         </configuration>
                         <executions>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 9e4dc8f..0999c36 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -568,6 +568,9 @@
 	          <exclude>**/data.txt</exclude>
 	          <exclude>**/maven-eclipse.xml</exclude>
 	          <exclude>**/.externalToolBuilders/**</exclude>
+                  <exclude>**/db1.log</exclude>
+                  <exclude>**/db1.properties</exclude>
+                  <exclude>**/db1.script</exclude>
                 </excludes>
               </configuration>
       	    </plugin>	

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
new file mode 100644
index 0000000..8cc1273
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.HsqldbTestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.Map;
+
+/**
+ * Integration test for Feed Import.
+ */
+
+@Test
+public class FeedImportIT {
+    public static final Log LOG = LogFactory.getLog(HsqldbTestUtils.class.getName());
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        HsqldbTestUtils.start();
+        HsqldbTestUtils.changeSAPassword("sqoop");
+        HsqldbTestUtils.createAndPopulateCustomerTable();
+
+        TestContext.cleanupStore();
+        TestContext.prepare();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        HsqldbTestUtils.tearDown();
+    }
+
+    @Test
+    public void testFeedImportHSql() throws Exception {
+        Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows());
+    }
+
+    @Test
+    public void testSqoopImport() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE,
overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(TestContext.executeWithURL("entity -submit -type cluster -file
" + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE,
overlay);
+        LOG.info("entity -submit -type datasource -file " + filePath);
+        Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file
" + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3,
overlay);
+        LOG.info("entity -submitAndSchedule -type feed -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type
feed -file "
+                + filePath));
+    }
+
+    @Test
+    public void testSqoopImportDeleteDatasource() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(TestContext.executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE, overlay);
+        LOG.info("entity -submit -type datasource -file " + filePath);
+        Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
+        LOG.info("entity -submit -type feed -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file "
+                + filePath));
+
+        LOG.info("entity -delete -type datasource -name datasource-test");
+        Assert.assertEquals(-1, TestContext.executeWithURL("entity -delete -type datasource -name datasource-test"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index d067dee..0697b3d 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -90,6 +90,9 @@ import java.util.regex.Pattern;
 public class TestContext {
     public static final String FEED_TEMPLATE1 = "/feed-template1.xml";
     public static final String FEED_TEMPLATE2 = "/feed-template2.xml";
+    public static final String FEED_TEMPLATE3 = "/feed-template3.xml";
+
+    public static final String DATASOURCE_TEMPLATE = "/datasource-template.xml";
 
     public static final String CLUSTER_TEMPLATE = "/cluster-template.xml";
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
new file mode 100644
index 0000000..a92629f
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/util/HsqldbTestUtils.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.SQLException;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hsqldb.Server;
+
+/**
+ * Create a simple hsqldb server and schema to use for testing.
+ */
+public final class HsqldbTestUtils {
+
+    public static final Log LOG = LogFactory.getLog(HsqldbTestUtils.class.getName());
+
+    // singleton server instance.
+    private static Server server;
+
+    private static final String IN_MEM = "mem:/";
+
+    private static boolean inMemoryDB = IN_MEM.equals(getServerHost());
+
+    private HsqldbTestUtils() {}
+
+    public static String getServerHost() {
+        String host = System.getProperty("hsql.server.host", IN_MEM);
+        host = "localhost";
+        if (!host.endsWith("/")) { host += "/"; }
+        return host;
+    }
+
+    // Database name can be altered too
+    private static final String DATABASE_NAME = System.getProperty("hsql.database.name", "db1");
+    private static final String CUSTOMER_TABLE_NAME = "CUSTOMER";
+    private static final String DB_URL = "jdbc:hsqldb:" + getServerHost() + DATABASE_NAME;
+    private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
+
+    public static String getUrl() {
+        return DB_URL;
+    }
+
+    public static String getDatabaseName() {
+        return DATABASE_NAME;
+    }
+
+    /**
+     * start the server.
+     */
+    public static void start() {
+        if (null == server) {
+            LOG.info("Starting new hsqldb server; database=" + DATABASE_NAME);
+            String tmpDir = System.getProperty("test.build.data", "/tmp/");
+            String dbLocation = tmpDir + "/falcon/testdb.file";
+            if (inMemoryDB) {dbLocation = IN_MEM; }
+            server = new Server();
+            server.setDatabaseName(0, DATABASE_NAME);
+            server.putPropertiesFromString("database.0=" + dbLocation
+                    + ";no_system_exit=true;");
+            server.start();
+            LOG.info("Started server with url=" + DB_URL);
+        }
+    }
+
+    public static void stop() {
+        if (null != server) {
+            server.stop();
+        }
+    }
+
+    public static void tearDown() throws SQLException {
+        dropExistingSchema();
+        stop();
+    }
+
+    public static void changeSAPassword(String passwd) throws Exception {
+        Connection connection = null;
+        Statement st = null;
+
+        LOG.info("Changing password for SA");
+        try {
+            connection = getConnectionSystem();
+
+            st = connection.createStatement();
+            st.executeUpdate("SET PASSWORD \"" + passwd + "\"");
+            connection.commit();
+        } finally {
+            if (null != st) {
+                st.close();
+            }
+
+            if (null != connection) {
+                connection.close();
+            }
+        }
+    }
+    private static Connection getConnectionSystem() throws SQLException {
+        return getConnection("SA", "");
+    }
+
+    private static Connection getConnection() throws SQLException {
+        return getConnection("SA", "sqoop");
+    }
+    private static Connection getConnection(String user, String password) throws SQLException {
+        try {
+            Class.forName(DRIVER_CLASS);
+        } catch (ClassNotFoundException cnfe) {
+            LOG.error("Could not get connection; driver class not found: "
+                    + DRIVER_CLASS);
+            return null;
+        }
+        Connection connection = DriverManager.getConnection(DB_URL, user, password);
+        connection.setAutoCommit(false);
+        return connection;
+    }
+
+    /**
+     * Returns database URL for the server instance.
+     * @return String representation of DB_URL
+     */
+    public static String getDbUrl() {
+        return DB_URL;
+    }
+
+    public static int getNumberOfRows() throws SQLException {
+        Connection connection = null;
+        Statement st = null;
+        try {
+            connection = getConnection();
+
+            st = connection.createStatement();
+            ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM " + CUSTOMER_TABLE_NAME);
+            int rowCount = 0;
+            if (rs.next()) {
+                rowCount = rs.getInt(1);
+            }
+            return rowCount;
+        } finally {
+            if (null != st) {
+                st.close();
+            }
+
+            if (null != connection) {
+                connection.close();
+            }
+        }
+    }
+
+    public static void createAndPopulateCustomerTable() throws SQLException, ClassNotFoundException {
+
+        LOG.info("createAndPopulateCustomerTable");
+        Connection connection = null;
+        Statement st = null;
+        try {
+            connection = getConnection();
+
+            st = connection.createStatement();
+            st.executeUpdate("DROP TABLE " + CUSTOMER_TABLE_NAME + " IF EXISTS");
+            st.executeUpdate("CREATE TABLE " + CUSTOMER_TABLE_NAME + "(id INT NOT NULL PRIMARY KEY, name VARCHAR(64))");
+
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(1, 'Apple')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(2, 'Blackberry')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(3, 'Caterpillar')");
+            st.executeUpdate("INSERT INTO " + CUSTOMER_TABLE_NAME + " VALUES(4, 'DuPont')");
+
+            connection.commit();
+        } finally {
+            if (null != st) {
+                st.close();
+            }
+
+            if (null != connection) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Delete any existing tables.
+     */
+    public static void dropExistingSchema() throws SQLException {
+        String [] tables = listTables();
+        if (null != tables) {
+            Connection conn = getConnection();
+            for (String table : tables) {
+                Statement s = conn.createStatement();
+                try {
+                    s.executeUpdate("DROP TABLE " + table);
+                    conn.commit();
+                } finally {
+                    s.close();
+                }
+            }
+        }
+    }
+
+    public static String[] listTables() {
+        ResultSet results = null;
+        String [] tableTypes = {"TABLE"};
+        try {
+            try {
+                DatabaseMetaData metaData = getConnection().getMetaData();
+                results = metaData.getTables(null, null, null, tableTypes);
+            } catch (SQLException sqlException) {
+                LOG.error("Error reading database metadata: "
+                        + sqlException.toString(), sqlException);
+                return null;
+            }
+
+            if (null == results) {
+                return null;
+            }
+
+            try {
+                ArrayList<String> tables = new ArrayList<String>();
+                while (results.next()) {
+                    String tableName = results.getString("TABLE_NAME");
+                    tables.add(tableName);
+                }
+
+                return tables.toArray(new String[0]);
+            } catch (SQLException sqlException) {
+                LOG.error("Error reading from database: "
+                        + sqlException.toString(), sqlException);
+                return null;
+            }
+        } finally {
+            if (null != results) {
+                try {
+                    results.close();
+                    getConnection().commit();
+                } catch (SQLException sqlE) {
+                    LOG.error("Exception closing ResultSet: "
+                            + sqlE.toString(), sqlE);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/resources/datasource-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/datasource-template.xml b/webapp/src/test/resources/datasource-template.xml
new file mode 100644
index 0000000..fb7a329
--- /dev/null
+++ b/webapp/src/test/resources/datasource-template.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<datasource colo="##colo##" description="" type="hsql" name="datasource-test" xmlns="uri:falcon:datasource:0.1">
+    <interfaces>
+        <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText></passwordText>
+            </credential>
+        </interface>
+
+        <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
+            <credential type="password-text">
+                <userName>SA</userName>
+                <passwordText>sqoop</passwordText>
+            </credential>
+        </interface>
+
+        <credential type="password-text">
+            <userName>SA</userName>
+            <passwordText>sqoop</passwordText>
+        </credential>
+    </interfaces>
+
+    <driver>
+       <clazz>org.hsqldb.jdbcDriver</clazz>
+       <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/hsqldb-1.8.0.7.jar</jar>
+    </driver>
+</datasource>

http://git-wip-us.apache.org/repos/asf/falcon/blob/89040a29/webapp/src/test/resources/feed-template3.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/feed-template3.xml b/webapp/src/test/resources/feed-template3.xml
new file mode 100644
index 0000000..a6c1d6b
--- /dev/null
+++ b/webapp/src/test/resources/feed-template3.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+    <groups>input</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="##cluster##" type="source">
+            <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/>
+            <retention limit="hours(24)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <import>
+                <source name="datasource-test" tableName="simple">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--split-by" value="id"/>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="##user##" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>


Mime
View raw message