falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suh...@apache.org
Subject falcon git commit: FALCON-973 Add LogMover Service for yarn. Contributed by pavan kumar kolamuri
Date Mon, 16 Feb 2015 10:02:05 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 280ea92a7 -> 8f32de0f3


FALCON-973 Add LogMover Service for yarn. Contributed by pavan kumar kolamuri


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8f32de0f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8f32de0f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8f32de0f

Branch: refs/heads/master
Commit: 8f32de0f3626468ebc95fcb72d035479758bf22d
Parents: 280ea92
Author: Suhas Vasu <suhas.v@inmobi.com>
Authored: Mon Feb 16 15:31:37 2015 +0530
Committer: Suhas Vasu <suhas.v@inmobi.com>
Committed: Mon Feb 16 15:31:37 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 oozie/pom.xml                                   |   5 +
 .../falcon/logging/DefaultTaskLogRetriever.java |  12 +-
 .../org/apache/falcon/logging/JobLogMover.java  |  31 +++--
 .../falcon/logging/TaskLogRetrieverYarn.java    |  77 +++++++++++
 .../falcon/logging/TaskLogURLRetriever.java     |   3 +-
 .../oozie/logging/TaskLogRetrieverYarnTest.java | 130 +++++++++++++++++++
 7 files changed, 245 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8d16073..57061d3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -79,6 +79,9 @@ Trunk (Unreleased)
    (Suhas vasu)
 
   BUG FIXES
+   FALCON-973 Add LogMover Service for yarn
+   (pavan kumar kolamuri via Suhas Vasu)
+
    FALCON-993 Falcon Oozie adaptor test case failed with umask issue
    (Peeyush Bishnoi via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index d5c422f..a150f3f 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -77,6 +77,11 @@
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
index a685faf..962f891 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Default task log retriever. By default checks the hadoop runningJob.
@@ -37,7 +39,7 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRet
     private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskLogRetriever.class);
 
     @Override
-    public String retrieveTaskLogURL(String jobId) throws IOException {
+    public List<String> retrieveTaskLogURL(String jobId) throws IOException {
         JobConf jobConf = new JobConf(getConf());
         JobClient jobClient = new JobClient(jobConf);
 
@@ -46,18 +48,20 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRet
             LOG.warn("No running job for job id: {}", jobId);
             return getFromHistory(jobId);
         }
+        List<String> taskLogUrls = new ArrayList<String>();
         TaskCompletionEvent[] tasks = job.getTaskCompletionEvents(0);
         // 0th even is setup, 1 event is launcher, 2 event is cleanup
         if (tasks != null && tasks.length == 3 && tasks[1] != null) {
-            return tasks[1].getTaskTrackerHttp() + "/tasklog?attemptid="
-                    + tasks[1].getTaskAttemptId() + "&all=true";
+            taskLogUrls.add(tasks[1].getTaskTrackerHttp() + "/tasklog?attemptid="
+                    + tasks[1].getTaskAttemptId() + "&all=true");
+            return taskLogUrls;
         } else {
             LOG.warn("No running task for job: {}", jobId);
             return getFromHistory(jobId);
         }
     }
 
