From: balu@apache.org
To: commits@falcon.apache.org
Reply-To: dev@falcon.apache.org
Subject: falcon git commit: FALCON-1938 Add support to execute Spark SQL process
Date: Fri, 1 Jul 2016 16:56:35 +0000 (UTC)
Message-Id: <3955a11a8fa54b91b4e4dd51a79ffc99@git.apache.org>

Repository: falcon
Updated Branches:
  refs/heads/0.10 fbe84bc10 -> 7871dce21

FALCON-1938 Add support to execute Spark SQL process

Author: peeyush b
Reviewers: Venkat Ranganathan

Closes #188 from peeyushb/FALCON-1938

(cherry picked from commit c12c999b4cc89bd3fbe24873567f441eed02a4ef)
Signed-off-by: bvellanki

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7871dce2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7871dce2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7871dce2

Branch: refs/heads/0.10
Commit: 7871dce21273aa55eba49afee21fd14892113411
Parents: fbe84bc
Author: peeyush b
Authored: Fri Jul 1 09:56:21 2016 -0700
Committer: bvellanki
Committed: Fri Jul 1 09:56:31 2016 -0700

----------------------------------------------------------------------
 docs/src/site/twiki/EntitySpecification.twiki  | 23 ++++++
 examples/entity/spark/spark-sql-process.xml    | 55 +++++++++++++++
 examples/pom.xml                               | 10 +++
 .../example/spark/SparkSQLProcessTable.java    | 51 ++++++++++++++
 .../process/SparkProcessWorkflowBuilder.java   |  9 +++
 .../OozieProcessWorkflowBuilderTest.java       | 73 ++++++++++++++++++++
 .../config/process/spark-sql-process.xml       | 53 ++++++++++++++
 7 files changed, 274 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 278dc0e..9f9e210 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -927,6 +927,29 @@ Input and Output data to the Spark application will be set as argument when Spar
 In the set of arguments, the first argument will always correspond to the input feed, the second argument will always correspond to the output feed, and then the user's provided arguments will be set.
 
+To run a Spark SQL process entity that reads and writes data stored in Hive, the datanucleus jars under the $HIVE_HOME/lib directory and the hive-site.xml
+under the $SPARK_HOME/conf/ directory need to be available on the driver and on all executors launched by the YARN cluster.
+A convenient way to do this is to add them through the --jars and --files options of the spark-opts attribute.
+Example:
+
+<verbatim>
+...
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.examples.SparkSQLProcessTable</class>
+        <jar>/resources/action/lib/spark-application.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --jars /usr/local/hive/lib/datanucleus-rdbms.jar,/usr/local/hive/lib/datanucleus-core.jar,/usr/local/hive/lib/datanucleus-api-jdo.jar --files /usr/local/spark/conf/hive-site.xml</spark-opts>
+    </spark-attributes>
+...
+</verbatim>
+
+Input and Output to the Spark SQL application will be set as arguments when the Spark workflow is generated, if input and output feed entities are defined in the process entity.
+If the input feed is of table type, then the input table partition, table name, and database name will be set as input arguments. If the output feed is of table type, then the output table partition, table name, and database name will be set as output arguments.
+Once the input and output arguments are set, the user's provided arguments will be set.
+
 ---+++ Retry
 Retry policy defines how the workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff (exponential backoff) and final. Depending on the delay and number of attempts, the workflow is retried after specific intervals. If the user sets the onTimeout attribute to "true", retries will happen for TIMED_OUT instances.
 Syntax:


http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/examples/entity/spark/spark-sql-process.xml
----------------------------------------------------------------------
diff --git a/examples/entity/spark/spark-sql-process.xml b/examples/entity/spark/spark-sql-process.xml
new file mode 100644
index 0000000..cdd2ccc
--- /dev/null
+++ b/examples/entity/spark/spark-sql-process.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
+    <!-- ... -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <!-- ... -->
+    <workflow engine="spark" path="..."/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
+        <jar>/app/spark/lib/falcon-examples.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+    </spark-attributes>
+    <!-- ... -->
+</process>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index a8ec659..a1aedf8 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -64,6 +64,16 @@
             <version>${spark.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
new file mode 100644
index 0000000..5e9f092
--- /dev/null
+++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.example.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+/**
+ * Spark SQL Example.
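+ *
+ * Falcon generates six arguments for table-backed feeds, in this order
+ * (see SparkProcessWorkflowBuilder below): input partition filter,
+ * input table, input database, output partition(s), output table,
+ * output database.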
+ */
+public final class SparkSQLProcessTable {
+
+    private SparkSQLProcessTable() {
+    }
+
+    public static void main(String[] args) {
+        // The query below uses args[0]..args[5], so all six are required.
+        if (args.length < 6) {
+            System.out.println("Arguments must contain details for input and output tables");
+            System.exit(0);
+        }
+
+        SparkConf conf = new SparkConf().setAppName("SparkSQL example");
+        SparkContext sc = new SparkContext(conf);
+        HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
+
+        // args: [0] input partition filter, [1] input table, [2] input database,
+        //       [3] output partition, [4] output table, [5] output database
+        String sqlQuery = "FROM " + args[2] + "." + args[1]
+                + " INSERT OVERWRITE TABLE " + args[5] + "." + args[4]
+                + " PARTITION(" + args[3] + ") SELECT word, SUM(cnt) AS cnt"
+                + " WHERE " + args[0] + " GROUP BY word";
+
+        DataFrame df = sqlContext.sql(sqlQuery);
+        df.show();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
index 8c06711..5f4fafa 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -46,6 +46,7 @@ import java.util.List;
  */
 public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
     private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
+    private static final String FALCON_PREFIX = "falcon_";
 
     public SparkProcessWorkflowBuilder(Process entity) {
         super(entity);
@@ -155,6 +156,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
             final String inputName = input.getName();
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                 argList.add(0, "${" + inputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                argList.add(0, "${" + FALCON_PREFIX + inputName + "_database" + "}");
+                argList.add(0, "${" + FALCON_PREFIX + inputName + "_table" + "}");
+                argList.add(0, "${" + FALCON_PREFIX + inputName + "_partition_filter_hive" + "}");
             }
             numInputFeed--;
         }
@@ -174,6 +179,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
             final String outputName = output.getName();
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
                 argList.add(0, "${" + outputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                argList.add(0, "${" + FALCON_PREFIX + outputName + "_database" + "}");
+                argList.add(0, "${" + FALCON_PREFIX + outputName + "_table" + "}");
+                argList.add(0, "${" + FALCON_PREFIX + outputName + "_partitions_hive" + "}");
             }
             numOutputFeed--;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 85100e7..30ff537 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -326,6 +326,79 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     }
 
     @Test
+    public void testSparkSQLProcess() throws Exception {
+        URL resource =
+                this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/spark-sql-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        prepare(process);
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+        HashMap<String, String> props = getCoordProperties(coord);
+        HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
+
+        verifyEntityProperties(process, cluster,
+                WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
+        verifyBrokerProperties(cluster, wfProps);
+
+        // verify table and hive props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
+        expected.putAll(ClusterHelper.getHiveProperties(cluster));
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
+        testParentWorkflow(process, parentWorkflow);
+
+        ACTION sparkNode = getAction(parentWorkflow, "user-action");
+
+        JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
+                OozieUtils.unMarshalSparkAction(sparkNode);
+        org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
+
+        assertEquals(sparkAction.getMaster(), "local");
+        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        List<String> argsList = sparkAction.getArg();
+
+        Input input = process.getInputs().getInputs().get(0);
+        Output output = process.getOutputs().getOutputs().get(0);
+
+        assertEquals(argsList.get(0), "${falcon_" + input.getName() + "_partition_filter_hive}");
+        assertEquals(argsList.get(1), "${falcon_" + input.getName() + "_table}");
+        assertEquals(argsList.get(2), "${falcon_" + input.getName() + "_database}");
+        assertEquals(argsList.get(3), "${falcon_" + output.getName() + "_partitions_hive}");
+        assertEquals(argsList.get(4), "${falcon_" + output.getName() + "_table}");
+        assertEquals(argsList.get(5), "${falcon_" + output.getName() + "_database}");
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    @Test
     public void testSparkProcess() throws Exception {
         URL resource = this.getClass().getResource(SPARK_PROCESS_XML);

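The argument-order assertions above follow directly from the builder's argList.add(0, ...) calls: each EL variable is inserted at the head of the list, so the last one inserted comes out first, ahead of any user-supplied arguments. A minimal standalone sketch of that ordering (the feed name "input" and the "--userArg" value are illustrative, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    public final class PrependOrderDemo {
        public static void main(String[] args) {
            List<String> argList = new ArrayList<>();
            argList.add("--userArg");  // user-supplied arguments are already present
            // Same insertion order as SparkProcessWorkflowBuilder uses for a table input:
            argList.add(0, "${falcon_input_database}");
            argList.add(0, "${falcon_input_table}");
            argList.add(0, "${falcon_input_partition_filter_hive}");
            // Prints: [${falcon_input_partition_filter_hive}, ${falcon_input_table},
            //          ${falcon_input_database}, --userArg]
            System.out.println(argList);
        }
    }
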
http://git-wip-us.apache.org/repos/asf/falcon/blob/7871dce2/oozie/src/test/resources/config/process/spark-sql-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/config/process/spark-sql-process.xml b/oozie/src/test/resources/config/process/spark-sql-process.xml
new file mode 100644
index 0000000..55ff89b
--- /dev/null
+++ b/oozie/src/test/resources/config/process/spark-sql-process.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
+    <!-- ... -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <!-- ... -->
+    <workflow engine="spark" path="..."/>
+    <spark-attributes>
+        <master>local</master>
+        <name>Spark SQL</name>
+        <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
+        <jar>/resources/action/lib/falcon-examples.jar</jar>
+        <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+    </spark-attributes>
+    <!-- ... -->
+</process>
\ No newline at end of file
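
For reference, here is how SparkSQLProcessTable renders its query template once the six EL variables above are materialized. All database, table, and partition values below are hypothetical, chosen only to illustrate the concatenation:

    // Hypothetical resolved values for the six Falcon-generated arguments:
    String[] args = {
        "(ds='2016-07-01')",  // ${falcon_<input>_partition_filter_hive}
        "clicks_raw",         // ${falcon_<input>_table}
        "in_db",              // ${falcon_<input>_database}
        "ds='2016-07-01'",    // ${falcon_<output>_partitions_hive}
        "clicks_summary",     // ${falcon_<output>_table}
        "out_db"              // ${falcon_<output>_database}
    };
    // The example class builds:
    //   FROM in_db.clicks_raw
    //   INSERT OVERWRITE TABLE out_db.clicks_summary PARTITION(ds='2016-07-01')
    //   SELECT word, SUM(cnt) AS cnt WHERE (ds='2016-07-01') GROUP BY word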