falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-2071 Falcon Spark SQL failing with Yarn Client Mode
Date Tue, 19 Jul 2016 20:22:03 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.10 3b05c17fe -> de422a2f2


FALCON-2071 Falcon Spark SQL failing with Yarn Client Mode

Author: peeyush b <pbishnoi@hortonworks.com>

Reviewers: "Praveen Adlakha <adlakha.praveen@gmail.com>, Balu Vellanki <balu@apache.org>, Venkat Ranganathan <venkat@hortonworks.com>"

Closes #220 from peeyushb/FALCON-2071

(cherry picked from commit 73339264d6d8cd627aee0be53007a7bd4e4956ac)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/de422a2f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/de422a2f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/de422a2f

Branch: refs/heads/0.10
Commit: de422a2f2b01bc4a86d8a2f5068c65aace99caaa
Parents: 3b05c17
Author: peeyush b <pbishnoi@hortonworks.com>
Authored: Tue Jul 19 13:21:51 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Tue Jul 19 13:21:59 2016 -0700

----------------------------------------------------------------------
 .../process/SparkProcessWorkflowBuilder.java    | 34 +++++++++-----------
 .../OozieProcessWorkflowBuilderTest.java        |  6 ++--
 2 files changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/de422a2f/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
index 5f4fafa..51db75d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -30,12 +30,10 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.spark.CONFIGURATION.Property;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.util.OozieUtils;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import javax.xml.bind.JAXBElement;
@@ -59,7 +57,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
 
         String sparkMasterURL = entity.getSparkAttributes().getMaster();
-        String sparkFilePath = entity.getSparkAttributes().getJar();
+        Path sparkJarFilePath = new Path(entity.getSparkAttributes().getJar());
         String sparkJobName = entity.getSparkAttributes().getName();
         String sparkOpts = entity.getSparkAttributes().getSparkOpts();
         String sparkClassName = entity.getSparkAttributes().getClazz();
@@ -94,18 +92,28 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         addOutputFeedsAsArgument(argList, cluster);
         addInputFeedsAsArgument(argList, cluster);
 
-        sparkAction.setJar(addUri(sparkFilePath, cluster));
-
-        setSparkLibFileToWorkflowLib(sparkFilePath, entity);
+        // In Oozie spark action, value for jar is either Java jar file path or Python file path.
+        validateSparkJarFilePath(sparkJarFilePath);
+        sparkAction.setJar(sparkJarFilePath.getName());
+        setSparkLibFileToWorkflowLib(sparkJarFilePath.toString(), entity);
         propagateEntityProperties(sparkAction);
 
         OozieUtils.marshalSparkAction(action, actionJaxbElement);
         return action;
     }
 
-    private void setSparkLibFileToWorkflowLib(String sparkFile, Process entity) {
+    private void setSparkLibFileToWorkflowLib(String sparkJarFilePath, Process entity) {
         if (StringUtils.isEmpty(entity.getWorkflow().getLib())) {
-            entity.getWorkflow().setLib(sparkFile);
+            entity.getWorkflow().setLib(sparkJarFilePath);
+        } else {
+            String workflowLib = entity.getWorkflow().getLib() + "," + sparkJarFilePath;
+            entity.getWorkflow().setLib(workflowLib);
+        }
+    }
+
+    private void validateSparkJarFilePath(Path sparkJarFilePath) throws FalconException {
+        if (!sparkJarFilePath.isAbsolute()) {
+            throw new FalconException("Spark jar file path must be absolute:"+sparkJarFilePath);
         }
     }
 
@@ -188,16 +196,6 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         }
     }
 
-    private String addUri(String jarFile, Cluster cluster) throws FalconException {
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-        Path jarFilePath = new Path(jarFile);
-        if (jarFilePath.isAbsoluteAndSchemeAuthorityNull()) {
-            return fs.makeQualified(jarFilePath).toString();
-        }
-        return jarFile;
-    }
-
     private String getClusterEntitySparkMaster(Cluster cluster) {
         return ClusterHelper.getSparkMasterEndPoint(cluster);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/de422a2f/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 30ff537..a692d0c 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -372,6 +372,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
+        assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/falcon-examples.jar");
 
         ACTION sparkNode = getAction(parentWorkflow, "user-action");
 
@@ -380,7 +381,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
 
         assertEquals(sparkAction.getMaster(), "local");
-        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/falcon-examples.jar");
+        assertEquals(sparkAction.getJar(), "falcon-examples.jar");
 
         Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
         List<String> argsList = sparkAction.getArg();
@@ -430,6 +431,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
         testParentWorkflow(process, parentWorkflow);
+        assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/spark-wordcount.jar");
 
         ACTION sparkNode = getAction(parentWorkflow, "user-action");
 
@@ -437,7 +439,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 OozieUtils.unMarshalSparkAction(sparkNode);
         org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
         assertEquals(sparkAction.getMaster(), "local");
-        assertEquals(sparkAction.getJar(), "jail://testCluster:00/resources/action/lib/spark-wordcount.jar");
+        assertEquals(sparkAction.getJar(), "spark-wordcount.jar");
         List<String> argsList = sparkAction.getArg();
         Input input = process.getInputs().getInputs().get(0);
         Output output = process.getOutputs().getOutputs().get(0);


Mime
View raw message