-    protected String getFromHistory(String jodId) throws IOException {
+    protected List<String> getFromHistory(String jodId) throws IOException {
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 4a22ff2..243487e 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -51,6 +51,8 @@ import java.util.Set;
 public class JobLogMover {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobLogMover.class);
+    private static final String YARN = "yarn";
+    private static final String MAPREDUCE_FRAMEWORK = "mapreduce.framework.name";
 
     public static final Set<String> FALCON_ACTIONS =
         new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
@@ -124,14 +126,18 @@ public class JobLogMover {
 
     private void copyTTlogs(FileSystem fs, Path path,
                             WorkflowAction action) throws Exception {
-        String ttLogURL = getTTlogURL(action.getExternalId());
-        if (ttLogURL != null) {
-            LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(),
ttLogURL);
-            InputStream in = getURLinputStream(new URL(ttLogURL));
-            OutputStream out = fs.create(new Path(path, action.getName() + "_"
-                    + getMappedStatus(action.getStatus()) + ".log"));
-            IOUtils.copyBytes(in, out, 4096, true);
-            LOG.info("Copied log to {}", path);
+        List<String> ttLogUrls = getTTlogURL(action.getExternalId());
+        if (ttLogUrls != null) {
+            int index = 1;
+            for (String ttLogURL : ttLogUrls) {
+                LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(),
ttLogURL);
+                InputStream in = getURLinputStream(new URL(ttLogURL));
+                OutputStream out = fs.create(new Path(path, action.getName() + "_"
+                        + getMappedStatus(action.getStatus()) + "-" + index + ".log"));
+                IOUtils.copyBytes(in, out, 4096, true);
+                LOG.info("Copied log to {}", path);
+                index++;
+            }
         }
     }
 
@@ -145,15 +151,18 @@ public class JobLogMover {
         }
     }
 
-    private String getTTlogURL(String jobId) throws Exception {
+    private List<String> getTTlogURL(String jobId) throws Exception {
         TaskLogURLRetriever logRetriever = ReflectionUtils
-                .newInstance(getLogRetrieverClassName(), getConf());
+                .newInstance(getLogRetrieverClassName(getConf()), getConf());
         return logRetriever.retrieveTaskLogURL(jobId);
     }
 
     @SuppressWarnings("unchecked")
-    private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName() {
+    private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName(Configuration
conf) {
         try {
+            if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
+                return TaskLogRetrieverYarn.class;
+            }
             return (Class<? extends TaskLogURLRetriever>)
                     Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
         } catch (ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
new file mode 100644
index 0000000..61c5afb
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.logging;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Task log retriever for jobs running in yarn.
+ *
+ * <p>Produces one log URL per task completion event of a job. Each URL is built
+ * from the address configured under {@code yarn.log.server.url}, the task
+ * tracker address of the event and the container id, application id and owner
+ * reported by the cluster for that task attempt.
+ */
+public class TaskLogRetrieverYarn extends DefaultTaskLogRetriever {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskLogRetrieverYarn.class);
+    protected static final String SCHEME = "http://";
+    protected static final String YARN_LOG_SERVER_URL = "yarn.log.server.url";
+
+    /**
+     * Collects the log URLs of all task attempts reported for the given job.
+     *
+     * @param jobIdStr string form of the job id (the external id of the workflow action)
+     * @return one URL per task completion event, or {@code null} when the id does not
+     *         resolve to a {@link JobID} or the job is not found in the cluster
+     * @throws IOException if the cluster lookup fails or the calling thread is interrupted
+     */
+    @Override
+    public List<String> retrieveTaskLogURL(String jobIdStr) throws IOException {
+        // Resolve the id first so that no cluster handle is created for an unusable id.
+        JobID jobID = JobID.forName(jobIdStr);
+        if (jobID == null) {
+            LOG.warn("External id for workflow action is null");
+            return null;
+        }
+        Configuration conf = getConf();
+        // NOTE(review): the Cluster obtained here is never closed by this method;
+        // confirm whether the caller or this method should release it.
+        Cluster cluster = getCluster(conf);
+        try {
+            Job job = cluster.getJob(jobID);
+            if (job == null) {
+                LOG.warn("Unable to find the job in cluster {}", jobIdStr);
+                return null;
+            }
+            List<String> taskLogUrls = new ArrayList<String>();
+            String logServerPrefix = SCHEME + conf.get(YARN_LOG_SERVER_URL) + "/";
+            for (TaskCompletionEvent event : job.getTaskCompletionEvents(0)) {
+                LogParams params = cluster.getLogParams(jobID, event.getTaskAttemptId());
+                String url = logServerPrefix
+                        + event.getTaskTrackerHttp() + "/"
+                        + params.getContainerId() + "/"
+                        + params.getApplicationId() + "/"
+                        + params.getOwner() + "?start=0";
+                // Arguments are passed separately so that both placeholders get filled.
+                LOG.info("Task Log URL for the job {} is {}", jobIdStr, url);
+                taskLogUrls.add(url);
+            }
+            return taskLogUrls;
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to callers before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Creates the cluster handle used for job lookups. Kept overridable so that
+     * subclasses (for example tests) can supply their own instance.
+     *
+     * @param conf configuration used to construct the cluster handle
+     * @return a new {@link Cluster} for the given configuration
+     * @throws IOException if the cluster handle cannot be created
+     */
+    protected Cluster getCluster(Configuration conf) throws IOException {
+        return new Cluster(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
index 95860a6..e63f9bf 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
@@ -21,11 +21,12 @@ package org.apache.falcon.logging;
 import org.apache.hadoop.conf.Configurable;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Interface to abstract hadoop version specific task log url retrieval differences.
  */
 public interface TaskLogURLRetriever extends Configurable {
 
-    String retrieveTaskLogURL(String jobId) throws IOException;
+    List<String> retrieveTaskLogURL(String jobId) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java
new file mode 100644
index 0000000..0d9cf12
--- /dev/null
+++ b/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.logging;
+
+import org.apache.falcon.logging.TaskLogRetrieverYarn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for TaskLog Retrieval in Yarn.
+ *
+ * <p>The test extends the class under test so that it can substitute a mocked
+ * {@link Cluster} and a fixed configuration for the real ones.
+ */
+public class TaskLogRetrieverYarnTest extends TaskLogRetrieverYarn {
+
+    private Cluster mockcluster;
+    private Configuration conf = null;
+    private static Random random = new Random();
+
+
+    /**
+     * Builds a random number of samples, each a pair of job id and expected result.
+     * A sample either stubs a job with a random number of task completion events
+     * (expected result: one URL per event) or stubs a missing job (expected result:
+     * {@code null}).
+     */
+    @DataProvider(name = "testData")
+    public Object[][] testData() throws IOException, InterruptedException {
+        int samples = getRandomValueInRange(10) + 1;
+        Object[][] resultSet = new Object[samples][2];
+        Cluster cluster = getCluster(getConf());
+        for (int count = 0; count < samples; count++) {
+            // All samples stub the same mocked cluster, so every sample needs its own
+            // job id: a repeated id would let a later sample overwrite the stubs of an
+            // earlier one. The sample index guarantees uniqueness.
+            String jobId = new JobID("job", count).toString();
+            JobID jobID = JobID.forName(jobId);
+            List<String> expectedResult;
+            if (random.nextBoolean()) {
+                int numEvents = getRandomValueInRange(10) + 1;
+                TaskCompletionEvent[] events = getTaskCompletionEvents(numEvents, jobID);
+                Job job = mock(Job.class);
+                when(cluster.getJob(jobID)).thenReturn(job);
+                when(job.getTaskCompletionEvents(0)).thenReturn(events);
+                expectedResult = new ArrayList<String>();
+                for (TaskCompletionEvent event : events) {
+                    LogParams params = getLogParams();
+                    when(cluster.getLogParams(jobID, event.getTaskAttemptId())).thenReturn(params);
+                    String url = SCHEME + getConf().get(YARN_LOG_SERVER_URL) + "/"
+                            + event.getTaskTrackerHttp() + "/"
+                            + params.getContainerId() + "/"
+                            + params.getApplicationId() + "/"
+                            + params.getOwner() + "?start=0";
+                    expectedResult.add(url);
+                }
+            } else {
+                // Job unknown to the cluster: the retriever is expected to return null.
+                when(cluster.getJob(jobID)).thenReturn(null);
+                expectedResult = null;
+            }
+            resultSet[count] = new Object[]{jobId, expectedResult};
+        }
+        return resultSet;
+    }
+
+    /**
+     * Checks that the URLs retrieved for a job id match the expected list
+     * (or that both are {@code null}).
+     */
+    @Test(dataProvider = "testData")
+    public void testSuccess(String jobId, List<String> expectedResult) throws Exception {
+        List<String> actual = this.retrieveTaskLogURL(jobId);
+        Assert.assertEquals(actual, expectedResult);
+    }
+
+    /** Returns the single mocked cluster, creating it on first use. */
+    @Override
+    protected Cluster getCluster(Configuration configuration) {
+        if (mockcluster == null) {
+            this.mockcluster = mock(Cluster.class);
+        }
+        return mockcluster;
+    }
+
+    /** Returns a configuration whose log server address is set to {@code host:4000}. */
+    @Override
+    public Configuration getConf() {
+        if (conf == null) {
+            conf = new Configuration();
+            conf.set(YARN_LOG_SERVER_URL, "host:4000");
+        }
+        return conf;
+    }
+
+
+    /** Creates {@code numEvents} succeeded completion events, one attempt each, for the job. */
+    private TaskCompletionEvent[] getTaskCompletionEvents(int numEvents, JobID jobID) {
+        TaskCompletionEvent[] taskCompletionEvents = new TaskCompletionEvent[numEvents];
+        for (int i = 0; i < numEvents; i++) {
+            TaskAttemptID taskAttemptID = new TaskAttemptID(new TaskID(jobID, true, 0), i);
+            TaskCompletionEvent taskCompletionEvent = new TaskCompletionEvent(0, taskAttemptID, 0,
+                    true, TaskCompletionEvent.Status.SUCCEEDED, "tracker:0");
+            taskCompletionEvents[i] = taskCompletionEvent;
+        }
+        return taskCompletionEvents;
+    }
+
+    /** Creates log parameters with a random container index and fixed remaining values. */
+    private LogParams getLogParams() {
+        int containerIndex = getRandomValueInRange(10);
+        return new LogParams("c" + containerIndex, "a1", "n1", "own1");
+    }
+
+    /** Returns a random value in {@code [0, range)}. */
+    private int getRandomValueInRange(int range) {
+        return random.nextInt(range);
+    }
+}


Mime
View raw message