From: suhasv@apache.org
To: commits@falcon.apache.org
Reply-To: dev@falcon.apache.org
Message-Id: <5563c12d967a4eb58e535ccb5a210290@git.apache.org>
Subject: falcon git commit: FALCON-973 Add LogMover Service for yarn. Contributed by pavan kumar kolamuri
Date: Mon, 16 Feb 2015 10:02:05 +0000 (UTC)

Repository: falcon
Updated Branches:
  refs/heads/master 280ea92a7 -> 8f32de0f3


FALCON-973 Add LogMover Service for yarn. Contributed by pavan kumar kolamuri


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8f32de0f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8f32de0f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8f32de0f

Branch: refs/heads/master
Commit: 8f32de0f3626468ebc95fcb72d035479758bf22d
Parents: 280ea92
Author: Suhas Vasu
Authored: Mon Feb 16 15:31:37 2015 +0530
Committer: Suhas Vasu
Committed: Mon Feb 16 15:31:37 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 oozie/pom.xml                                   |   5 +
 .../falcon/logging/DefaultTaskLogRetriever.java |  12 +-
 .../org/apache/falcon/logging/JobLogMover.java  |  31 +++--
 .../falcon/logging/TaskLogRetrieverYarn.java    |  77 +++++++++++
 .../falcon/logging/TaskLogURLRetriever.java     |   3 +-
 .../oozie/logging/TaskLogRetrieverYarnTest.java | 130 +++++++++++++++++++
 7 files changed, 245 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8d16073..57061d3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -79,6 +79,9 @@ Trunk (Unreleased)
     (Suhas vasu)

   BUG FIXES
+    FALCON-973 Add LogMover Service for yarn
+    (pavan kumar kolamuri via Suhas Vasu)
+
     FALCON-993 Falcon Oozie adaptor test case failed with umask issue
     (Peeyush Bishnoi via Srikanth Sundarrajan)


http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index d5c422f..a150f3f 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -77,6 +77,11 @@
         <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
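
Note: the mockito-all dependency above is used by the new TaskLogRetrieverYarnTest further down.
A minimal sketch of the stubbing pattern that test relies on; the class name and job id below are
illustrative only and not part of this patch:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobID;

    // Illustrative helper: builds a mocked Cluster so retrieveTaskLogURL() can be
    // exercised without a live YARN cluster.
    public class MockClusterSketch {
        public static Cluster clusterWithJob(String jobIdStr) throws Exception {
            Cluster cluster = mock(Cluster.class);
            Job job = mock(Job.class);
            when(cluster.getJob(JobID.forName(jobIdStr))).thenReturn(job);
            return cluster;
        }
    }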

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
index a685faf..962f891 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
@@ -28,6 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Default task log retriever. By default checks the hadoop runningJob.
@@ -37,7 +39,7 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRetriever {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskLogRetriever.class);
 
     @Override
-    public String retrieveTaskLogURL(String jobId) throws IOException {
+    public List<String> retrieveTaskLogURL(String jobId) throws IOException {
         JobConf jobConf = new JobConf(getConf());
         JobClient jobClient = new JobClient(jobConf);
 
@@ -46,18 +48,20 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRetriever {
             LOG.warn("No running job for job id: {}", jobId);
             return getFromHistory(jobId);
         }
+        List<String> taskLogUrls = new ArrayList<String>();
         TaskCompletionEvent[] tasks = job.getTaskCompletionEvents(0);
         // 0th event is setup, 1st event is launcher, 2nd event is cleanup
         if (tasks != null && tasks.length == 3 && tasks[1] != null) {
-            return tasks[1].getTaskTrackerHttp() + "/tasklog?attemptid="
-                    + tasks[1].getTaskAttemptId() + "&all=true";
+            taskLogUrls.add(tasks[1].getTaskTrackerHttp() + "/tasklog?attemptid="
+                    + tasks[1].getTaskAttemptId() + "&all=true");
+            return taskLogUrls;
         } else {
             LOG.warn("No running task for job: {}", jobId);
             return getFromHistory(jobId);
         }
     }
 
-    protected String getFromHistory(String jodId) throws IOException {
+    protected List<String> getFromHistory(String jodId) throws IOException {
         return null;
     }
 }


http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 4a22ff2..243487e 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -51,6 +51,8 @@ import java.util.Set;
 public class JobLogMover {
     private static final Logger LOG = LoggerFactory.getLogger(JobLogMover.class);
+    private static final String YARN = "yarn";
+    private static final String MAPREDUCE_FRAMEWORK = "mapreduce.framework.name";
 
     public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
             Arrays.asList(new String[]{"eviction", "replication", }));
@@ -124,14 +126,18 @@ public class JobLogMover {
 
     private void copyTTlogs(FileSystem fs, Path path,
                             WorkflowAction action) throws Exception {
-        String ttLogURL = getTTlogURL(action.getExternalId());
-        if (ttLogURL != null) {
-            LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
-            InputStream in = getURLinputStream(new URL(ttLogURL));
-            OutputStream out = fs.create(new Path(path, action.getName() + "_"
-                    + getMappedStatus(action.getStatus()) + ".log"));
-            IOUtils.copyBytes(in, out, 4096, true);
-            LOG.info("Copied log to {}", path);
+        List<String> ttLogUrls = getTTlogURL(action.getExternalId());
+        if (ttLogUrls != null) {
+            int index = 1;
+            for (String ttLogURL : ttLogUrls) {
+                LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
+                InputStream in = getURLinputStream(new URL(ttLogURL));
+                OutputStream out = fs.create(new Path(path, action.getName() + "_"
+                        + getMappedStatus(action.getStatus()) + "-" + index + ".log"));
+                IOUtils.copyBytes(in, out, 4096, true);
+                LOG.info("Copied log to {}", path);
+                index++;
+            }
         }
     }
 
@@ -145,15 +151,18 @@ public class JobLogMover {
         }
     }
 
-    private String getTTlogURL(String jobId) throws Exception {
+    private List<String> getTTlogURL(String jobId) throws Exception {
         TaskLogURLRetriever logRetriever = ReflectionUtils
-                .newInstance(getLogRetrieverClassName(), getConf());
+                .newInstance(getLogRetrieverClassName(getConf()), getConf());
         return logRetriever.retrieveTaskLogURL(jobId);
     }
 
     @SuppressWarnings("unchecked")
-    private Class getLogRetrieverClassName() {
+    private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName(Configuration conf) {
         try {
+            if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
+                return TaskLogRetrieverYarn.class;
+            }
             return (Class<? extends TaskLogURLRetriever>) Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
         } catch (ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
new file mode 100644
index 0000000..61c5afb
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.logging;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Task log retriever for jobs running in yarn.
+ */
+public class TaskLogRetrieverYarn extends DefaultTaskLogRetriever {
+    private static final Logger LOG = LoggerFactory.getLogger(TaskLogRetrieverYarn.class);
+
+    protected static final String SCHEME = "http://";
+    protected static final String YARN_LOG_SERVER_URL = "yarn.log.server.url";
+
+    @Override
+    public List<String> retrieveTaskLogURL(String jobIdStr) throws IOException {
+        List<String> taskLogUrls = new ArrayList<String>();
+        Configuration conf = getConf();
+        Cluster cluster = getCluster(conf);
+        JobID jobID = JobID.forName(jobIdStr);
+        if (jobID == null) {
+            LOG.warn("External id for workflow action is null");
+            return null;
+        }
+        try {
+            Job job = cluster.getJob(jobID);
+            if (job != null) {
+                TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
+                for (TaskCompletionEvent event : events) {
+                    LogParams params = cluster.getLogParams(jobID, event.getTaskAttemptId());
+                    String url = SCHEME + conf.get(YARN_LOG_SERVER_URL) + "/"
+                            + event.getTaskTrackerHttp() + "/"
+                            + params.getContainerId() + "/"
+                            + params.getApplicationId() + "/"
+                            + params.getOwner() + "?start=0";
+                    LOG.info("Task Log URL for the job {} is {}", jobIdStr, url);
+                    taskLogUrls.add(url);
+                }
+                return taskLogUrls;
+            }
+            LOG.warn("Unable to find the job {} in cluster", jobIdStr);
+            return null;
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    protected Cluster getCluster(Configuration conf) throws IOException {
+        return new Cluster(conf);
+    }
+}
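
For reference, each URL assembled by retrieveTaskLogURL() above is the yarn.log.server.url value
prefixed with "http://" and followed by the tracker address, container id, application id and
owner. With the value "host:4000" used in the test below, an entry would look roughly like this
(every path component here is made up for illustration):

    http://host:4000/nodemanager-host:8041/container_1424080000000_0001_01_000002/application_1424080000000_0001/falcon?start=0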

http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
index 95860a6..e63f9bf 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogURLRetriever.java
@@ -21,11 +21,12 @@ package org.apache.falcon.logging;
 import org.apache.hadoop.conf.Configurable;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Interface to abstract hadoop version specific task log url retrieval differences.
  */
 public interface TaskLogURLRetriever extends Configurable {
 
-    String retrieveTaskLogURL(String jobId) throws IOException;
+    List<String> retrieveTaskLogURL(String jobId) throws IOException;
 }


http://git-wip-us.apache.org/repos/asf/falcon/blob/8f32de0f/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java b/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java
new file mode 100644
index 0000000..0d9cf12
--- /dev/null
+++ b/oozie/src/test/java/org/apache/falcon/oozie/logging/TaskLogRetrieverYarnTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.logging;
+
+import org.apache.falcon.logging.TaskLogRetrieverYarn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for TaskLog Retrieval in Yarn.
+ */
+public class TaskLogRetrieverYarnTest extends TaskLogRetrieverYarn {
+
+    private Cluster mockcluster;
+    private Configuration conf = null;
+    private static Random random = new Random();
+
+    @DataProvider(name = "testData")
+    public Object[][] testData() throws IOException, InterruptedException {
+        int samples = getRandomValueInRange(10) + 1;
+        Object[][] resultSet = new Object[samples][2];
+        for (int count = 0; count < samples; count++) {
+            List<String> expectedResult = new ArrayList<String>();
+            Cluster cluster = getCluster(getConf());
+            String jobId = new JobID("job", random.nextInt(1000)).toString();
+            boolean success = random.nextBoolean();
+            JobID jobID = JobID.forName(jobId);
+            int numEvents = getRandomValueInRange(10) + 1;
+            TaskCompletionEvent[] events = getTaskCompletionEvents(numEvents, jobID);
+            Job job = mock(Job.class);
+            when(cluster.getJob(jobID)).thenReturn(job);
+            when(job.getTaskCompletionEvents(0)).thenReturn(events);
+            for (TaskCompletionEvent event : events) {
+                if (success) {
+                    LogParams params = getLogParams();
+                    when(cluster.getLogParams(jobID, event.getTaskAttemptId())).thenReturn(params);
+                    String url = SCHEME + getConf().get(YARN_LOG_SERVER_URL) + "/"
+                            + event.getTaskTrackerHttp() + "/"
+                            + params.getContainerId() + "/"
+                            + params.getApplicationId() + "/"
+                            + params.getOwner() + "?start=0";
+                    expectedResult.add(url);
+                } else {
+                    when(cluster.getJob(jobID)).thenReturn(null);
+                    expectedResult = null;
+                }
+                resultSet[count] = new Object[]{jobId, expectedResult};
+            }
+        }
+        return resultSet;
+    }
+
+    @Test(dataProvider = "testData")
+    public void testSuccess(String jobId, List<String> expectedResult) throws Exception {
+        List<String> actual = this.retrieveTaskLogURL(jobId);
+        Assert.assertEquals(actual, expectedResult);
+    }
+
+    @Override
+    protected Cluster getCluster(Configuration configuration) {
+        if (mockcluster == null) {
+            this.mockcluster = mock(Cluster.class);
+        }
+        return mockcluster;
+    }
+
+    @Override
+    public Configuration getConf() {
+        if (conf == null) {
+            conf = new Configuration();
+            conf.set(YARN_LOG_SERVER_URL, "host:4000");
+        }
+        return conf;
+    }
+
+    private TaskCompletionEvent[] getTaskCompletionEvents(int numEvents, JobID jobID) {
+        TaskCompletionEvent[] taskCompletionEvents = new TaskCompletionEvent[numEvents];
+        for (int i = 0; i < numEvents; i++) {
+            TaskAttemptID taskAttemptID = new TaskAttemptID(new TaskID(jobID, true, 0), i);
+            TaskCompletionEvent taskCompletionEvent = new TaskCompletionEvent(0, taskAttemptID, 0,
+                    true, TaskCompletionEvent.Status.SUCCEEDED, "tracker:0");
+            taskCompletionEvents[i] = taskCompletionEvent;
+        }
+        return taskCompletionEvents;
+    }
+
+    private LogParams getLogParams() {
+        int containerIndex = getRandomValueInRange(10);
+        return new LogParams("c" + containerIndex, "a1", "n1", "own1");
+    }
+
+    private int getRandomValueInRange(int range) {
+        return random.nextInt(range);
+    }
+}