Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DD2E8FA7F for ; Sat, 6 Apr 2013 03:09:17 +0000 (UTC) Received: (qmail 1372 invoked by uid 500); 6 Apr 2013 03:09:17 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 924 invoked by uid 500); 6 Apr 2013 03:09:16 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 877 invoked by uid 99); 6 Apr 2013 03:09:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Apr 2013 03:09:14 +0000 X-ASF-Spam-Status: No, hits=-1999.0 required=5.0 tests=ALL_TRUSTED,FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Apr 2013 03:09:10 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E689A23888E7 for ; Sat, 6 Apr 2013 03:08:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1465171 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/io/retry/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/web/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ Date: Sat, 06 Apr 2013 03:08:48 -0000 To: common-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130406030848.E689A23888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: acmurthy Date: Sat Apr 6 03:08:47 2013 New Revision: 1465171 URL: http://svn.apache.org/r1465171 Log: MAPREDUCE-5131. Fix handling of job monitoring APIs during JobTracker restart. Contributed by Arun C. Murthy. Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1465171&r1=1465170&r2=1465171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Sat Apr 6 03:08:47 2013 @@ -579,6 +579,9 @@ Release 1.2.0 - unreleased submitted the job rather than JobTracker user. (tomwhite, acmurthy via acmurthy) + MAPREDUCE-5131. Fix handling of job monitoring APIs during JobTracker + restart. (acmurthy) + Release 1.1.2 - 2013.01.30 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java?rev=1465171&r1=1465170&r2=1465171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java (original) +++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/io/retry/RetryUtils.java Sat Apr 6 03:08:47 2013 @@ -47,7 +47,7 @@ public class RetryUtils { * @param defaultRetryPolicyEnabled default retryPolicyEnabledKey conf value * @param retryPolicySpecKey conf property key for retry policy spec * @param defaultRetryPolicySpec default retryPolicySpecKey conf value - * @param remoteExceptionToRetry The particular RemoteException to retry + * @param remoteExceptionsToRetry The particular RemoteExceptions to retry * @return the default retry policy. */ public static RetryPolicy getDefaultRetryPolicy( @@ -56,7 +56,7 @@ public class RetryUtils { boolean defaultRetryPolicyEnabled, String retryPolicySpecKey, String defaultRetryPolicySpec, - final Class remoteExceptionToRetry + final Class ... remoteExceptionsToRetry ) { final RetryPolicy multipleLinearRandomRetry = @@ -81,8 +81,14 @@ public class RetryUtils { final RetryPolicy p; if (e instanceof RemoteException) { final RemoteException re = (RemoteException)e; - p = remoteExceptionToRetry.getName().equals(re.getClassName())? - multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL; + RetryPolicy found = null; + for(Class reToRetry : remoteExceptionsToRetry) { + if (reToRetry.getName().equals(re.getClassName())) { + found = multipleLinearRandomRetry; + break; + } + } + p = found != null? found: RetryPolicies.TRY_ONCE_THEN_FAIL; } else if (e instanceof IOException) { p = multipleLinearRandomRetry; } else { //non-IOException Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1465171&r1=1465170&r2=1465171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Sat Apr 6 03:08:47 2013 @@ -147,6 +147,7 @@ public class DFSClient implements FSCons private static ClientProtocol createNamenode(ClientProtocol rpcNamenode, Configuration conf) throws IOException { //default policy + @SuppressWarnings("unchecked") final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( conf, Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1465171&r1=1465170&r2=1465171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Sat Apr 6 03:08:47 2013 @@ -154,6 +154,7 @@ public class WebHdfsFileSystem extends F } @Override + @SuppressWarnings("unchecked") public synchronized void initialize(URI uri, Configuration conf ) throws IOException { super.initialize(uri, conf); Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1465171&r1=1465170&r2=1465171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobClient.java Sat Apr 6 03:08:47 2013 @@ -525,6 +525,12 @@ public class JobClient extends Configure JobSubmissionProtocol rpcJobSubmitClient, Configuration conf) throws IOException { + /* + * Default is to retry on JobTrackerNotYetInitializedException + * i.e. wait for JobTracker to get to RUNNING state and for + * SafeModeException + */ + @SuppressWarnings("unchecked") RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( conf, @@ -532,13 +538,14 @@ public class JobClient extends Configure MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY, MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT, + JobTrackerNotYetInitializedException.class, SafeModeException.class ); - - /* + + /* * Method specific retry policies for killJob and killTask... - * - * No retries on any exception including + * + * No retries on any exception including * ConnectionException and SafeModeException */ Map methodNameToPolicyMap = Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1465171&r1=1465170&r2=1465171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Sat Apr 6 03:08:47 2013 @@ -198,8 +198,11 @@ public class JobTracker implements MRCon private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f; private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f; + final static String JT_INIT_CONFIG_KEY_FOR_TESTS = + "mapreduce.jobtracker.init.for.tests"; + public static enum State { INITIALIZING, RUNNING } - State state = State.INITIALIZING; + volatile State state = State.INITIALIZING; private static final int FS_ACCESS_RETRY_PERIOD = 1000; static final String JOB_INFO_FILE = "job-info"; static final String JOB_TOKEN_FILE = "jobToken"; @@ -2029,6 +2032,8 @@ public class JobTracker implements MRCon + " could not be started", t); } } + + this.initDone.set(conf.getBoolean(JT_INIT_CONFIG_KEY_FOR_TESTS, true)); } private static SimpleDateFormat getDateFormat() { @@ -2169,6 +2174,8 @@ public class JobTracker implements MRCon completedJobsStoreThread.start(); } + // Just for unit-tests + waitForInit(); synchronized (this) { state = State.RUNNING; } @@ -2179,6 +2186,29 @@ public class JobTracker implements MRCon LOG.info("Stopped interTrackerServer"); } + AtomicBoolean initDone = new AtomicBoolean(true); + Object initDoneLock = new Object(); + + private void waitForInit() { + synchronized (initDoneLock) { + while (!initDone.get()) { + try { + LOG.debug("About to wait since initDone = false"); + initDoneLock.wait(); + } catch (InterruptedException ie) { + LOG.debug("Ignoring ", ie); + } + } + } + } + + void setInitDone(boolean done) { + synchronized (initDoneLock) { + initDone.set(done); + initDoneLock.notify(); + } + } + void close() throws IOException { if (plugins != null) { for (ServicePlugin p : plugins) { @@ -3498,6 +3528,9 @@ public class JobTracker implements MRCon * Allocates a new JobId string. */ public synchronized JobID getNewJobId() throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + return new JobID(getTrackerIdentifier(), nextJobId++); } @@ -3511,6 +3544,9 @@ public class JobTracker implements MRCon */ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + return submitJob(jobId, jobSubmitDir, null, ts, false); } @@ -3737,6 +3773,9 @@ public class JobTracker implements MRCon return; } + // Check for JobTracker operational state + checkJobTrackerState(); + // No 'killJob' in safe-mode checkSafeMode(); @@ -3883,6 +3922,9 @@ public class JobTracker implements MRCon public synchronized void setJobPriority(JobID jobid, String priority) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + JobInProgress job = jobs.get(jobid); if (null == job) { LOG.info("setJobPriority(): JobId " + jobid.toString() @@ -3910,7 +3952,10 @@ public class JobTracker implements MRCon return job.inited(); } - public JobProfile getJobProfile(JobID jobid) { + public JobProfile getJobProfile(JobID jobid) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + synchronized (this) { JobInProgress job = jobs.get(jobid); if (job != null) { @@ -3927,7 +3972,10 @@ public class JobTracker implements MRCon return completedJobStatusStore.readJobProfile(jobid); } - public JobStatus getJobStatus(JobID jobid) { + public JobStatus getJobStatus(JobID jobid) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + if (null == jobid) { LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid"); return null; @@ -3950,6 +3998,9 @@ public class JobTracker implements MRCon private static final Counters EMPTY_COUNTERS = new Counters(); public Counters getJobCounters(JobID jobid) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser(); synchronized (this) { JobInProgress job = jobs.get(jobid); @@ -3984,6 +4035,9 @@ public class JobTracker implements MRCon public synchronized TaskReport[] getMapTaskReports(JobID jobid) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + JobInProgress job = jobs.get(jobid); if (job != null) { // Check authorization @@ -4012,6 +4066,9 @@ public class JobTracker implements MRCon public synchronized TaskReport[] getReduceTaskReports(JobID jobid) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + JobInProgress job = jobs.get(jobid); if (job != null) { // Check authorization @@ -4038,6 +4095,9 @@ public class JobTracker implements MRCon public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + JobInProgress job = jobs.get(jobid); if (job != null) { // Check authorization @@ -4067,6 +4127,9 @@ public class JobTracker implements MRCon public synchronized TaskReport[] getSetupTaskReports(JobID jobid) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + JobInProgress job = jobs.get(jobid); if (job != null) { // Check authorization @@ -4110,6 +4173,9 @@ public class JobTracker implements MRCon */ public TaskCompletionEvent[] getTaskCompletionEvents( JobID jobid, int fromEventId, int maxEvents) throws IOException{ + // Check for JobTracker operational state + checkJobTrackerState(); + JobInProgress job = this.jobs.get(jobid); if (null != job) { @@ -4131,6 +4197,9 @@ public class JobTracker implements MRCon */ public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + List taskDiagnosticInfo = null; JobID jobId = taskId.getJobID(); TaskID tipId = taskId.getTaskID(); @@ -4195,6 +4264,9 @@ public class JobTracker implements MRCon */ public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException { + // Check for JobTracker operational state + checkJobTrackerState(); + // No 'killTask' in safe-mode checkSafeMode(); @@ -5093,13 +5165,23 @@ public class JobTracker implements MRCon private void checkSafeMode() throws SafeModeException { if (isInSafeMode()) { - try { - throw new SafeModeException(( - isInAdminSafeMode()) ? adminSafeModeUser : null); - } catch (SafeModeException sfe) { - LOG.info("JobTracker in safe-mode, aborting operation", sfe); - throw sfe; - } + SafeModeException sme = + new SafeModeException( + (isInAdminSafeMode()) ? adminSafeModeUser : null); + LOG.info("JobTracker in safe-mode, aborting operation: ", sme); + throw sme; } } + + private void checkJobTrackerState() + throws JobTrackerNotYetInitializedException { + if (state != State.RUNNING) { + JobTrackerNotYetInitializedException jtnyie = + new JobTrackerNotYetInitializedException(); + LOG.info("JobTracker not yet in RUNNING state, aborting operation: ", + jtnyie); + throw jtnyie; + } + } + } Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java?rev=1465171&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java (added) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobTrackerNotYetInitializedException.java Sat Apr 6 03:08:47 2013 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +/** + * This exception is thrown when the JobTracker is still initializing and + * not yet operational. + */ +public class JobTrackerNotYetInitializedException extends IOException { + + private static final long serialVersionUID = 1984839357L; + + public JobTrackerNotYetInitializedException() { + super("JobTracker is not yet RUNNING"); + } + +} Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1465171&r1=1465170&r2=1465171&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Sat Apr 6 03:08:47 2013 @@ -631,6 +631,7 @@ public class TestRecoveryManager { conf.set("mapreduce.jobtracker.staging.root.dir", "/user"); conf.set("mapred.system.dir", "/mapred"); + String mapredSysDir = conf.get("mapred.system.dir"); mkdirWithPerms(fs, mapredSysDir, (short)0700); fs.setOwner(new Path(mapredSysDir), @@ -679,12 +680,36 @@ public class TestRecoveryManager { LOG.info("Stopping jobtracker"); mr.stopJobTracker(); + // Blocking JT INIT on restart + mr.getJobTrackerConf().setBoolean( + JobTracker.JT_INIT_CONFIG_KEY_FOR_TESTS, false); + + // start the jobtracker LOG.info("Starting jobtracker"); - mr.startJobTracker(); - UtilsForTests.waitForJobTracker(jc); + mr.startJobTracker(false); + while (!mr.getJobTrackerRunner().isUp()) { + Thread.sleep(100); + } jobtracker = mr.getJobTrackerRunner().getJobTracker(); + Assert.assertNotNull(jobtracker); + + // now check for job status ... + // should throw JobTrackerNotYetInitializedException + boolean gotJTNYIException = false; + try { + jobtracker.getJobStatus(rJob1.getID()); + } catch (JobTrackerNotYetInitializedException jtnyie) { + LOG.info("Caught JobTrackerNotYetInitializedException", jtnyie); + gotJTNYIException = true; + } + Assert.assertTrue(gotJTNYIException); + + jobtracker.setInitDone(true); + + UtilsForTests.waitForJobTracker(jc); + // assert that job is recovered by the jobtracker Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length); JobInProgress jip = jobtracker.getJob(rJob1.getID());