falcon-commits mailing list archives

From b...@apache.org
Subject falcon git commit: FALCON-1938 Add support to execute Spark SQL process
Date Fri, 01 Jul 2016 16:56:35 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.10 fbe84bc10 -> 7871dce21


FALCON-1938 Add support to execute Spark SQL process

Author: peeyush b <pbishnoi@hortonworks.com>

Reviewers: "Venkat Ranganathan  <venkat@hortonworks.com>"

Closes #188 from peeyushb/FALCON-1938

(cherry picked from commit c12c999b4cc89bd3fbe24873567f441eed02a4ef)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7871dce2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7871dce2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7871dce2

Branch: refs/heads/0.10
Commit: 7871dce21273aa55eba49afee21fd14892113411
Parents: fbe84bc
Author: peeyush b <pbishnoi@hortonworks.com>
Authored: Fri Jul 1 09:56:21 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Fri Jul 1 09:56:31 2016 -0700

----------------------------------------------------------------------
 docs/src/site/twiki/EntitySpecification.twiki   | 23 ++++++
 examples/entity/spark/spark-sql-process.xml     | 55 +++++++++++++++
 examples/pom.xml                                | 10 +++
 .../example/spark/SparkSQLProcessTable.java     | 51 ++++++++++++++
 .../process/SparkProcessWorkflowBuilder.java    |  9 +++
 .../OozieProcessWorkflowBuilderTest.java        | 73 ++++++++++++++++++++
 .../config/process/spark-sql-process.xml        | 53 ++++++++++++++
 7 files changed, 274 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 278dc0e..9f9e210 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -927,6 +927,29 @@ Input and Output data to the Spark application will be set as argument when Spar
 In the set of arguments, first argument will always correspond to input feed, second argument will always correspond to output feed and then user's provided argument will be set.
 
 
+To run a Spark SQL process entity that reads and writes data stored in Hive, the datanucleus jars under the $HIVE_HOME/lib directory and the hive-site.xml file
+under the $SPARK_HOME/conf/ directory must be available on the driver and on all executors launched by the YARN cluster.
+The convenient way to do this is to add them through the --jars and --files options of the spark-opts attribute.
+Example:
+<verbatim>
+<process name="spark-process">
+...
+    <workflow engine="spark" path="/resources/action">
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.examples.SparkSQLProcessTable</class>
+        <jar>/resources/action/lib/spark-application.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --jars /usr/local/hive/lib/datanucleus-rdbms.jar,/usr/local/hive/lib/datanucleus-core.jar,/usr/local/hive/lib/datanucleus-api-jdo.jar --files /usr/local/spark/conf/hive-site.xml</spark-opts>
+    </spark-attributes>
+...
+</process>
+</verbatim>
+
+Input and output of the Spark SQL application will be set as arguments when the Spark workflow is generated, if input and output feed entities are defined in the process entity.
+If the input feed is of table type, the input table partition, table name and database name will be set as input arguments. If the output feed is of table type, the output table partition, table name and database name will be set as output arguments.
+Once the input and output arguments are set, the user's provided arguments are appended, as illustrated below.
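+
+For example, with a table-type input feed named =input= and a table-type output feed named =output= (example names; actual values are resolved from the feed definitions at runtime), the generated argument list would be:
+<verbatim>
+${falcon_input_partition_filter_hive} ${falcon_input_table} ${falcon_input_database}
+${falcon_output_partitions_hive} ${falcon_output_table} ${falcon_output_database}
+</verbatim>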
+
 ---+++ Retry
 Retry policy defines how the workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff(exponential backoff) and final. Depending on the delay and number of attempts, the workflow is re-tried after specific intervals. If user sets the onTimeout attribute to "true", retries will happen for TIMED_OUT instances.
 Syntax:

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/examples/entity/spark/spark-sql-process.xml
----------------------------------------------------------------------
diff --git a/examples/entity/spark/spark-sql-process.xml b/examples/entity/spark/spark-sql-process.xml
new file mode 100644
index 0000000..cdd2ccc
--- /dev/null
+++ b/examples/entity/spark/spark-sql-process.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="hcat-local">
+            <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
+        <input name="inparts" feed="hcat-in" start="now(0,-5)" end="now(0,-1)"/>
+    </inputs>
+
+    <outputs>
+        <!-- In the workflow, the output path will be available in a variable 'outpath' -->
+        <output name="outpart" feed="hcat-out" instance="now(0,0)"/>
+    </outputs>
+
+    <workflow engine="spark" path="/app/spark"/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
+        <jar>/app/spark/lib/falcon-examples.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+    </spark-attributes>
+
+    <retry policy="periodic" delay="minutes(3)" attempts="3"/>
+
+</process>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index a8ec659..a1aedf8 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -64,6 +64,16 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
new file mode 100644
index 0000000..5e9f092
--- /dev/null
+++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.example.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+/**
+ * Spark SQL Example.
+ */
+
+public final class SparkSQLProcessTable {
+
+    private SparkSQLProcessTable() {
+    }
+    public static void main(String[] args) {
+        // All six table arguments (see below) are required.
+        if (args.length < 6) {
+            System.err.println("Arguments must contain details for input and output tables");
+            System.exit(1);
+        }
+
+        SparkConf conf = new SparkConf().setAppName("SparkSQL example");
+        SparkContext sc = new SparkContext(conf);
+        HiveContext sqlContext = new HiveContext(sc);
+
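+        // Argument order as generated by SparkProcessWorkflowBuilder for table feeds:
+        // args[0]=input partition filter, args[1]=input table, args[2]=input database,
+        // args[3]=output partition, args[4]=output table, args[5]=output database.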
+        String sqlQuery = "FROM " +args[2]+"."+args[1]+ " INSERT OVERWRITE TABLE " +args[5]+"."+args[4]
+                +" PARTITION("+args[3]+")  SELECT word, SUM(cnt) AS cnt WHERE "+args[0]+"
GROUP BY word";
+
+        DataFrame df = sqlContext.sql(sqlQuery);
+        df.show();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
index 8c06711..5f4fafa 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -46,6 +46,7 @@ import java.util.List;
  */
 public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
     private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
+    private static final String FALCON_PREFIX = "falcon_";
 
     public SparkProcessWorkflowBuilder(Process entity) {
         super(entity);
@@ -155,6 +156,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
             final String inputName = input.getName();
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                 argList.add(0, "${" + inputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
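+                // Each add(0, ...) prepends, so the final order of the generated
+                // arguments for this input is: partition filter, table name,
+                // database name (mirrored for outputs below).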
+                argList.add(0, "${" + FALCON_PREFIX+inputName+"_database" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+inputName+"_table" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+inputName+"_partition_filter_hive" +
"}");
             }
             numInputFeed--;
         }
@@ -174,6 +179,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
             final String outputName = output.getName();
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                 argList.add(0, "${" + outputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                argList.add(0, "${" + FALCON_PREFIX+outputName+"_database" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+outputName+"_table" + "}");
+                argList.add(0, "${" + FALCON_PREFIX+outputName+"_partitions_hive" + "}");
             }
             numOutputFeed--;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 85100e7..30ff537 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -326,6 +326,79 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     }
 
     @Test
+    public void testSparkSQLProcess() throws Exception {
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/spark-sql-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
+
+        // verify table and hive props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
+        expected.putAll(ClusterHelper.getHiveProperties(cluster));
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
+        testParentWorkflow(process, parentWorkflow);
+
+        ACTION sparkNode = getAction(parentWorkflow, "user-action");
+
+        JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
+                OozieUtils.unMarshalSparkAction(sparkNode);
+        org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
+
+        assertEquals(sparkAction.getMaster(), "local");
+        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        List<String> argsList = sparkAction.getArg();
+
+        Input input = process.getInputs().getInputs().get(0);
+        Output output = process.getOutputs().getOutputs().get(0);
+
+        assertEquals(argsList.get(0), "${falcon_" + input.getName() + "_partition_filter_hive}");
+        assertEquals(argsList.get(1), "${falcon_" + input.getName() + "_table}");
+        assertEquals(argsList.get(2), "${falcon_" + input.getName() + "_database}");
+        assertEquals(argsList.get(3), "${falcon_" + output.getName() + "_partitions_hive}");
+        assertEquals(argsList.get(4), "${falcon_" + output.getName() + "_table}");
+        assertEquals(argsList.get(5), "${falcon_" + output.getName() + "_database}");
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test
     public void testSparkProcess() throws Exception {
 
         URL resource = this.getClass().getResource(SPARK_PROCESS_XML);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/oozie/src/test/resources/config/process/spark-sql-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/spark-sql-process.xml b/oozie/src/test/resources/config/process/spark-sql-process.xml
new file mode 100644
index 0000000..55ff89b
--- /dev/null
+++ b/oozie/src/test/resources/config/process/spark-sql-process.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <workflow engine="spark" path="/resources/action"/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
+        <jar>/resources/action/lib/falcon-examples.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+    </spark-attributes>
+
+    <retry policy="periodic" delay="minutes(3)" attempts="3"/>
+
+</process>
\ No newline at end of file

