falcon-commits mailing list archives

From srik...@apache.org
Subject git commit: FALCON 221 Logmover is not copying all action level logs. Contributed by Srikanth Sundarrajan
Date Fri, 31 Jan 2014 03:55:24 GMT
Updated Branches:
  refs/heads/master 4496b2dbf -> c1228fd74


FALCON 221 Logmover is not copying all action level logs. Contributed by Srikanth Sundarrajan


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c1228fd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c1228fd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c1228fd7

Branch: refs/heads/master
Commit: c1228fd7442fa641a218f4c03dc83cb9b555543e
Parents: 4496b2d
Author: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Authored: Fri Jan 31 09:24:59 2014 +0530
Committer: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Committed: Fri Jan 31 09:24:59 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     | 28 +++---
 oozie/pom.xml                                   | 23 +++++
 .../falcon/logging/DefaultTaskLogRetriever.java | 62 ++++++++++++++
 .../org/apache/falcon/logging/LogMover.java     | 33 +++----
 .../falcon/logging/TaskLogURLRetriever.java     | 31 +++++++
 .../falcon/logging/v1/TaskLogRetrieverV1.java   | 90 ++++++++++++++++++++
 .../falcon/retention/FeedEvictorTest.java       |  8 +-
 7 files changed, 241 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1228fd7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1e92966..a1e2388 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,12 +18,14 @@ Trunk (Unreleased)
   IMPROVEMENTS
     FALCON-60 Feed retention doesn't delete empty parent dirs. (Shaik Idris)
 
-    FALCON-247 Add scripts to check the status of falcon and prism. (Jean-Baptiste Onofré
-    via Shwetha GS)
+    FALCON-247 Add scripts to check the status of falcon and prism. (Jean-Baptiste 
+    Onofré via Shwetha GS)
 
-    FALCON-245 POM should use Apache POM as parent. (Jean-Baptiste Onofré via Shwetha GS)
+    FALCON-245 POM should use Apache POM as parent. (Jean-Baptiste Onofré via 
+    Shwetha GS)
 
-    FALCON-252 Upgrade to json-simple 1.1.1. (Jean-Baptiste Onofré via Shwetha GS)
+    FALCON-252 Upgrade to json-simple 1.1.1. (Jean-Baptiste Onofré via 
+    Shwetha GS)
 
     FALCON-233 Update hadoop 2 version to 2.2.0. (Venkatesh Seetharam
     via Shwetha GS)
@@ -33,21 +35,27 @@ Trunk (Unreleased)
     FALCON-66 Make oozie version change configurable. (Shwetha GS
     via Srikanth Sundarrajan)
     
-    FALCON-38 Falcon's parent workflow actions (pre-processing & prost-processing) should have multiple retries. (Shaik Idris)
+    FALCON-38 Falcon's parent workflow actions (pre-processing & prost-processing) 
+    should have multiple retries. (Shaik Idris)
 
 
   OPTIMIZATIONS
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 
   BUG FIXES
-    FALCON-270 Checkstyle can not be run on a module. (Jean-Baptiste Onofré via Shwetha GS)
+    FALCON-221 Logmover is not copying all action level logs. (Srikanth Sundarrajan)
+
+    FALCON-270 Checkstyle can not be run on a module. (Jean-Baptiste Onofré via 
+    Shwetha GS)
 
-    FALCON-260 When a process is scheduled, the user workflow is failing with OozieClientException.
-    (Shwetha GS)
+    FALCON-260 When a process is scheduled, the user workflow is failing with 
+    OozieClientException. (Shwetha GS)
 
-    FALCON-268 Checkstyle/Findbugs issues on FalconCLI. (Jean-Baptiste Onofré via Shwetha GS)
+    FALCON-268 Checkstyle/Findbugs issues on FalconCLI. (Jean-Baptiste Onofré via 
+    Shwetha GS)
 
-    FALCON-258 Falcon status throws an error when external jobids are missing (Suhas Vasu via Shaik Idris)
+    FALCON-258 Falcon status throws an error when external jobids are missing 
+    (Suhas Vasu via Shaik Idris)
 
     FALCON-262 Example files should use aligned dependency versions. (Jean-Baptiste Onofré
     via Shwetha GS)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1228fd7/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index a6514e4..7e2cb77 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -43,6 +43,29 @@
                     <artifactId>hadoop-core</artifactId>
                 </dependency>
             </dependencies>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>1.5</version>
+                        <executions>
+                            <execution>
+                                <id>add-source</id>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>add-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>${project.basedir}/src/versioned-src/v1/java</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
         </profile>
         <profile>
             <id>hadoop-2</id>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1228fd7/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
new file mode 100644
index 0000000..ae4dd12
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.logging;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Default task log retriever. By default checks the hadoop runningJob.
+ */
+public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRetriever {
+
+    private static final Logger LOG = Logger.getLogger(DefaultTaskLogRetriever.class);
+
+    @Override
+    public String retrieveTaskLogURL(String jobId) throws IOException {
+        JobConf jobConf = new JobConf(getConf());
+        JobClient jobClient = new JobClient(jobConf);
+
+        RunningJob job = jobClient.getJob(JobID.forName(jobId));
+        if (job == null) {
+            LOG.warn("No running job for job id: " + jobId);
+            return getFromHistory(jobId);
+        }
+        TaskCompletionEvent[] tasks = job.getTaskCompletionEvents(0);
+        // 0th even is setup, 1 event is launcher, 2 event is cleanup
+        if (tasks != null && tasks.length == 3 && tasks[1] != null) {
+            return tasks[1].getTaskTrackerHttp() + "/tasklog?attemptid="
+                    + tasks[1].getTaskAttemptId() + "&all=true";
+        } else {
+            LOG.warn("No running task for job: " + jobId);
+            return getFromHistory(jobId);
+        }
+    }
+
+    protected String getFromHistory(String jodId) throws IOException {
+        return null;
+    }
+}
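The live-JobTracker path in `DefaultTaskLogRetriever` depends on one positional assumption: a launcher job reports exactly three completion events, and index 1 is the launcher map whose log is wanted. The plain-Java sketch here reproduces only that URL-building rule so it can be run without a cluster. `CompletionEvent` and `TaskLogUrlSketch` are hypothetical stand-ins for Hadoop's `TaskCompletionEvent` (only its `getTaskTrackerHttp()` and `getTaskAttemptId()` values are modelled). The tracker address and attempt ids are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the two TaskCompletionEvent accessors the retriever calls. */
final class CompletionEvent {
    final String taskTrackerHttp;
    final String taskAttemptId;

    CompletionEvent(String taskTrackerHttp, String taskAttemptId) {
        this.taskTrackerHttp = taskTrackerHttp;
        this.taskAttemptId = taskAttemptId;
    }
}

final class TaskLogUrlSketch {
    private TaskLogUrlSketch() {
    }

    /**
     * Same rule as DefaultTaskLogRetriever.retrieveTaskLogURL: only when there are
     * exactly three events (0 = setup, 1 = launcher, 2 = cleanup) is the launcher
     * attempt's log URL built; null means "fall back to job history".
     */
    static String launcherLogUrl(List<CompletionEvent> events) {
        if (events != null && events.size() == 3 && events.get(1) != null) {
            CompletionEvent launcher = events.get(1);
            return launcher.taskTrackerHttp + "/tasklog?attemptid="
                    + launcher.taskAttemptId + "&all=true";
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical tracker address and attempt ids, for illustration only.
        List<CompletionEvent> events = Arrays.asList(
                new CompletionEvent("http://tt1:50060", "attempt_1_m_000001_0"),
                new CompletionEvent("http://tt1:50060", "attempt_1_m_000000_0"),
                new CompletionEvent("http://tt1:50060", "attempt_1_m_000002_0"));
        // Prints http://tt1:50060/tasklog?attemptid=attempt_1_m_000000_0&all=true
        System.out.println(launcherLogUrl(events));
    }
}
```

Any other event count, a null launcher event, or a job the JobTracker no longer returns sends the real class to `getFromHistory`. That method returns null in the default implementation and is overridden by the Hadoop 1 retriever added in this commit.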

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1228fd7/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index afdc36d..d544311 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -28,11 +28,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
@@ -205,22 +201,19 @@ public class LogMover extends Configured implements Tool {
     }
 
     private String getTTlogURL(String jobId) throws Exception {
-        JobConf jobConf = new JobConf(getConf());
-        JobClient jobClient = new JobClient(jobConf);
-        RunningJob job = jobClient.getJob(JobID.forName(jobId));
-        if (job == null) {
-            LOG.warn("No running job for job id: " + jobId);
-            return null;
-        }
-        TaskCompletionEvent[] tasks = job.getTaskCompletionEvents(0);
-        // 0th even is setup, 1 event is launcher, 2 event is cleanup
-        if (tasks != null && tasks.length == 3 && tasks[1] != null) {
-            return tasks[1].getTaskTrackerHttp() + "/tasklog?attemptid="
-                    + tasks[1].getTaskAttemptId() + "&all=true";
-        } else {
-            LOG.warn("No running task for job: " + jobId);
+        TaskLogURLRetriever logRetriever = ReflectionUtils.newInstance(getLogRetrieverClassName(), getConf());
+        return logRetriever.retrieveTaskLogURL(jobId);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName() {
+        try {
+            return (Class<? extends TaskLogURLRetriever>)
+                    Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
+        } catch (ClassNotFoundException e) {
+            LOG.warn("V1 Retriever missing, falling back to Default retriever");
+            return DefaultTaskLogRetriever.class;
         }
-        return null;
     }
 
     private InputStream getURLinputStream(URL url) throws IOException {
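After this change, `LogMover` chooses its retriever at runtime. It asks for `org.apache.falcon.logging.v1.TaskLogRetrieverV1` by name, and that class is compiled only when the profile depending on `hadoop-core` adds `src/versioned-src/v1/java` through build-helper-maven-plugin. If the lookup fails, `LogMover` falls back to `DefaultTaskLogRetriever`. The sketch here reproduces that lookup-with-fallback in plain Java. `LogUrlRetriever`, `DefaultRetriever`, `RetrieverLoader` and the class name passed in `main` are hypothetical. Hadoop's `ReflectionUtils.newInstance(Class, Configuration)` is replaced by a no-argument constructor call, so no configuration is injected.

```java
/** Hypothetical stand-in for TaskLogURLRetriever, without Hadoop's Configurable. */
interface LogUrlRetriever {
    String retrieve(String jobId);
}

/** Hypothetical default implementation that is always on the classpath. */
final class DefaultRetriever implements LogUrlRetriever {
    @Override
    public String retrieve(String jobId) {
        return "default:" + jobId;
    }
}

final class RetrieverLoader {
    private RetrieverLoader() {
    }

    /**
     * Same shape as LogMover.getLogRetrieverClassName(): try the version-specific
     * class by name, fall back to the default when it was not compiled in.
     */
    static Class<? extends LogUrlRetriever> resolve(String versionedClassName) {
        try {
            // asSubclass throws ClassCastException if the class does not implement the interface.
            return Class.forName(versionedClassName).asSubclass(LogUrlRetriever.class);
        } catch (ClassNotFoundException e) {
            return DefaultRetriever.class;
        }
    }

    /** Plain-Java analogue of ReflectionUtils.newInstance, minus the Configuration. */
    static LogUrlRetriever newInstance(Class<? extends LogUrlRetriever> clazz) {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical class name that does not exist, so the default is chosen.
        LogUrlRetriever retriever = newInstance(resolve("example.v1.VersionedRetriever"));
        // Prints default:job_1
        System.out.println(retriever.retrieve("job_1"));
    }
}
```

One deliberate difference from the commit: `Class.asSubclass` replaces the unchecked cast. A named class that does not implement the interface therefore fails immediately with a `ClassCastException`, rather than only when the instance is assigned or used.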

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1228fd7/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
new file mode 100644
index 0000000..95860a6
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.logging;
+
+import org.apache.hadoop.conf.Configurable;
+
+import java.io.IOException;
+
+/**
+ * Interface to abstract hadoop version specific task log url retrieval differences.
+ */
+public interface TaskLogURLRetriever extends Configurable {
+
+    String retrieveTaskLogURL(String jobId) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1228fd7/oozie/src/versioned-src/v1/java/org/apache/falcon/logging/v1/TaskLogRetrieverV1.java
----------------------------------------------------------------------
diff --git a/oozie/src/versioned-src/v1/java/org/apache/falcon/logging/v1/TaskLogRetrieverV1.java b/oozie/src/versioned-src/v1/java/org/apache/falcon/logging/v1/TaskLogRetrieverV1.java
new file mode 100644
index 0000000..5a2e570
--- /dev/null
+++ b/oozie/src/versioned-src/v1/java/org/apache/falcon/logging/v1/TaskLogRetrieverV1.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.logging.v1;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.falcon.logging.DefaultTaskLogRetriever;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.DefaultJobHistoryParser;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+
+/**
+ * Hadoop v1 task log retriever based on job history
+ */
+public final class TaskLogRetrieverV1 extends DefaultTaskLogRetriever {
+    private static final Logger LOG = Logger.getLogger(TaskLogRetrieverV1.class);
+
+    @Override
+    public String getFromHistory(String jobId) throws IOException {
+        Configuration conf = getConf();
+        String file = getHistoryFile(conf, jobId);
+        if (file == null) return null;
+        JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
+        DefaultJobHistoryParser.parseJobTasks(file, jobInfo, new Path(file).getFileSystem(conf));
+        LOG.info("History file:" + file);
+        LOG.debug("Number of tasks in the history file: " + jobInfo.getAllTasks().size());
+        for (JobHistory.Task task : jobInfo.getAllTasks().values()) {
+            if (task.get(JobHistory.Keys.TASK_TYPE).equals(JobHistory.Values.MAP.name()) &&
+                    task.get(JobHistory.Keys.TASK_STATUS).equals(JobHistory.Values.SUCCESS.name())) {
+                for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+                    if (attempt.get(JobHistory.Keys.TASK_STATUS).equals(JobHistory.Values.SUCCESS.name())) {
+                        return JobHistory.getTaskLogsUrl(attempt);
+                    }
+                }
+            }
+        }
+        LOG.warn("Unable to find successful map task attempt");
+        return null;
+    }
+
+    private String getHistoryFile(Configuration conf, String jobId) throws IOException {
+        String jtAddress = "scheme://" + conf.get("mapred.job.tracker");
+        String jtHttpAddr = "scheme://" + conf.get("mapred.job.tracker.http.address");
+        try {
+            String host = new URI(jtAddress).getHost();
+            int port = new URI(jtHttpAddr).getPort();
+            HttpClient client = new HttpClient();
+            String jobUrl = "http://" + host + ":" + port + "/jobdetails.jsp";
+            GetMethod get = new GetMethod(jobUrl);
+            get.setQueryString("jobid=" + jobId);
+            get.setFollowRedirects(false);
+            int status = client.executeMethod(get);
+            String file = null;
+            if (status == HttpStatus.SC_MOVED_PERMANENTLY || status == HttpStatus.SC_MOVED_TEMPORARILY) {
+                file = get.getResponseHeader("Location").toString();
+                file = file.substring(file.lastIndexOf('=') + 1);
+                file = JobHistory.JobInfo.decodeJobHistoryFileName(file);
+            } else {
+                LOG.warn("JobURL " + jobUrl + " for id: " + jobId + " returned " + status);
+            }
+            return file;
+        } catch (URISyntaxException e) {
+            throw new IOException("JT Address: " + jtAddress + ", http Address: " + jtHttpAddr, e);
+        }
+    }
+}
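The Hadoop 1 fallback finds the history file indirectly. It requests the JobTracker's `jobdetails.jsp` for the job with redirects disabled, then decodes the file name from the redirect's `Location` header. The plain-Java sketch here covers only the string handling. It omits the HTTP call made with Commons HttpClient `GetMethod`. `HistoryFileSketch`, the sample host names, and the `jobdetailshistory.jsp?logFile=` redirect shape are hypothetical. The decode step assumes `JobHistory.JobInfo.decodeJobHistoryFileName` behaves like `URLDecoder.decode(name, "UTF-8")`.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;

final class HistoryFileSketch {
    private HistoryFileSketch() {
    }

    /**
     * Builds the jobdetails.jsp URL as getHistoryFile does: host from the
     * mapred.job.tracker value, port from mapred.job.tracker.http.address.
     * The dummy "scheme://" prefix is only there so java.net.URI will parse
     * a bare host:port into host and port.
     */
    static String jobDetailsUrl(String jobTracker, String jobTrackerHttp, String jobId) {
        try {
            String host = new URI("scheme://" + jobTracker).getHost();
            int port = new URI("scheme://" + jobTrackerHttp).getPort();
            return "http://" + host + ":" + port + "/jobdetails.jsp?jobid=" + jobId;
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("JT Address: " + jobTracker
                    + ", http Address: " + jobTrackerHttp, e);
        }
    }

    /**
     * Takes a redirect Location header value, keeps everything after the last
     * '=', and URL-decodes it (assumed equivalent to decodeJobHistoryFileName).
     */
    static String historyFileFromLocation(String location) {
        String encoded = location.substring(location.lastIndexOf('=') + 1);
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical configuration values and redirect target.
        // Prints http://jt.example.com:50030/jobdetails.jsp?jobid=job_1
        System.out.println(jobDetailsUrl("jt.example.com:8021", "0.0.0.0:50030", "job_1"));
        // Prints file:/history/job_1
        System.out.println(historyFileFromLocation(
                "http://jt.example.com:50030/jobdetailshistory.jsp?logFile=file%3A%2Fhistory%2Fjob_1"));
    }
}
```

The sketch expects the header's bare value string, which Commons HttpClient 3.x exposes as `Header.getValue()`. `Header.toString()` is a formatted representation of the header, not just its value.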

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1228fd7/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
----------------------------------------------------------------------
diff --git a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
index 27d965c..3c14b4d 100644
--- a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
+++ b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
@@ -122,10 +122,10 @@ public class FeedEvictorTest {
             assertFailures(fs, pair);
             compare(map.get("feed1"), stream.getBuffer());
 
-            Assert.assertEquals(readLogFile(new Path(logFile)),
-                    getExpectedInstancePaths(dataPath.replaceAll(storageUrl, "")));
-            String deletedPath = getExpectedInstancePaths(dataPath.replaceAll(storageUrl, "")).
-                    split(",")[0].split("=")[1];
+            String expectedInstancePaths = getExpectedInstancePaths(dataPath.replaceAll(storageUrl, ""));
+            Assert.assertEquals(readLogFile(new Path(logFile)), expectedInstancePaths);
+
+            String deletedPath = expectedInstancePaths.split(",")[0].split("=")[1];
             Assert.assertFalse(fs.exists(new Path(deletedPath)));
             //empty parents
             Assert.assertFalse(fs.exists(new Path(deletedPath).getParent()));

