Date: Tue, 5 May 2015 14:41:02 +0000 (UTC)
From: "Hudson (JIRA)"
To: mapreduce-issues@hadoop.apache.org
Subject: [jira] [Commented] (MAPREDUCE-6259) IllegalArgumentException due to missing job submit time

    [ https://issues.apache.org/jira/browse/MAPREDUCE-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14528538#comment-14528538 ]

Hudson commented on MAPREDUCE-6259:
-----------------------------------

SUCCESS: Integrated in Hadoop-Hdfs-trunk #2116 (See [https://builds.apache.org/job/Hadoop-Hdfs-trunk/2116/])
MAPREDUCE-6259. IllegalArgumentException due to missing job submit time. Contributed by zhihai xu (jlowe: rev bf70c5ae2824a9139c1aa9d7c14020018881cec2)
* hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
* hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
* hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
* hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
* hadoop-mapreduce-project/CHANGES.txt


IllegalArgumentException due to missing job submit time
--------------------------------------------------------

                Key: MAPREDUCE-6259
                URL: https://issues.apache.org/jira/browse/MAPREDUCE-6259
            Project: Hadoop Map/Reduce
         Issue Type: Bug
         Components: jobhistoryserver
           Reporter: zhihai xu
           Assignee: zhihai xu
            Fix For: 2.7.1

        Attachments: MAPREDUCE-6259.000.patch


A job submit time of -1 causes an IllegalArgumentException when the job history file name is parsed, and a JOB_INIT_FAILED event is what leaves the submit time at -1 in JobIndexInfo. We found the following job history file name, which triggers the IllegalArgumentException when the job status is parsed out of it:
{code}
job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
{code}
The stack trace for the IllegalArgumentException is:
{code}
2015-02-10 04:54:01,863 WARN org.apache.hadoop.mapreduce.v2.hs.PartialJob: Exception while parsing job state. Defaulting to KILLED
java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.mapreduce.v2.api.records.JobState.0
	at java.lang.Enum.valueOf(Enum.java:236)
	at org.apache.hadoop.mapreduce.v2.api.records.JobState.valueOf(JobState.java:21)
	at org.apache.hadoop.mapreduce.v2.hs.PartialJob.getState(PartialJob.java:82)
	at org.apache.hadoop.mapreduce.v2.hs.PartialJob.<init>(PartialJob.java:59)
	at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getAllPartialJobs(CachedHistoryStorage.java:159)
	at org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage.getPartialJobs(CachedHistoryStorage.java:173)
	at org.apache.hadoop.mapreduce.v2.hs.JobHistory.getPartialJobs(JobHistory.java:284)
	at org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebServices.getJobs(HsWebServices.java:212)
	at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
	at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$TypeOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:185)
	at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
	at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:288)
	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
	at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
	at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
	at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1469)
	at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1400)
	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1349)
	at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1339)
	at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:416)
	at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:537)
	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:886)
	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
	at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
	at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
	at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
	at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
	at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
	at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
	at org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1223)
	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
	at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
	at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45)
	at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
	at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
	at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
	at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
	at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:767)
	at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
	at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
	at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
	at org.mortbay.jetty.Server.handle(Server.java:326)
	at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
	at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
	at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
	at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
	at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
	at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
	at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
{code}
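The "No enum constant ... JobState.0" comes from the way the -1 submit time corrupts the delimiter-separated file name: the extra "-" contributed by "-1" shifts every later field by one slot, so the position that should hold the job status ("FAILED") holds "0" instead. A minimal, self-contained sketch of that shift (illustration only, not the real FileNameIndexUtils parsing code; it assumes the jobId-submitTime-user-jobName-finishTime-numMaps-numReduces-status-queue-jobStartTime layout visible in the file name above):
{code}
// Illustration only: split the broken done-file name on the "-" delimiter the
// way a position-based parser would, and look at what lands in each slot.
public class JhistNameShiftDemo {
  public static void main(String[] args) {
    String name = "job_1418398645407_115853--1-worun-"
        + "kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-"
        + "1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist";

    String[] parts = name.split("-");
    // "-1" yields an empty token in the submit-time slot (index 1) and pushes
    // every later field one slot to the right, so index 7, which should hold
    // the job status, now holds "0" -- the value JobState.valueOf() rejects.
    System.out.println("submit-time slot: '" + parts[1] + "'");   // ''
    System.out.println("status slot:      '" + parts[7] + "'");   // '0'
    // JobState.valueOf("0") -> IllegalArgumentException: No enum constant ...JobState.0
  }
}
{code}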
When an IOException happens in JobImpl#setup, the job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo is never updated, so it keeps its [initial value of -1|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java#L1185]:
{code}
this.jobIndexInfo =
    new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
        queueName);
{code}
The following is the sequence of events that leads to a -1 job submit time.

1. A job is created in MRAppMaster#serviceStart; after creation the new job is in state JobStateInternal.NEW.
{code}
job = createJob(getConfig(), forcedState, shutDownMessage);
{code}

2. JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart.
{code}
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
// Send init to the job (this does NOT trigger job execution)
// This is a synchronous call, not an event through dispatcher. We want
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
{code}

3. After JobImpl receives JobEventType.JOB_INIT, it calls InitTransition#transition.
{code}
.addTransition
    (JobStateInternal.NEW,
    EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
    JobEventType.JOB_INIT,
    new InitTransition())
{code}
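The EnumSet in that declaration matters for the failure path below: InitTransition is a multiple-arc transition, so whichever state it returns (INITED on success, NEW on failure) becomes the job's post-state. A toy sketch of the pattern, using hypothetical simplified types rather than Hadoop's real StateMachineFactory API:
{code}
import java.util.EnumSet;

// Toy model of a multiple-arc transition: the handler itself picks the
// post-state from a declared set of legal outcomes.
public class MultiArcSketch {
  enum JobStateInternal { NEW, INITED }

  interface MultipleArcTransition {
    JobStateInternal transition(boolean setupSucceeded);
  }

  public static void main(String[] args) {
    EnumSet<JobStateInternal> legal =
        EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW);
    MultipleArcTransition init =
        ok -> ok ? JobStateInternal.INITED : JobStateInternal.NEW;

    JobStateInternal after = init.transition(false); // setup(job) threw
    System.out.println("legal post-state: " + legal.contains(after)
        + ", job stays in: " + after);               // true, NEW
  }
}
{code}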
4. The exception is then thrown from setup(job) in InitTransition#transition, before the JobSubmittedEvent is handled. The JobSubmittedEvent is what would update the job submit time, so because of the exception the submit time keeps its initial value of -1.

This is the code of InitTransition#transition:
{code}
public JobStateInternal transition(JobImpl job, JobEvent event) {
  job.metrics.submittedJob(job);
  job.metrics.preparingJob(job);

  if (job.newApiCommitter) {
    job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
  } else {
    job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf, job.oldJobId);
  }

  try {
    setup(job);
    job.fs = job.getFileSystem(job.conf);

    //log to job history
    JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
        job.conf.get(MRJobConfig.JOB_NAME, "test"),
        job.conf.get(MRJobConfig.USER_NAME, "mapred"),
        job.appSubmitTime,
        job.remoteJobConfFile.toString(),
        job.jobACLs, job.queueName,
        job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
        job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
        getWorkflowAdjacencies(job.conf),
        job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
    job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
    //TODO JH Verify jobACLs, UserName via UGI?

    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
    job.numMapTasks = taskSplitMetaInfo.length;
    job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

    if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
      job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
    } else if (job.numMapTasks == 0) {
      job.reduceWeight = 0.9f;
    } else if (job.numReduceTasks == 0) {
      job.mapWeight = 0.9f;
    } else {
      job.mapWeight = job.reduceWeight = 0.45f;
    }

    checkTaskLimits();

    long inputLength = 0;
    for (int i = 0; i < job.numMapTasks; ++i) {
      inputLength += taskSplitMetaInfo[i].getInputDataLength();
    }

    job.makeUberDecision(inputLength);

    job.taskAttemptCompletionEvents =
        new ArrayList<TaskAttemptCompletionEvent>(
            job.numMapTasks + job.numReduceTasks + 10);
    job.mapAttemptCompletionEvents =
        new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
    job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
        job.numMapTasks + job.numReduceTasks + 10);

    job.allowedMapFailuresPercent =
        job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
    job.allowedReduceFailuresPercent =
        job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

    // create the Tasks but don't start them yet
    createMapTasks(job, inputLength, taskSplitMetaInfo);
    createReduceTasks(job);

    job.metrics.endPreparingJob(job);
    return JobStateInternal.INITED;

  } catch (Exception e) {
    LOG.warn("Job init failed", e);
    job.metrics.endPreparingJob(job);
    job.addDiagnostic("Job init failed : "
        + StringUtils.stringifyException(e));
    // Leave job in the NEW state. The MR AM will detect that the state is
    // not INITED and send a JOB_INIT_FAILED event.
    return JobStateInternal.NEW;
  }
}
{code}

This is the code of JobImpl#setup:
{code}
protected void setup(JobImpl job) throws IOException {
  String oldJobIDString = job.oldJobId.toString();
  String user =
      UserGroupInformation.getCurrentUser().getShortUserName();
  Path path = MRApps.getStagingAreaDir(job.conf, user);
  if (LOG.isDebugEnabled()) {
    LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
  }

  job.remoteJobSubmitDir =
      FileSystem.get(job.conf).makeQualified(
          new Path(path, oldJobIDString));
  job.remoteJobConfFile =
      new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

  // Prepare the TaskAttemptListener server for authentication of Containers
  // TaskAttemptListener gets the information via jobTokenSecretManager.
  JobTokenIdentifier identifier =
      new JobTokenIdentifier(new Text(oldJobIDString));
  job.jobToken =
      new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
  job.jobToken.setService(identifier.getJobId());

  // Add it to the jobTokenSecretManager so that TaskAttemptListener server
  // can authenticate containers(tasks)
  job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
  LOG.info("Adding job token for " + oldJobIDString
      + " to jobTokenSecretManager");

  // If the job client did not setup the shuffle secret then reuse
  // the job token secret for the shuffle.
  if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
    LOG.warn("Shuffle secret key missing from job credentials."
        + " Using job token secret as shuffle secret.");
    TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
        job.jobCredentials);
  }
}
{code}
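To make the ordering concrete: the submit time only reaches the history index through the JobSubmittedEvent, which is emitted after setup(job) returns, so any exception inside setup() leaves the index at the -1 set in the JobIndexInfo constructor. A stripped-down, hypothetical reproduction of that ordering (simplified names, not the real JobImpl or JobHistoryEventHandler classes):
{code}
import java.io.IOException;

// Hypothetical, simplified reproduction of the ordering described above.
public class SubmitTimeOrderingSketch {

  // Stands in for JobIndexInfo: the submit time starts at -1, as in the real constructor.
  static long indexedSubmitTime = -1L;

  // Stands in for JobImpl#setup failing (e.g. staging dir / filesystem trouble).
  static void setup() throws IOException {
    throw new IOException("simulated failure during job setup");
  }

  // Mirrors the shape of InitTransition#transition: the submit time is only
  // recorded after setup() succeeds.
  static String init(long appSubmitTime) {
    try {
      setup();
      indexedSubmitTime = appSubmitTime; // never reached on failure
      return "INITED";
    } catch (Exception e) {
      return "NEW"; // job left in NEW; a JOB_INIT_FAILED event follows
    }
  }

  public static void main(String[] args) {
    String state = init(1423572836007L);
    // Prints "NEW -1": the -1 later ends up in the .jhist done-file name.
    System.out.println(state + " " + indexedSubmitTime);
  }
}
{code}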
5. Due to the IOException from JobImpl#setup, the new job is still in state JobStateInternal.NEW.
{code}
} catch (Exception e) {
  LOG.warn("Job init failed", e);
  job.metrics.endPreparingJob(job);
  job.addDiagnostic("Job init failed : "
      + StringUtils.stringifyException(e));
  // Leave job in the NEW state. The MR AM will detect that the state is
  // not INITED and send a JOB_INIT_FAILED event.
  return JobStateInternal.NEW;
}
{code}
In the following code in MRAppMaster#serviceStart, the MR AM detects that the state is not INITED and sends a JOB_INIT_FAILED event.
{code}
// If job is still not initialized, an error happened during
// initialization. Must complete starting all of the services so failure
// events can be processed.
initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
if (initFailed) {
  JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
  jobEventDispatcher.handle(initFailedEvent);
} else {
  // All components have started, start the job.
  startJobs();
}
{code}

6. After JobImpl receives the JOB_INIT_FAILED event, it calls InitFailedTransition#transition and enters state JobStateInternal.FAIL_ABORT.
{code}
.addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
    JobEventType.JOB_INIT_FAILED,
    new InitFailedTransition())
{code}

7. JobImpl sends a CommitterJobAbortEvent in InitFailedTransition#transition.
{code}
public void transition(JobImpl job, JobEvent event) {
  job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
      job.jobContext,
      org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
{code}
8. The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort, which sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED).
{code}
protected void handleJobAbort(CommitterJobAbortEvent event) {
  cancelJobCommit();

  try {
    committer.abortJob(event.getJobContext(), event.getFinalState());
  } catch (Exception e) {
    LOG.warn("Could not abort job", e);
  }

  context.getEventHandler().handle(new JobAbortCompletedEvent(
      event.getJobID(), event.getFinalState()));
}
{code}

9. After JobImpl receives the JOB_ABORT_COMPLETED event, it calls JobAbortCompletedTransition#transition and enters state JobStateInternal.FAILED.
{code}
.addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
    JobEventType.JOB_ABORT_COMPLETED,
    new JobAbortCompletedTransition())
{code}

10. JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish, which sends a JobUnsuccessfulCompletionEvent carrying the finish time.
{code}
public void transition(JobImpl job, JobEvent event) {
  JobStateInternal finalState = JobStateInternal.valueOf(
      ((JobAbortCompletedEvent) event).getFinalState().name());
  job.unsuccessfulFinish(finalState);
}

private void unsuccessfulFinish(JobStateInternal finalState) {
  if (finishTime == 0) setFinishTime();
  cleanupProgress = 1.0f;
  JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
      new JobUnsuccessfulCompletionEvent(oldJobId,
          finishTime,
          succeededMapTaskCount,
          succeededReduceTaskCount,
          finalState.toString(),
          diagnostics);
  eventHandler.handle(new JobHistoryEvent(jobId,
      unsuccessfulJobEvent));
  finished(finalState);
}
{code}

11. The JobUnsuccessfulCompletionEvent is handled by JobHistoryEventHandler#handleEvent with type EventType.JOB_FAILED. In the following code you can see that JobIndexInfo#finishTime is set correctly, but JobIndexInfo#submitTime and JobIndexInfo#jobStartTime are still -1.
{code}
if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
    || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
  try {
    JobUnsuccessfulCompletionEvent jucEvent =
        (JobUnsuccessfulCompletionEvent) event
        .getHistoryEvent();
    mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
    mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
    mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
    mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
    closeEventWriter(event.getJobID());
    processDoneFiles(event.getJobID());
  } catch (IOException e) {
    throw new YarnRuntimeException(e);
  }
}
{code}

The bad job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist". From the file name you can see that submitTime is -1, while finishTime is 1423572836007 and jobStartTime is 1423572836007. The jobStartTime is not -1 (it equals finishTime) because jobStartTime is handled specially in FileNameIndexUtils#getDoneFileName:
{code}
//JobStartTime
if (indexInfo.getJobStartTime() >= 0) {
  sb.append(indexInfo.getJobStartTime());
} else {
  sb.append(indexInfo.getFinishTime());
}
{code}
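The snippet above guards jobStartTime but there is no similar guard for submitTime, which is why only the submit-time slot in the file name ends up as -1. Purely as an illustration, a symmetric fallback for the submit time could look like the sketch below; this is not the change that was committed for MAPREDUCE-6259 (the file list in the Hudson comment above shows the fix went into the history event handling side, e.g. JobHistoryEventHandler, MRAppMaster and AMStartedEvent, rather than into the file-name encoding):
{code}
// Illustrative only -- not the actual MAPREDUCE-6259 patch. A submit-time
// fallback written in the same style as the jobStartTime handling above:
// never let a negative value (and its extra '-') reach the file name.
public final class DoneFileNameFallbackSketch {

  // Minimal stand-in for the JobIndexInfo getters used here.
  static final class IndexInfo {
    final long submitTime, finishTime;
    IndexInfo(long submitTime, long finishTime) {
      this.submitTime = submitTime;
      this.finishTime = finishTime;
    }
    long getSubmitTime() { return submitTime; }
    long getFinishTime() { return finishTime; }
  }

  static void appendSubmitTime(StringBuilder sb, IndexInfo indexInfo) {
    if (indexInfo.getSubmitTime() >= 0) {
      sb.append(indexInfo.getSubmitTime());
    } else {
      sb.append(indexInfo.getFinishTime()); // same fallback used for jobStartTime
    }
  }

  public static void main(String[] args) {
    StringBuilder sb = new StringBuilder();
    appendSubmitTime(sb, new IndexInfo(-1L, 1423572836007L));
    System.out.println(sb); // prints 1423572836007, not "-1"
  }
}
{code}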