hadoop-mapreduce-issues mailing list archives

From "zhihai xu (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (MAPREDUCE-6259) -1 submit time cause IllegalArgumentException when parse the Job history file name and JOB_INIT_FAILED cause submit time is not updated in JobIndexInfo.
Date Fri, 13 Feb 2015 05:31:11 GMT

     [ https://issues.apache.org/jira/browse/MAPREDUCE-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhihai xu updated MAPREDUCE-6259:
---------------------------------
    Description: 
A submit time of -1 causes an IllegalArgumentException when the job history file name is parsed,
and JOB_INIT_FAILED causes the submit time not to be updated in JobIndexInfo.
We found the following job history file name, which causes an IllegalArgumentException when
the job status is parsed from the file name.
{code}
job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
{code}
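
The failure can be reproduced outside Hadoop with a minimal sketch. It assumes the parser splits the name on "-" and reads the job status from a fixed position (index 7 here); the class, the State enum and the index are illustrative stand-ins, not the Hadoop code. Because the submit time is -1, the "--1" sequence yields an extra empty field, every later field shifts by one, and the status slot holds "0" instead of "FAILED".

```java
class HistoryFileNameSplitSketch {
    // Hypothetical stand-in for the job state enum; not the Hadoop class.
    enum State { NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR }

    static final String DELIMITER = "-";
    static final String EXTENSION = ".jhist";
    // Assumed position of the status field in a well-formed name.
    static final int STATUS_INDEX = 7;

    // The file name reported in this issue.
    static final String BAD_NAME = "job_1418398645407_115853--1-worun-"
        + "kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-"
        + "1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist";

    // Strip the extension and split on the delimiter.
    static String[] splitFields(String fileName) {
        String base = fileName.endsWith(EXTENSION)
            ? fileName.substring(0, fileName.length() - EXTENSION.length())
            : fileName;
        return base.split(DELIMITER);
    }

    // Read the status from its assumed fixed position; Enum.valueOf throws
    // IllegalArgumentException when the text is not a constant name.
    static State parseStatus(String fileName) {
        return State.valueOf(splitFields(fileName)[STATUS_INDEX]);
    }

    public static void main(String[] args) {
        String[] parts = splitFields(BAD_NAME);
        for (int i = 0; i < parts.length; i++) {
            System.out.println(i + ": \"" + parts[i] + "\"");
        }
        try {
            parseStatus(BAD_NAME);
        } catch (IllegalArgumentException e) {
            System.out.println("status parse failed: " + e.getMessage());
        }
    }
}
```

In this sketch the split produces 11 fields instead of 10, with an empty string at index 1 and "FAILED" pushed to index 8.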

When an IOException happens in JobImpl#setup, the job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo
is never updated and keeps its [initial value -1|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java#L1185].
{code}
      this.jobIndexInfo =
          new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
                           queueName);
{code}

The following sequence leads to a submit time of -1:
1.
A job is created in MRAppMaster#serviceStart, and the new job is in state JobStateInternal.NEW
after creation.
{code}
    job = createJob(getConfig(), forcedState, shutDownMessage);
{code}

2.
JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart
{code}
      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
      // Send init to the job (this does NOT trigger job execution)
      // This is a synchronous call, not an event through dispatcher. We want
      // job-init to be done completely here.
      jobEventDispatcher.handle(initJobEvent);
{code}

3.
After JobImpl receives JobEventType.JOB_INIT, it calls InitTransition#transition.
{code}
          .addTransition
              (JobStateInternal.NEW,
              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
              JobEventType.JOB_INIT,
              new InitTransition())
{code}

4.
Then an exception is thrown from setup(job) in InitTransition#transition before the JobSubmittedEvent
is handled.
Handling the JobSubmittedEvent would update the job submit time. Because of the exception, the submit
time keeps its initial value of -1.
This is the code of InitTransition#transition:
{code}
public JobStateInternal transition(JobImpl job, JobEvent event) {
      job.metrics.submittedJob(job);
      job.metrics.preparingJob(job);
      if (job.newApiCommitter) {
        job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
      } else {
        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf, job.oldJobId);
      }
      try {
        setup(job);
        job.fs = job.getFileSystem(job.conf);
        //log to job history
        JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
              job.conf.get(MRJobConfig.JOB_NAME, "test"), 
            job.conf.get(MRJobConfig.USER_NAME, "mapred"),
            job.appSubmitTime,
            job.remoteJobConfFile.toString(),
            job.jobACLs, job.queueName,
            job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
            getWorkflowAdjacencies(job.conf),
            job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
        //TODO JH Verify jobACLs, UserName via UGI?

        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
        job.numMapTasks = taskSplitMetaInfo.length;
        job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

        if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
          job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
        } else if (job.numMapTasks == 0) {
          job.reduceWeight = 0.9f;
        } else if (job.numReduceTasks == 0) {
          job.mapWeight = 0.9f;
        } else {
          job.mapWeight = job.reduceWeight = 0.45f;
        }

        checkTaskLimits();

        long inputLength = 0;
        for (int i = 0; i < job.numMapTasks; ++i) {
          inputLength += taskSplitMetaInfo[i].getInputDataLength();
        }

        job.makeUberDecision(inputLength);
        
        job.taskAttemptCompletionEvents =
            new ArrayList<TaskAttemptCompletionEvent>(
                job.numMapTasks + job.numReduceTasks + 10);
        job.mapAttemptCompletionEvents =
            new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
        job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
            job.numMapTasks + job.numReduceTasks + 10);

        job.allowedMapFailuresPercent =
            job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
        job.allowedReduceFailuresPercent =
            job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

        // create the Tasks but don't start them yet
        createMapTasks(job, inputLength, taskSplitMetaInfo);
        createReduceTasks(job);

        job.metrics.endPreparingJob(job);
        return JobStateInternal.INITED;
      } catch (Exception e) {
        LOG.warn("Job init failed", e);
        job.metrics.endPreparingJob(job);
        job.addDiagnostic("Job init failed : "
            + StringUtils.stringifyException(e));
        // Leave job in the NEW state. The MR AM will detect that the state is
        // not INITED and send a JOB_INIT_FAILED event.
        return JobStateInternal.NEW;
      }
    }
{code}

This is the code of JobImpl#setup:
{code}
    protected void setup(JobImpl job) throws IOException {

      String oldJobIDString = job.oldJobId.toString();
      String user = 
        UserGroupInformation.getCurrentUser().getShortUserName();
      Path path = MRApps.getStagingAreaDir(job.conf, user);
      if(LOG.isDebugEnabled()) {
        LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
      }

      job.remoteJobSubmitDir =
          FileSystem.get(job.conf).makeQualified(
              new Path(path, oldJobIDString));
      job.remoteJobConfFile =
          new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

      // Prepare the TaskAttemptListener server for authentication of Containers
      // TaskAttemptListener gets the information via jobTokenSecretManager.
      JobTokenIdentifier identifier =
          new JobTokenIdentifier(new Text(oldJobIDString));
      job.jobToken =
          new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
      job.jobToken.setService(identifier.getJobId());
      // Add it to the jobTokenSecretManager so that TaskAttemptListener server
      // can authenticate containers(tasks)
      job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
      LOG.info("Adding job token for " + oldJobIDString
          + " to jobTokenSecretManager");

      // If the job client did not setup the shuffle secret then reuse
      // the job token secret for the shuffle.
      if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
        LOG.warn("Shuffle secret key missing from job credentials."
            + " Using job token secret as shuffle secret.");
        TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
            job.jobCredentials);
      }
    }
{code}

5.
Because of the IOException from JobImpl#setup, the new job stays in state JobStateInternal.NEW.
{code}
      } catch (Exception e) {
        LOG.warn("Job init failed", e);
        job.metrics.endPreparingJob(job);
        job.addDiagnostic("Job init failed : "
            + StringUtils.stringifyException(e));
        // Leave job in the NEW state. The MR AM will detect that the state is
        // not INITED and send a JOB_INIT_FAILED event.
        return JobStateInternal.NEW;
      }
{code}
In the following code from MRAppMaster#serviceStart, the MR AM detects that the state is not INITED
and sends a JOB_INIT_FAILED event.
{code}
      // If job is still not initialized, an error happened during
      // initialization. Must complete starting all of the services so failure
      // events can be processed.
      initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
    if (initFailed) {
      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
      jobEventDispatcher.handle(initFailedEvent);
    } else {
      // All components have started, start the job.
      startJobs();
    }
{code}

6.
After JobImpl receives JOB_INIT_FAILED, it calls InitFailedTransition#transition and
enters state JobStateInternal.FAIL_ABORT.
{code}
          .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_INIT_FAILED,
              new InitFailedTransition())
{code}

7.
JobImpl sends a CommitterJobAbortEvent in InitFailedTransition#transition.
{code}
    public void transition(JobImpl job, JobEvent event) {
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
                job.jobContext,
                org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    }
{code}

8.
The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort, which
sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED).
{code}
    protected void handleJobAbort(CommitterJobAbortEvent event) {
      cancelJobCommit();
      try {
        committer.abortJob(event.getJobContext(), event.getFinalState());
      } catch (Exception e) {
        LOG.warn("Could not abort job", e);
      }
      context.getEventHandler().handle(new JobAbortCompletedEvent(
          event.getJobID(), event.getFinalState()));
    }
{code}

9.
After JobImpl receives JOB_ABORT_COMPLETED, it calls JobAbortCompletedTransition#transition
and enters state JobStateInternal.FAILED.
{code}
          .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
              JobEventType.JOB_ABORT_COMPLETED,
              new JobAbortCompletedTransition())
{code}

10.
JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish, which sends a
JobUnsuccessfulCompletionEvent carrying the finish time.
{code}
    public void transition(JobImpl job, JobEvent event) {
      JobStateInternal finalState = JobStateInternal.valueOf(
          ((JobAbortCompletedEvent) event).getFinalState().name());
      job.unsuccessfulFinish(finalState);
    }
  private void unsuccessfulFinish(JobStateInternal finalState) {
      if (finishTime == 0) setFinishTime();
      cleanupProgress = 1.0f;
      JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
          new JobUnsuccessfulCompletionEvent(oldJobId,
              finishTime,
              succeededMapTaskCount,
              succeededReduceTaskCount,
              finalState.toString(),
              diagnostics);
      eventHandler.handle(new JobHistoryEvent(jobId,
          unsuccessfulJobEvent));
      finished(finalState);
  }
{code}

11.
The JobUnsuccessfulCompletionEvent is handled by JobHistoryEventHandler#handleEvent with
type EventType.JOB_FAILED.
The following code shows that JobIndexInfo#finishTime is set correctly, but
JobIndexInfo#submitTime and JobIndexInfo#jobStartTime are still -1.
{code}
      if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
          || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
        try {
          JobUnsuccessfulCompletionEvent jucEvent = 
              (JobUnsuccessfulCompletionEvent) event
              .getHistoryEvent();
          mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
          mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
          mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
          mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
          closeEventWriter(event.getJobID());
          processDoneFiles(event.getJobID());
        } catch (IOException e) {
          throw new YarnRuntimeException(e);
        }
      }
{code}
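
The whole sequence can be condensed into a small model. This is a sketch under simplifying assumptions, not the MapReduce code: IndexInfo, State and run are hypothetical names, the event dispatching is collapsed into direct calls, and only the fields relevant to this issue are tracked. It shows that when setup fails before the submit event is handled, the failure path still records a finish time and a status while the submit time stays at -1.

```java
import java.io.IOException;

class InitFailureSketch {
    // Simplified stand-in for JobIndexInfo: both times start at -1.
    static class IndexInfo {
        long submitTime = -1;
        long finishTime = -1;
        String status;
    }

    enum State { NEW, INITED, FAIL_ABORT, FAILED }

    // Models InitTransition: setup runs first, and the submit time is only
    // recorded if setup succeeds.
    static State init(IndexInfo info, boolean setupFails, long submitTime) {
        try {
            if (setupFails) {
                throw new IOException("simulated setup failure");
            }
            info.submitTime = submitTime; // stands in for handling JobSubmittedEvent
            return State.INITED;
        } catch (IOException e) {
            return State.NEW; // job is left in NEW, as in the original catch block
        }
    }

    // Models the rest of the sequence; only the failure path is followed.
    static IndexInfo run(boolean setupFails, long submitTime, long finishTime) {
        IndexInfo info = new IndexInfo();
        State state = init(info, setupFails, submitTime);
        if (state != State.INITED) {
            state = State.FAIL_ABORT;     // JOB_INIT_FAILED
            state = State.FAILED;         // JOB_ABORT_COMPLETED
            info.finishTime = finishTime; // set while handling the JOB_FAILED history event
            info.status = state.name();
        }
        return info;
    }

    public static void main(String[] args) {
        IndexInfo info = run(true, 1423572830000L, 1423572836007L);
        System.out.println("submitTime=" + info.submitTime
            + " finishTime=" + info.finishTime + " status=" + info.status);
    }
}
```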

The faulty job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist"
From the file name, you can see that submitTime is -1, finishTime is 1423572836007 and jobStartTime
is 1423572836007.
jobStartTime is not -1 but equal to finishTime, because jobStartTime is handled specially in
FileNameIndexUtils#getDoneFileName:
{code}
    //JobStartTime
    if (indexInfo.getJobStartTime() >= 0) {
      sb.append(indexInfo.getJobStartTime());
    } else {
      sb.append(indexInfo.getFinishTime());
    }
{code}
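
To make the effect on the file name concrete, here is a minimal sketch that rebuilds the reported name from its fields. The field order is inferred from the file name above, the job name is passed in already encoded, and doneFileName is a hypothetical helper rather than the Hadoop method. It assumes the submit time is appended as-is, which is consistent with the "--1" in the observed name, while the start time falls back to the finish time as in the snippet above.

```java
class DoneFileNameBuildSketch {
    // Same fallback as the snippet above: a negative start time is replaced
    // by the finish time.
    static String startTimeField(long jobStartTime, long finishTime) {
        return Long.toString(jobStartTime >= 0 ? jobStartTime : finishTime);
    }

    // Joins the fields with "-" in the order seen in the reported file name.
    // The submit time gets no fallback, so -1 is written unchanged.
    static String doneFileName(String jobId, long submitTime, String user,
            String encodedJobName, long finishTime, int numMaps, int numReduces,
            String status, String queue, long jobStartTime) {
        return String.join("-",
            jobId,
            Long.toString(submitTime),
            user,
            encodedJobName,
            Long.toString(finishTime),
            Integer.toString(numMaps),
            Integer.toString(numReduces),
            status,
            queue,
            startTimeField(jobStartTime, finishTime)) + ".jhist";
    }

    public static void main(String[] args) {
        System.out.println(doneFileName(
            "job_1418398645407_115853", -1L, "worun",
            "kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D",
            1423572836007L, 0, 0, "FAILED", "root.journaling", -1L));
    }
}
```

With submitTime and jobStartTime both at -1, only the start time is repaired by the fallback; the negative submit time adds an extra delimiter character to the name.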







> -1 submit time cause IllegalArgumentException when parse the Job history file name and
JOB_INIT_FAILED cause submit time is not updated in JobIndexInfo.
> --------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6259
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6259
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: jobhistoryserver
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>
> -1 submit time cause IllegalArgumentException when parse the Job history file name and
JOB_INIT_FAILED cause submit time is not updated in JobIndexInfo.
> We found the following job history file name which cause IllegalArgumentException when
parse the job status in the job history file name.
> {code}
> job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist
> {code}
> when IOException happened in JobImpl#setup, the Job submit time in JobHistoryEventHandler#MetaInfo#JobIndexInfo
will not be changed and the Job submit time will be its [initial value -1|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java#L1185].
> {code}
>       this.jobIndexInfo =
>           new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
>                            queueName);
> {code}
> The following is the sequence to get -1 submit time:
> 1. 
> a job is created at MRAppMaster#serviceStart and  the new job is at state JobStateInternal.NEW
after created
> {code}
>     job = createJob(getConfig(), forcedState, shutDownMessage);
> {code}
> 2.
> JobEventType.JOB_INIT is sent to JobImpl from MRAppMaster#serviceStart
> {code}
>       JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
>       // Send init to the job (this does NOT trigger job execution)
>       // This is a synchronous call, not an event through dispatcher. We want
>       // job-init to be done completely here.
>       jobEventDispatcher.handle(initJobEvent);
> {code}
> 3.
> after JobImpl received JobEventType.JOB_INIT, it call InitTransition#transition
> {code}
>           .addTransition
>               (JobStateInternal.NEW,
>               EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
>               JobEventType.JOB_INIT,
>               new InitTransition())
> {code}
> 4.
> then the exception happen from setup(job) in InitTransition#transition before JobSubmittedEvent
is handled.
> JobSubmittedEvent will update the job submit time. Due to the exception, the submit time
is still the initial value -1.
> This is the code InitTransition#transition
> {code}
> public JobStateInternal transition(JobImpl job, JobEvent event) {
>       job.metrics.submittedJob(job);
>       job.metrics.preparingJob(job);
>       if (job.newApiCommitter) {
>         job.jobContext = new JobContextImpl(job.conf, job.oldJobId);
>       } else {
>         job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(job.conf, job.oldJobId);
>       }
>       try {
>         setup(job);
>         job.fs = job.getFileSystem(job.conf);
>         //log to job history
>         JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
>               job.conf.get(MRJobConfig.JOB_NAME, "test"), 
>             job.conf.get(MRJobConfig.USER_NAME, "mapred"),
>             job.appSubmitTime,
>             job.remoteJobConfFile.toString(),
>             job.jobACLs, job.queueName,
>             job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
>             job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
>             job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
>             getWorkflowAdjacencies(job.conf),
>             job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
>         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
>         //TODO JH Verify jobACLs, UserName via UGI?
>         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
>         job.numMapTasks = taskSplitMetaInfo.length;
>         job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
>         if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
>           job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
>         } else if (job.numMapTasks == 0) {
>           job.reduceWeight = 0.9f;
>         } else if (job.numReduceTasks == 0) {
>           job.mapWeight = 0.9f;
>         } else {
>           job.mapWeight = job.reduceWeight = 0.45f;
>         }
>         checkTaskLimits();
>         long inputLength = 0;
>         for (int i = 0; i < job.numMapTasks; ++i) {
>           inputLength += taskSplitMetaInfo[i].getInputDataLength();
>         }
>         job.makeUberDecision(inputLength);
>         
>         job.taskAttemptCompletionEvents =
>             new ArrayList<TaskAttemptCompletionEvent>(
>                 job.numMapTasks + job.numReduceTasks + 10);
>         job.mapAttemptCompletionEvents =
>             new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
>         job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
>             job.numMapTasks + job.numReduceTasks + 10);
>         job.allowedMapFailuresPercent =
>             job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
>         job.allowedReduceFailuresPercent =
>             job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
>         // create the Tasks but don't start them yet
>         createMapTasks(job, inputLength, taskSplitMetaInfo);
>         createReduceTasks(job);
>         job.metrics.endPreparingJob(job);
>         return JobStateInternal.INITED;
>       } catch (Exception e) {
>         LOG.warn("Job init failed", e);
>         job.metrics.endPreparingJob(job);
>         job.addDiagnostic("Job init failed : "
>             + StringUtils.stringifyException(e));
>         // Leave job in the NEW state. The MR AM will detect that the state is
>         // not INITED and send a JOB_INIT_FAILED event.
>         return JobStateInternal.NEW;
>       }
>     }
> {code}
> This is the code of JobImpl#setup:
> {code}
>     protected void setup(JobImpl job) throws IOException {
>       String oldJobIDString = job.oldJobId.toString();
>       String user = 
>         UserGroupInformation.getCurrentUser().getShortUserName();
>       Path path = MRApps.getStagingAreaDir(job.conf, user);
>       if(LOG.isDebugEnabled()) {
>         LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
>       }
>       job.remoteJobSubmitDir =
>           FileSystem.get(job.conf).makeQualified(
>               new Path(path, oldJobIDString));
>       job.remoteJobConfFile =
>           new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
>       // Prepare the TaskAttemptListener server for authentication of Containers
>       // TaskAttemptListener gets the information via jobTokenSecretManager.
>       JobTokenIdentifier identifier =
>           new JobTokenIdentifier(new Text(oldJobIDString));
>       job.jobToken =
>           new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
>       job.jobToken.setService(identifier.getJobId());
>       // Add it to the jobTokenSecretManager so that TaskAttemptListener server
>       // can authenticate containers(tasks)
>       job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
>       LOG.info("Adding job token for " + oldJobIDString
>           + " to jobTokenSecretManager");
>       // If the job client did not setup the shuffle secret then reuse
>       // the job token secret for the shuffle.
>       if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
>         LOG.warn("Shuffle secret key missing from job credentials."
>             + " Using job token secret as shuffle secret.");
>         TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
>             job.jobCredentials);
>       }
>     }
> {code}
> 5.
> Because of the IOException thrown from JobImpl#setup, the new job stays in state JobStateInternal.NEW.
> {code}
>       } catch (Exception e) {
>         LOG.warn("Job init failed", e);
>         job.metrics.endPreparingJob(job);
>         job.addDiagnostic("Job init failed : "
>             + StringUtils.stringifyException(e));
>         // Leave job in the NEW state. The MR AM will detect that the state is
>         // not INITED and send a JOB_INIT_FAILED event.
>         return JobStateInternal.NEW;
>       }
> {code}
> In the following code of MRAppMaster#serviceStart, the MR AM detects that the state is not INITED and sends a JOB_INIT_FAILED event.
> {code}
>       // If job is still not initialized, an error happened during
>       // initialization. Must complete starting all of the services so failure
>       // events can be processed.
>       initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
>     if (initFailed) {
>       JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
>       jobEventDispatcher.handle(initFailedEvent);
>     } else {
>       // All components have started, start the job.
>       startJobs();
>     }
> {code}
> 6.
> After JobImpl receives JOB_INIT_FAILED, it calls InitFailedTransition#transition and enters state JobStateInternal.FAIL_ABORT.
> {code}
>           .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
>               JobEventType.JOB_INIT_FAILED,
>               new InitFailedTransition())
> {code}
> 7.
> JobImpl sends a CommitterJobAbortEvent in InitFailedTransition#transition.
> {code}
>     public void transition(JobImpl job, JobEvent event) {
>         job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
>                 job.jobContext,
>                 org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
>     }
> {code}
> 8.
> The CommitterJobAbortEvent is handled by CommitterEventHandler#handleJobAbort, which sends a JobAbortCompletedEvent (JobEventType.JOB_ABORT_COMPLETED).
> {code}
>     protected void handleJobAbort(CommitterJobAbortEvent event) {
>       cancelJobCommit();
>       try {
>         committer.abortJob(event.getJobContext(), event.getFinalState());
>       } catch (Exception e) {
>         LOG.warn("Could not abort job", e);
>       }
>       context.getEventHandler().handle(new JobAbortCompletedEvent(
>           event.getJobID(), event.getFinalState()));
>     }
> {code}
> 9.
> After JobImpl receives JOB_ABORT_COMPLETED, it calls JobAbortCompletedTransition#transition and enters state JobStateInternal.FAILED.
> {code}
>           .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
>               JobEventType.JOB_ABORT_COMPLETED,
>               new JobAbortCompletedTransition())
> {code}
> 10.
> JobAbortCompletedTransition#transition calls JobImpl#unsuccessfulFinish, which sends a JobUnsuccessfulCompletionEvent with the finish time.
> {code}
>     public void transition(JobImpl job, JobEvent event) {
>       JobStateInternal finalState = JobStateInternal.valueOf(
>           ((JobAbortCompletedEvent) event).getFinalState().name());
>       job.unsuccessfulFinish(finalState);
>     }
>   private void unsuccessfulFinish(JobStateInternal finalState) {
>       if (finishTime == 0) setFinishTime();
>       cleanupProgress = 1.0f;
>       JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
>           new JobUnsuccessfulCompletionEvent(oldJobId,
>               finishTime,
>               succeededMapTaskCount,
>               succeededReduceTaskCount,
>               finalState.toString(),
>               diagnostics);
>       eventHandler.handle(new JobHistoryEvent(jobId,
>           unsuccessfulJobEvent));
>       finished(finalState);
>   }
> {code}
> 11.
> The JobUnsuccessfulCompletionEvent is handled by JobHistoryEventHandler#handleEvent with type EventType.JOB_FAILED.
> The following code shows that JobIndexInfo#finishTime is set correctly, but JobIndexInfo#submitTime and JobIndexInfo#jobStartTime are still -1.
> {code}
>       if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
>           || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
>         try {
>           JobUnsuccessfulCompletionEvent jucEvent = 
>               (JobUnsuccessfulCompletionEvent) event
>               .getHistoryEvent();
>           mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
>           mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
>           mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
>           mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
>           closeEventWriter(event.getJobID());
>           processDoneFiles(event.getJobID());
>         } catch (IOException e) {
>           throw new YarnRuntimeException(e);
>         }
>       }
> {code}
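The sequence above can be condensed into a small model. This is only a sketch: InitFailedSequenceSketch, State and IndexInfo are hypothetical stand-ins, not the Hadoop classes, and it assumes (as the steps describe) that the submit time is recorded only when setup succeeds.

```java
// Simplified model of the failure path; these are stand-ins, not the Hadoop classes.
class InitFailedSequenceSketch {
  enum State { NEW, INITED, FAIL_ABORT, FAILED }

  // Hypothetical stand-in for JobIndexInfo: every time field starts at -1.
  static class IndexInfo {
    long submitTime = -1;
    long jobStartTime = -1;
    long finishTime = -1;
    String jobStatus = null;
  }

  static IndexInfo run(boolean setupThrows, long now) {
    IndexInfo info = new IndexInfo();
    State state = State.NEW;

    // JOB_INIT: if setup succeeds, the submit time is recorded (assumption)
    // and the job becomes INITED; if setup throws, the job stays NEW.
    if (!setupThrows) {
      info.submitTime = now;
      state = State.INITED;
    }

    if (state != State.INITED) {
      state = State.FAIL_ABORT;  // JOB_INIT_FAILED
      state = State.FAILED;      // JOB_ABORT_COMPLETED
      // The failure handler only fills in finish time and status.
      info.finishTime = now;
      info.jobStatus = state.name();
    }
    return info;
  }

  public static void main(String[] args) {
    IndexInfo failed = run(true, 1423572836007L);
    System.out.println(failed.submitTime + " " + failed.jobStartTime + " "
        + failed.finishTime + " " + failed.jobStatus);
  }
}
```

On the failure path nothing ever writes submitTime or jobStartTime, so both keep their initial value of -1 while finishTime and jobStatus are filled in.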
> The failing job history file name in our log is "job_1418398645407_115853--1-worun-kafka%2Dto%2Dhdfs%5Btwo%5D%5B15+topic%28s%29%5D-1423572836007-0-0-FAILED-root.journaling-1423572836007.jhist"
> From the file name, you can see that submitTime is -1, finishTime is 1423572836007 and jobStartTime is 1423572836007.
> The jobStartTime is not -1; it is the same as finishTime,
> because jobStartTime is handled specially in FileNameIndexUtils#getDoneFileName:
> {code}
>     //JobStartTime
>     if (indexInfo.getJobStartTime() >= 0) {
>       sb.append(indexInfo.getJobStartTime());
>     } else {
>       sb.append(indexInfo.getFinishTime());
>     }
> {code}
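To see why a -1 submit time breaks parsing, here is a minimal sketch. It assumes the parser splits the file name on "-" and reads the status from a fixed position; the parser itself is not quoted in this message, so that is an assumption. DoneFileNameSketch, buildName, splitName and the Status enum are hypothetical names, and only the jobStartTime fallback mirrors the snippet quoted above.

```java
class DoneFileNameSketch {
  static final String DELIMITER = "-";

  // Hypothetical status enum standing in for the real job state type.
  enum Status { SUCCEEDED, FAILED, KILLED }

  // Field order follows the file name shown above; a negative start time
  // falls back to the finish time, as in the quoted getDoneFileName snippet.
  static String buildName(String jobId, long submitTime, String user,
      String jobName, long finishTime, int maps, int reduces, String status,
      String queue, long jobStartTime) {
    long start = jobStartTime >= 0 ? jobStartTime : finishTime;
    return String.join(DELIMITER, jobId, Long.toString(submitTime), user,
        jobName, Long.toString(finishTime), Integer.toString(maps),
        Integer.toString(reduces), status, queue, Long.toString(start))
        + ".jhist";
  }

  // Assumed parsing strategy: strip the extension and split on the delimiter.
  static String[] splitName(String fileName) {
    String base =
        fileName.substring(0, fileName.length() - ".jhist".length());
    return base.split(DELIMITER);
  }

  public static void main(String[] args) {
    String name = buildName("job_1418398645407_115853", -1, "worun", "kafka",
        1423572836007L, 0, 0, "FAILED", "root.journaling", -1);
    String[] fields = splitName(name);
    // The minus sign of -1 acts as an extra delimiter, so every later field
    // shifts by one and index 7 no longer holds the status.
    System.out.println(fields.length + " fields, index 7 = " + fields[7]);
    try {
      Status.valueOf(fields[7]);
    } catch (IllegalArgumentException e) {
      System.out.println("status parse failed: " + e.getMessage());
    }
  }
}
```

With a non-negative submit time the same name splits into 10 fields and index 7 is "FAILED". With -1 it splits into 11 fields, index 7 is "0", and Enum.valueOf rejects it with an IllegalArgumentException.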



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
