falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2175 java.lang.IllegalArgumentException in LogMover service
Date Tue, 15 Nov 2016 10:09:42 GMT
Repository: falcon
Updated Branches:
  refs/heads/master c980aa800 -> 54a88b814


FALCON-2175 java.lang.IllegalArgumentException in LogMover service

Author: Pallavi Rao <pallavi.rao@inmobi.com>

Reviewers: @sandeepSamudrala, @praveen8927

Closes #292 from pallavi-rao/2175 and squashes the following commits:

38e16ee [Pallavi Rao] Fixing checkstyle issue
144c588 [Pallavi Rao] Revert "FALCON-1821 Update git pull merge script to accept and update
JIRA type"
3848b10 [Pallavi Rao] FALCON-2175  java.lang.IllegalArgumentException in LogMover service
5ecb344 [Pallavi Rao] FALCON-2175  java.lang.IllegalArgumentException in LogMover service
be17164 [Pallavi Rao] Merge remote-tracking branch 'upstream/master'
a6d8c6c [Pallavi Rao] FALCON-1821 Update git pull merge script to accept and update JIRA type


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/54a88b81
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/54a88b81
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/54a88b81

Branch: refs/heads/master
Commit: 54a88b8146ddaad9e230a8f090fff0f825cc778c
Parents: c980aa8
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Nov 15 15:39:27 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Nov 15 15:39:27 2016 +0530

----------------------------------------------------------------------
 .../workflow/WorkflowExecutionContext.java      |  2 +-
 .../falcon/messaging/JMSMessageConsumer.java    | 46 ++++++++++----------
 .../org/apache/falcon/logging/JobLogMover.java  | 14 ++++--
 3 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/54a88b81/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index cccbe3b..d8040f0 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -170,7 +170,7 @@ public class WorkflowExecutionContext {
         return getValue(WorkflowExecutionArgs.LOG_FILE);
     }
 
-    String getNominalTime() {
+    public String getNominalTime() {
         return getValue(WorkflowExecutionArgs.NOMINAL_TIME);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/54a88b81/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 5383e7f..db22460 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -18,6 +18,25 @@
 
 package org.apache.falcon.messaging;
 
+import java.lang.reflect.InvocationTargetException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
@@ -35,26 +54,6 @@ import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import java.lang.reflect.InvocationTargetException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
 /**
  * Subscribes to the falcon topic for handling retries and alerts.
  */
@@ -156,9 +155,12 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener
{
             wfProperties.put(WorkflowExecutionArgs.ENTITY_NAME, entityTypePair.first);
             wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name());
             wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user"));
-            wfProperties.put(WorkflowExecutionArgs.OPERATION, getOperation(appName).name());
+            WorkflowExecutionContext.EntityOperations operation = getOperation(appName);
+            wfProperties.put(WorkflowExecutionArgs.OPERATION, operation.name());
+            String subflowId = (operation.equals(WorkflowExecutionContext.EntityOperations.GENERATE))
+                    ? "@user-action" : "";
             wfProperties.put(WorkflowExecutionArgs.USER_SUBFLOW_ID,
-                    json.getString("id").concat("@user-action"));
+                    json.getString("id").concat(subflowId));
             String appType = message.getStringProperty("appType");
             return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType));
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/54a88b81/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 72c3dc5..535f62a 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.logging;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -84,17 +85,23 @@ public class JobLogMover {
 
     public int run(WorkflowExecutionContext context) {
         try {
-            OozieClient client = new OozieClient(context.getWorkflowEngineUrl());
+            String engineUrl = context.getWorkflowEngineUrl();
+            if (StringUtils.isBlank(engineUrl)) {
+                LOG.warn("Unable to retrieve workflow url for {} with status {} ",
+                        context.getWorkflowId(), context.getWorkflowStatus());
+                return 0;
+            }
+            OozieClient client = new OozieClient(engineUrl);
             WorkflowJob jobInfo;
             try {
-                jobInfo = client.getJobInfo(context.getUserSubflowId());
+                jobInfo = client.getJobInfo(context.getWorkflowId());
             } catch (OozieClientException e) {
                 LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e);
                 return 0;
             }
             //Assumption is - Each wf run will have a directory
             //the corresponding job logs are stored within the respective dir
-            Path path = new Path(context.getLogDir() + "/"
+            Path path = new Path(context.getLogDir() + "/" + context.getNominalTime() + "/"
                     + String.format("%03d", context.getWorkflowRunId()));
             FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri(),
getConf());
 
@@ -117,6 +124,7 @@ public class JobLogMover {
                         ||context.getUserWorkflowEngine().equals("hive")) {
                     flowId = jobInfo.getId();
                 } else {
+                    jobInfo = client.getJobInfo(context.getUserSubflowId());
                     // if process wf with oozie engine
                     flowId = jobInfo.getExternalId();
                 }


Mime
View raw message