falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1312 Falcon post processing action should use Oozie prepared configuration. Contributed by Venkat Ranganathan.
Date Fri, 28 Aug 2015 19:46:33 GMT
Repository: falcon
Updated Branches:
  refs/heads/master c85a3c092 -> d9b824dc2


FALCON-1312 Falcon post processing action should use Oozie prepared configuration. Contributed
by Venkat Ranganathan.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d9b824dc
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d9b824dc
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d9b824dc

Branch: refs/heads/master
Commit: d9b824dc2f57c855f48185a3ea117c7da1bbc3a3
Parents: c85a3c0
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Sat Aug 29 01:15:32 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Sat Aug 29 01:15:32 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                             |  2 ++
 .../falcon/workflow/WorkflowExecutionContext.java       | 12 +++++++++++-
 .../java/org/apache/falcon/logging/JobLogMover.java     | 11 +++++++++--
 .../apache/falcon/workflow/FalconPostProcessing.java    |  6 ++++--
 4 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c3757fa..7bc10a8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -95,6 +95,8 @@ Trunk (Unreleased)
     (Suhas Vasu)
 
   BUG FIXES
+    FALCON-1312 Falcon post processing action should use Oozie prepared configuration(Venkat
Ranganathan via Ajay Yadava)
+
     FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay Yadava)
     
     FALCON-1412 Process waits indefinitely and finally timedout even though missing dependencies
are met(Pallavi Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 53ef5de..4454239 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -28,6 +28,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.json.simple.JSONValue;
@@ -95,6 +96,7 @@ public class WorkflowExecutionContext {
 
     private final Map<WorkflowExecutionArgs, String> context;
     private final long creationTime;
+    private Configuration actionJobConf;
 
     public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) {
         this.context = context;
@@ -301,7 +303,9 @@ public class WorkflowExecutionContext {
         OutputStream out = null;
         Path file = new Path(contextFile);
         try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
+            FileSystem fs =
+                    actionJobConf == null ? HadoopClientFactory.get().createProxiedFileSystem(file.toUri())
+                                 : HadoopClientFactory.get().createProxiedFileSystem(file.toUri(),
actionJobConf);
             out = fs.create(file);
             out.write(JSONValue.toJSONString(context).getBytes());
         } catch (IOException e) {
@@ -346,7 +350,12 @@ public class WorkflowExecutionContext {
         return new Path(logDir + parentSuffix, entityName + "-wf-post-exec-context.json").toString();
     }
 
+
     public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException
{
+        return create(args, type, null);
+    }
+
+    public static WorkflowExecutionContext create(String[] args, Type type, Configuration
conf) throws FalconException {
         Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs,
String>();
 
         try {
@@ -362,6 +371,7 @@ public class WorkflowExecutionContext {
         }
 
         WorkflowExecutionContext executionContext = new WorkflowExecutionContext(wfProperties);
+        executionContext.actionJobConf = conf;
         executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
         executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
                 getFilePath(executionContext.getLogDir(), executionContext.getEntityName(),

http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 478d68c..830641e 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -22,6 +22,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,7 +59,13 @@ public class JobLogMover {
         new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
 
     private Configuration getConf() {
-        return new Configuration();
+        Configuration conf = null;
+        try {
+            conf = OozieActionConfigurationHelper.createActionConf();
+        } catch (IOException ioe) {
+            LOG.warn("Cannot get Oozie configuration.  Returning default");
+        }
+        return conf == null ? new Configuration(): conf;
     }
 
     public int run(WorkflowExecutionContext context) {
@@ -76,7 +83,7 @@ public class JobLogMover {
             //the corresponding job logs are stored within the respective dir
             Path path = new Path(context.getLogDir() + "/"
                     + String.format("%03d", context.getWorkflowRunId()));
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri(),
getConf());
 
             if (EntityType.FEED.name().equalsIgnoreCase(context.getEntityType())
                     || notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 7557153..cff1187 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -20,6 +20,7 @@ package org.apache.falcon.workflow;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.logging.JobLogMover;
 import org.apache.falcon.messaging.JMSMessageProducer;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -35,13 +36,14 @@ public class FalconPostProcessing extends Configured implements Tool {
     private static final Logger LOG = LoggerFactory.getLogger(FalconPostProcessing.class);
 
     public static void main(String[] args) throws Exception {
-        ToolRunner.run(new Configuration(), new FalconPostProcessing(), args);
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
+        ToolRunner.run(conf, new FalconPostProcessing(), args);
     }
 
     @Override
     public int run(String[] args) throws Exception {
         WorkflowExecutionContext context = WorkflowExecutionContext.create(args,
-                WorkflowExecutionContext.Type.POST_PROCESSING);
+                WorkflowExecutionContext.Type.POST_PROCESSING, getConf());
         LOG.info("Post workflow execution context created {}", context);
         // serialize the context to HDFS under logs dir before sending the message
         context.serialize();


Mime
View raw message