sqoop-dev mailing list archives

From "Veena Basavaraj (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SQOOP-1510) Refactor JobRequestHandler for submit/abort job and SubmissionHandler for get operation only
Date Fri, 31 Oct 2014 15:00:50 GMT

     [ https://issues.apache.org/jira/browse/SQOOP-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Veena Basavaraj updated SQOOP-1510:
-----------------------------------
    Description: 
PROPOSAL GIST:

1. Move the job actions (submit, status of the currently running job, and stop of the currently running
job) to the JOB resource. A submit action creates a submission record. To be consistent with
v1/connectors and v1/links, we also have v1/jobs to display all jobs and v1/jobs?cname to display
jobs by connector.

Also proposing (note: this can be a separate RB if the current one is hard to review):
 START -> renamed to SUBMIT
 STOP -> renamed to ABORT
2. Submission is a read-only resource that returns all submissions or the submissions for a given
job. Since it is a collection resource, it uses v1/submissions, to be consistent with v1/connectors,
v1/links and v1/jobs.
3. Change the client/shell to reflect 1 and 2.
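A minimal sketch of the collection URLs the proposal names. The base URL is an example only, and the value format of `cname` (a connector name passed as `?cname=<name>`) is an assumption; the ticket only says `v1/jobs?cname`. `SqoopUrls` is a hypothetical helper, not part of the Sqoop client.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the collection-resource URLs from the proposal.
// The base URL and the "?cname=<name>" format are assumptions, not Sqoop's API.
final class SqoopUrls {
  private final String base; // e.g. "http://localhost:12000/sqoop" (example only)

  SqoopUrls(String base) {
    this.base = base.endsWith("/") ? base : base + "/";
  }

  String connectors() { return base + "v1/connectors"; }
  String links()      { return base + "v1/links"; }
  String jobs()       { return base + "v1/jobs"; }

  // All jobs that use the given connector (assumed query format).
  String jobsByConnector(String connectorName) {
    return jobs() + "?cname=" + URLEncoder.encode(connectorName, StandardCharsets.UTF_8);
  }

  // Read-only collection of submissions.
  String submissions() { return base + "v1/submissions"; }
}
```

Keeping jobs and submissions as plural collection resources mirrors the existing v1/connectors and v1/links layout, which is the consistency argument the proposal makes.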


PROPOSAL DETAILS


This section adds more detail on how Job and Submission are structured in the code.

In Sqoop, we create a job by giving it the FROM and TO link ids. Something like this:
{code}
create job -fromLink 1 -toLink 2
{code}

A job in Sqoop is a representation of all the data required for the job we submit to the
execution engine. Hence the job holds the FROM link and FROM connector, and the TO link and its
corresponding TO connector details. It also holds all the corresponding config values needed to
invoke the FROM connector code and the TO connector code during the job lifecycle steps
(init, partitions, load, extract, destroy).
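A simplified sketch of what a job carries, per the paragraph above. `JobSketch` and its fields are hypothetical stand-ins, not Sqoop's `MJob`. The mapping of steps to sides assumes the split used by Sqoop 2's connector API (partition and extract on the FROM side, load on the TO side, init and destroy on both); treat it as an assumption.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a Sqoop job; names are illustrative only.
final class JobSketch {
  enum Side { FROM, TO }

  // Lifecycle steps listed in the proposal.
  enum Step { INIT, PARTITION, EXTRACT, LOAD, DESTROY }

  final long fromLinkId;
  final long fromConnectorId;
  final Map<String, String> fromConfig; // config values for the FROM connector
  final long toLinkId;
  final long toConnectorId;
  final Map<String, String> toConfig;   // config values for the TO connector

  JobSketch(long fromLinkId, long fromConnectorId, Map<String, String> fromConfig,
            long toLinkId, long toConnectorId, Map<String, String> toConfig) {
    this.fromLinkId = fromLinkId;
    this.fromConnectorId = fromConnectorId;
    this.fromConfig = Map.copyOf(fromConfig); // immutable defensive copy
    this.toLinkId = toLinkId;
    this.toConnectorId = toConnectorId;
    this.toConfig = Map.copyOf(toConfig);
  }

  // Which side's connector code runs a step (assumed split, see lead-in).
  static Set<Side> sidesFor(Step step) {
    switch (step) {
      case PARTITION:
      case EXTRACT:
        return EnumSet.of(Side.FROM);
      case LOAD:
        return EnumSet.of(Side.TO);
      default: // INIT, DESTROY
        return EnumSet.allOf(Side.class);
    }
  }

  // Config handed to the connector code on the given side.
  Map<String, String> configFor(Side side) {
    return side == Side.FROM ? fromConfig : toConfig;
  }
}
```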

Once the job is created, we can perform these 4 actions on it:

1. disable / re-enable it
2. submit it to the execution engine
3. abort it at any point while it is running (we could also call this stop)
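A minimal sketch of the four job actions and the guards implied by the proposal and by the driver errors quoted in this ticket (a disabled job cannot be submitted; a job that is already running cannot be resubmitted; only a running job can be aborted). `JobActions` is hypothetical and is not Sqoop's `JobManager`; for simplicity it treats a job as running from submit until abort.

```java
// Hypothetical sketch of the job actions and their guards; not Sqoop's JobManager.
final class JobActions {
  private boolean enabled = true;
  private boolean running = false;

  void disable() { enabled = false; }
  void enable()  { enabled = true; }

  // A disabled or already-running job cannot be submitted.
  void submit() {
    if (!enabled) {
      throw new IllegalStateException("Job has been disabled. Cannot submit this job.");
    }
    if (running) {
      throw new IllegalStateException("Job is already running.");
    }
    running = true;
  }

  // Only a running job can be aborted (stopped).
  void abort() {
    if (!running) {
      throw new IllegalStateException("Job is not running hence cannot stop");
    }
    running = false;
  }

  boolean isRunning() { return running; }
}
```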

All of the below is handled by {code}JobManager{code}, an internal class that receives requests
for operations on the job resource.

So what does submit really do?

1. Create a job request for the execution engine. This is an uber object that holds all the
information described above, such as the FROM connector and TO connector details. It also
holds a reference to the submission object, which holds the results of the submit action.
A new submission record is persisted into the Sqoop repository every time we submit a job.
This is represented internally as {code}MSubmission{code}.

{code}
    // Bootstrap job to execute in the configured execution engine
    prepareJob(jobRequest);

{code}

2. We then call the submissionEngine API to submit, which in turn chooses the configured
execution engine (such as MR, or Spark in the future) and submits to it.
{code}
      boolean success = submissionEngine.submit(jobRequest);

{code}

{code}
// If we're in local mode then wait on completion. The local job runner does not
// seem to expose an API to get a previously submitted job, which makes
// the other methods of the submission engine quite useless.
if (isLocal()) {
  job.waitForCompletion(true);
} else {
  job.submit();
}
{code}
For consistency, we name both the submission and execution APIs submit, since the MR engine and
Spark also use submit.

3. On success, we persist the submission record in the Sqoop repository.
{code}
      RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
{code}

4. If the execution engine fails, we run clean-up code but still persist the submission record
in the repository, so that the "submit" action invoked from the client/REST API is recorded.

{code}
 public MSubmission submit(long jobId, HttpEventContext ctx) {

    MSubmission mJobSubmission = createJobSubmission(ctx, jobId);
    JobRequest jobRequest = createJobRequest(jobId, mJobSubmission);
    // Bootstrap job to execute in the configured execution engine
    prepareJob(jobRequest);
    // Make sure that this job id is not currently running and submit the job
    // only if it's not.
    synchronized (getClass()) {
      MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
          .findLastSubmissionForJob(jobId);
      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
        throw new SqoopException(DriverError.DRIVER_0002, "Job with id " + jobId);
      }
      // TODO(jarcec): We might need to catch all exceptions here to ensure
      // that Destroyer will be executed in all cases.
      
      // NOTE: the following is a blocking call
      boolean success = submissionEngine.submit(jobRequest);
      if (!success) {
        cleanUpOnJobSubmissionFailure(jobRequest);
        mJobSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
      }
      // persist the submission record (with its success or failure status) to the repository
      RepositoryManager.getInstance().getRepository().createSubmission(mJobSubmission);
    }
    return mJobSubmission;
  }

{code}
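A self-contained sketch of the submit flow above: reject the request if the last submission for the job is still running, submit to the engine, run clean-up and mark FAILURE_ON_SUBMIT on failure, and persist the record in either case. `Status`, `Submission`, `Engine` and the list-backed repository are hypothetical stand-ins, not Sqoop's classes. The sketch also assumes a successful submit leaves the job RUNNING. It locks on the instance, whereas the code above locks on the class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for MSubmission, the repository and the submission engine.
final class SubmitFlowSketch {
  enum Status {
    BOOTING, RUNNING, SUCCEEDED, FAILURE_ON_SUBMIT;
    boolean isRunning() { return this == BOOTING || this == RUNNING; }
  }

  static final class Submission {
    final long jobId;
    Status status;
    Submission(long jobId, Status status) { this.jobId = jobId; this.status = status; }
  }

  interface Engine { boolean submit(long jobId); } // true on success

  private final List<Submission> repository = new ArrayList<>(); // in-memory repository
  private final Engine engine;
  int cleanUps = 0; // counts clean-up calls, for illustration

  SubmitFlowSketch(Engine engine) { this.engine = engine; }

  Submission findLastSubmissionForJob(long jobId) {
    for (int i = repository.size() - 1; i >= 0; i--) {
      if (repository.get(i).jobId == jobId) return repository.get(i);
    }
    return null;
  }

  synchronized Submission submit(long jobId) {
    Submission last = findLastSubmissionForJob(jobId);
    if (last != null && last.status.isRunning()) {
      throw new IllegalStateException("Job with id " + jobId + " is already running");
    }
    Submission submission = new Submission(jobId, Status.BOOTING);
    if (engine.submit(jobId)) {
      submission.status = Status.RUNNING; // assumed status after a successful submit
    } else {
      cleanUps++; // stands in for cleanUpOnJobSubmissionFailure(jobRequest)
      submission.status = Status.FAILURE_ON_SUBMIT;
    }
    repository.add(submission); // persisted on success and on failure
    return submission;
  }

  List<Submission> all() { return repository; }
}
```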

Next, what happens when we want to stop/abort a running job?

1. The stop action is only valid while the job is running; if there is no running submission,
the request fails with DRIVER_0003. We have an API that returns the last submission for a job,
and we use it to check whether the requested job is in a running state.

{code}
    // find the last submission for this job
    MSubmission mSubmission = repository.findLastSubmissionForJob(jobId);

    if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
      throw new SqoopException(DriverError.DRIVER_0003, "Job with id " + jobId
          + " is not running hence cannot stop");
    }
   
{code}

2. If the job is running, we call the submission engine's stop API, which in turn calls the
corresponding execution engine API to kill the job.

{code}
submissionEngine.stop(mSubmission.getExternalId());
{code}
For consistency, we name all of these APIs stop/abort. For MR, the engine looks up the running
job and kills it:
{code}
RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
if (runningJob == null) {
  return;
}
runningJob.killJob();
{code}

3. Finally, we update the submission's status in the repository by calling {code}updateSubmission{code}.
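A self-contained sketch of the three stop steps: look up the last submission, reject the request if it is not running, ask the engine to kill it by its external id, then update the stored status. All types are hypothetical stand-ins; the `KILLED` status in particular is an illustrative assumption, not a value taken from Sqoop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for MSubmission, the repository and the submission engine.
final class StopFlowSketch {
  enum Status {
    RUNNING, SUCCEEDED, KILLED; // KILLED is an assumed stand-in status
    boolean isRunning() { return this == RUNNING; }
  }

  static final class Submission {
    final String externalId; // the execution engine's id for the job
    Status status;
    Submission(String externalId, Status status) { this.externalId = externalId; this.status = status; }
  }

  interface Engine { void stop(String externalId); }

  // In-memory stand-in for the repository: last submission per job id.
  final Map<Long, Submission> lastSubmission = new HashMap<>();
  private final Engine engine;

  StopFlowSketch(Engine engine) { this.engine = engine; }

  Submission stop(long jobId) {
    // 1. find the last submission and make sure it is running
    Submission submission = lastSubmission.get(jobId);
    if (submission == null || !submission.status.isRunning()) {
      throw new IllegalStateException("Job with id " + jobId + " is not running hence cannot stop");
    }
    // 2. ask the engine to kill it (for MR, this would end in RunningJob.killJob())
    engine.stop(submission.externalId);
    // 3. update the stored status (stands in for updateSubmission)
    submission.status = Status.KILLED;
    return submission;
  }
}
```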

In summary, the actions on a job are reflected on the job resource.

The job resource will support enable/disable/submit/abort (stop), as well as GET for the job's
status and details.

Submission is a read-only resource that represents the side effect of submitting a job to
the Sqoop execution engine.

While we change the REST APIs to support the above, we also have to make the corresponding
changes in the Sqoop client.

Here are the details of the Sqoop client. {code}SqoopClient{code} will support submitting and
aborting (stopping) a job.

1. The client has limited visibility into the job submission. After a job submit command
is issued, it can monitor the status of the submitted job.

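Hence we have the enum JobSubmissionStatus: at any point, the client reports one of the
following 3 states. STARTED means the job just got kicked off, UPDATED means the job is still
running and a later poll has refreshed its status, and FINISHED means the job is no longer running.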
{code}

  /**
   * Status flags used when updating the job submission callback status
   */
  private enum JobSubmissionStatus {
    STARTED,
    UPDATED,
    FINISHED
  }

{code}

2. The client uses {code}SqoopResourceRequest{code} to make a REST call for the "submit" action
and get back a submission record. It can use this submission record to query the status if
needed, and the submission callback logic gives the client more real-time feedback on the
status of the job.

{code}
    MSubmission submission = resourceRequests.submitJob(jobId).getSubmissions().get(0);

{code}

{code}
  public MSubmission submitJob(long jobId, SubmissionCallback callback, long pollTime)
      throws InterruptedException {
    if(pollTime <= 0) {
      throw new SqoopException(ClientError.CLIENT_0002);
    }
    boolean started = true;
    MSubmission submission = resourceRequests.submitJob(jobId).getSubmissions().get(0);
    while(submission.getStatus().isRunning()) {
      if(started) {
        submissionCallback(callback, submission, JobSubmissionStatus.STARTED);
        started = false;
      } else {
        submissionCallback(callback, submission, JobSubmissionStatus.UPDATED);
      }
      Thread.sleep(pollTime);
      submission = getJobStatus(jobId);
    }
    submissionCallback(callback, submission, JobSubmissionStatus.FINISHED);
    return submission;
  }
{code}
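A self-contained sketch of the client polling loop above, reduced to its callback sequence. The suppliers stand in for the submit REST call and for `getJobStatus(jobId)` and simply report whether the job is still running; `PollingClientSketch` and `Callback` are hypothetical, not the Sqoop client API.

```java
import java.util.function.Supplier;

// Hypothetical reduction of SqoopClient.submitJob's polling loop.
final class PollingClientSketch {
  enum JobSubmissionStatus { STARTED, UPDATED, FINISHED }

  interface Callback { void on(JobSubmissionStatus status); }

  // submit: performs the submit call and reports whether the job is running.
  // poll: re-reads the status (stand-in for getJobStatus(jobId)).
  static void submitAndWait(Supplier<Boolean> submit, Supplier<Boolean> poll,
                            Callback callback, long pollTimeMillis) throws InterruptedException {
    if (pollTimeMillis <= 0) {
      throw new IllegalArgumentException("pollTime must be positive");
    }
    boolean running = submit.get();
    boolean first = true;
    while (running) {
      // first report while running is STARTED, later ones are UPDATED
      callback.on(first ? JobSubmissionStatus.STARTED : JobSubmissionStatus.UPDATED);
      first = false;
      Thread.sleep(pollTimeMillis);
      running = poll.get();
    }
    callback.on(JobSubmissionStatus.FINISHED);
  }
}
```

For a job that is running at submit time, still running at the first poll and done at the second, the callback sees STARTED, UPDATED, FINISHED, which matches the three states of JobSubmissionStatus.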

3. Similarly, for the stop action the client calls the job resource to make a REST call to the
server to stop the job.

{code}

  public MSubmission stopJob(long jid) {
    return resourceRequests.stopJob(jid).getSubmissions().get(0);
  }
{code}


Why is the action called SUBMIT instead of START?

SUBMIT is a common term in both the MR and Spark execution engines. Also, as a side effect of
SUBMIT, we create a SUBMISSION record and store it in the repository.


In many places in the current code and Javadocs, we actually mean submit when we say start,
so why not just call it submit? For example, these existing DriverError messages already talk
about submission:

  DRIVER_0008("Invalid combination of submission and execution engines"),

  DRIVER_0009("Job has been disabled. Cannot submit this job."),

  DRIVER_0010("Link for this job has been disabled. Cannot submit this job."),

  DRIVER_0011("Connector does not support specified direction. Cannot submit this job."),



> Refactor JobRequestHandler for submit/abort job and SubmissionHandler for get operation only
> --------------------------------------------------------------------------------------------
>
>                 Key: SQOOP-1510
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1510
>             Project: Sqoop
>          Issue Type: Sub-task
>            Reporter: Veena Basavaraj
>            Assignee: Veena Basavaraj
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
