hadoop-common-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r784771 [2/2] - in /hadoop/core/branches/HADOOP-3628-2: ./ src/c++/task-controller/ src/core/org/apache/hadoop/fs/kfs/ src/core/org/apache/hadoop/util/ src/docs/src/documentation/content/xdocs/ src/examples/org/apache/hadoop/examples/ src/m...
Date: Mon, 15 Jun 2009 13:23:46 GMT
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/Job.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/Job.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/Job.java Mon Jun 15 13:23:44 2009
@@ -22,167 +22,62 @@
 import java.io.IOException;
 import java.util.ArrayList;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 
-/** This class encapsulates a MapReduce job and its dependency. It monitors 
- *  the states of the depending jobs and updates the state of this job.
- *  A job starts in the WAITING state. If it does not have any depending jobs, or
- *  all of the depending jobs are in SUCCESS state, then the job state will become
- *  READY. If any depending jobs fail, the job will fail too. 
- *  When in READY state, the job can be submitted to Hadoop for execution, with
- *  the state changing into RUNNING state. From RUNNING state, the job can get into 
- *  SUCCESS or FAILED state, depending the status of the job execution.
- *  
+/** 
+ * @deprecated Use {@link ControlledJob} instead.  
  */
+@Deprecated
+public class Job extends ControlledJob {
+  static final Log LOG = LogFactory.getLog(Job.class);
 
-public class Job {
-
-  // A job will be in one of the following states
   final public static int SUCCESS = 0;
   final public static int WAITING = 1;
   final public static int RUNNING = 2;
   final public static int READY = 3;
   final public static int FAILED = 4;
   final public static int DEPENDENT_FAILED = 5;
-	
-	
-  private JobConf theJobConf;
-  private int state;
-  private String jobID; 		// assigned and used by JobControl class
-  private JobID mapredJobID; // the job ID assigned by map/reduce
-  private String jobName;		// external name, assigned/used by client app
-  private String message;		// some info for human consumption, 
-  // e.g. the reason why the job failed
-  private ArrayList<Job> dependingJobs;	// the jobs the current job depends on
-	
-  private JobClient jc = null;		// the map reduce job client
-	
+
   /** 
    * Construct a job.
    * @param jobConf a mapred job configuration representing a job to be executed.
    * @param dependingJobs an array of jobs the current job depends on
    */
-  public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
-    this.theJobConf = jobConf;
-    this.dependingJobs = dependingJobs;
-    this.state = Job.WAITING;
-    this.jobID = "unassigned";
-    this.mapredJobID = null; //not yet assigned 
-    this.jobName = "unassigned";
-    this.message = "just initialized";
-    this.jc = new JobClient(jobConf);
-  }
-  
-  /**
-   * Construct a job.
-   * 
-   * @param jobConf mapred job configuration representing a job to be executed.
-   * @throws IOException
-   */
-  public Job(JobConf jobConf) throws IOException {
-    this(jobConf, null);
-  }
-	
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("job name:\t").append(this.jobName).append("\n");
-    sb.append("job id:\t").append(this.jobID).append("\n");
-    sb.append("job state:\t").append(this.state).append("\n");
-    sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
-        : this.mapredJobID).append("\n");
-    sb.append("job message:\t").append(this.message).append("\n");
-		
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      sb.append("job has no depending job:\t").append("\n");
-    } else {
-      sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
-      for (int i = 0; i < this.dependingJobs.size(); i++) {
-        sb.append("\t depending job ").append(i).append(":\t");
-        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
-      }
-    }
-    return sb.toString();
-  }
-	
-  /**
-   * @return the job name of this job
-   */
-  public String getJobName() {
-    return this.jobName;
-  }
-	
-  /**
-   * Set the job name for  this job.
-   * @param jobName the job name
-   */
-  public void setJobName(String jobName) {
-    this.jobName = jobName;
+  public Job(JobConf jobConf, ArrayList<?> dependingJobs) throws IOException {
+    super(new org.apache.hadoop.mapreduce.Job(jobConf), 
+          (ArrayList<ControlledJob>) dependingJobs);
   }
-	
-  /**
-   * @return the job ID of this job assigned by JobControl
-   */
-  public String getJobID() {
-    return this.jobID;
-  }
-	
-  /**
-   * Set the job ID for  this job.
-   * @param id the job ID
-   */
-  public void setJobID(String id) {
-    this.jobID = id;
-  }
-	
-  /**
-   * @return the mapred ID of this job
-   * @deprecated use {@link #getAssignedJobID()} instead
-   */
-  @Deprecated
-  public String getMapredJobID() {
-    return this.mapredJobID.toString();
-  }
-	
-  /**
-   * Set the mapred ID for this job.
-   * @param mapredJobID the mapred job ID for this job.
-   * @deprecated use {@link #setAssignedJobID(JobID)} instead
-   */
-  @Deprecated
-  public void setMapredJobID(String mapredJobID) {
-    this.mapredJobID = JobID.forName(mapredJobID);
+
+  public Job(JobConf conf) throws IOException {
+    super(conf);
   }
-	
+
   /**
    * @return the mapred ID of this job as assigned by the 
    * mapred framework.
    */
   public JobID getAssignedJobID() {
-    return this.mapredJobID;
+    return (JobID)super.getMapredJobID();
   }
-  
+
   /**
-   * Set the mapred ID for this job as assigned by the 
-   * mapred framework.
-   * @param mapredJobID the mapred job ID for this job.
+   * @deprecated setAssignedJobID should not be called.
+   * The JobID is set by the framework.
    */
   public void setAssignedJobID(JobID mapredJobID) {
-    this.mapredJobID = mapredJobID;
+    // do nothing
   }
-  
+
   /**
    * @return the mapred job conf of this job
    */
   public synchronized JobConf getJobConf() {
-    return this.theJobConf;
+    return new JobConf(super.getJob().getConfiguration());
   }
 
 
@@ -191,197 +86,55 @@
    * @param jobConf the mapred job conf for this job.
    */
   public synchronized void setJobConf(JobConf jobConf) {
-    this.theJobConf = jobConf;
+    try {
+      super.setJob(new org.apache.hadoop.mapreduce.Job(jobConf));
+    } catch (IOException ioe) { 
+      LOG.info("Exception" + ioe);
+    }
   }
 
   /**
    * @return the state of this job
    */
   public synchronized int getState() {
-    return this.state;
-  }
-	
-  /**
-   * Set the state for this job.
-   * @param state the new state for this job.
-   */
-  protected synchronized void setState(int state) {
-    this.state = state;
-  }
-	
-  /**
-   * @return the message of this job
-   */
-  public synchronized String getMessage() {
-    return this.message;
-  }
-
-  /**
-   * Set the message for this job.
-   * @param message the message for this job.
-   */
-  public synchronized void setMessage(String message) {
-    this.message = message;
-  }
-
-
-  /**
-   * @return the job client of this job
-   */
-  public JobClient getJobClient(){
-          return this.jc;
-  }
-
-  /**
-   * @return the depending jobs of this job
-   */
-  public ArrayList<Job> getDependingJobs() {
-    return this.dependingJobs;
-  }
-  
-  /**
-   * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
-   * is waiting to run, not during or afterwards.
-   * 
-   * @param dependingJob Job that this Job depends on.
-   * @return <tt>true</tt> if the Job was added.
-   */
-  public synchronized boolean addDependingJob(Job dependingJob) {
-    if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
-      if (this.dependingJobs == null) {
-        this.dependingJobs = new ArrayList<Job>();
-      }
-      return this.dependingJobs.add(dependingJob);
-    } else {
-      return false;
+    State state = super.getJobState();
+    if (state == State.SUCCESS) {
+      return SUCCESS;
+    } 
+    if (state == State.WAITING) {
+      return WAITING;
+    }
+    if (state == State.RUNNING) {
+      return RUNNING;
+    }
+    if (state == State.READY) {
+      return READY;
     }
+    if (state == State.FAILED ) {
+      return FAILED;
+    }
+    if (state == State.DEPENDENT_FAILED ) {
+      return DEPENDENT_FAILED;
+    }
+    return -1;
   }
-	
-  /**
-   * @return true if this job is in a complete state
-   */
-  public synchronized boolean isCompleted() {
-    return this.state == Job.FAILED || 
-      this.state == Job.DEPENDENT_FAILED ||
-      this.state == Job.SUCCESS;
-  }
-	
-  /**
-   * @return true if this job is in READY state
-   */
-  public synchronized boolean isReady() {
-    return this.state == Job.READY;
-  }
-	
+  
   /**
-   * Check the state of this running job. The state may 
-   * remain the same, become SUCCESS or FAILED.
+   * @return the job client of this job
    */
-  private void checkRunningState() {
-    RunningJob running = null;
+  public JobClient getJobClient() {
     try {
-      running = jc.getJob(this.mapredJobID);
-      if (running.isComplete()) {
-        if (running.isSuccessful()) {
-          this.state = Job.SUCCESS;
-        } else {
-          this.state = Job.FAILED;
-          this.message = "Job failed!";
-          try {
-            running.killJob();
-          } catch (IOException e1) {
-
-          }
-          try {
-            this.jc.close();
-          } catch (IOException e2) {
-
-          }
-        }
-      }
-
+      return new JobClient(super.getJob().getConfiguration());
     } catch (IOException ioe) {
-      this.state = Job.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-      try {
-        if (running != null)
-          running.killJob();
-      } catch (IOException e1) {
-
-      }
-      try {
-        this.jc.close();
-      } catch (IOException e1) {
-
-      }
+      return null;
     }
   }
-	
-  /**
-   * Check and update the state of this job. The state changes  
-   * depending on its current state and the states of the depending jobs.
-   */
-   synchronized int checkState() {
-    if (this.state == Job.RUNNING) {
-      checkRunningState();
-    }
-    if (this.state != Job.WAITING) {
-      return this.state;
-    }
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      this.state = Job.READY;
-      return this.state;
-    }
-    Job pred = null;
-    int n = this.dependingJobs.size();
-    for (int i = 0; i < n; i++) {
-      pred = this.dependingJobs.get(i);
-      int s = pred.checkState();
-      if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
-        break; // a pred is still not completed, continue in WAITING
-        // state
-      }
-      if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
-        this.state = Job.DEPENDENT_FAILED;
-        this.message = "depending job " + i + " with jobID "
-          + pred.getJobID() + " failed. " + pred.getMessage();
-        break;
-      }
-      // pred must be in success state
-      if (i == n - 1) {
-        this.state = Job.READY;
-      }
-    }
 
-    return this.state;
-  }
-	
   /**
-   * Submit this job to mapred. The state becomes RUNNING if submission 
-   * is successful, FAILED otherwise.  
+   * @return the depending jobs of this job
    */
-  protected synchronized void submit() {
-    try {
-      if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
-        FileSystem fs = FileSystem.get(theJobConf);
-        Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
-        for (int i = 0; i < inputPaths.length; i++) {
-          if (!fs.exists(inputPaths[i])) {
-            try {
-              fs.mkdirs(inputPaths[i]);
-            } catch (IOException e) {
-
-            }
-          }
-        }
-      }
-      RunningJob running = jc.submitJob(theJobConf);
-      this.mapredJobID = running.getID();
-      this.state = Job.RUNNING;
-    } catch (IOException ioe) {
-      this.state = Job.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-    }
+  public ArrayList<Job> getDependingJobs() {
+    return JobControl.castToJobList(super.getDependentJobs());
   }
-	
+
 }
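
The rewritten class above is a thin facade over ControlledJob: the legacy int constants stay, and getState() translates the new State enum back onto them one value at a time. As a minimal sketch, assuming the nested ControlledJob.State enum the hunk refers to, the same mapping collapses into a single switch:

    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

    // Sketch only: the State-to-int mapping performed by getState() above,
    // collapsed into one switch over the new API's enum.
    class LegacyStateMapper {
      static int toLegacyState(ControlledJob.State state) {
        switch (state) {
          case SUCCESS:          return 0; // Job.SUCCESS
          case WAITING:          return 1; // Job.WAITING
          case RUNNING:          return 2; // Job.RUNNING
          case READY:            return 3; // Job.READY
          case FAILED:           return 4; // Job.FAILED
          case DEPENDENT_FAILED: return 5; // Job.DEPENDENT_FAILED
          default:               return -1; // unknown / future state
        }
      }
    }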

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java Mon Jun 15 13:23:44 2009
@@ -20,279 +20,97 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Map;
 
-/** This class encapsulates a set of MapReduce jobs and its dependency. It tracks 
- *  the states of the jobs by placing them into different tables according to their 
- *  states. 
- *  
- *  This class provides APIs for the client app to add a job to the group and to get 
- *  the jobs in the group in different states. When a 
- *  job is added, an ID unique to the group is assigned to the job. 
- *  
- *  This class has a thread that submits jobs when they become ready, monitors the
- *  states of the running jobs, and updates the states of jobs based on the state changes 
- *  of their depending jobs states. The class provides APIs for suspending/resuming
- *  the thread,and for stopping the thread.
- *  
- */
-public class JobControl implements Runnable{
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+
+/**
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl} instead
+ **/
+@Deprecated
+public class JobControl extends 
+    org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl {
 
-  // The thread can be in one of the following state
-  private static final int RUNNING = 0;
-  private static final int SUSPENDED = 1;
-  private static final int STOPPED = 2;
-  private static final int STOPPING = 3;
-  private static final int READY = 4;
-	
-  private int runnerState;			// the thread state
-	
-  private Map<String, Job> waitingJobs;
-  private Map<String, Job> readyJobs;
-  private Map<String, Job> runningJobs;
-  private Map<String, Job> successfulJobs;
-  private Map<String, Job> failedJobs;
-	
-  private long nextJobID;
-  private String groupName;
-	
   /** 
    * Construct a job control for a group of jobs.
    * @param groupName a name identifying this group
    */
   public JobControl(String groupName) {
-    this.waitingJobs = new Hashtable<String, Job>();
-    this.readyJobs = new Hashtable<String, Job>();
-    this.runningJobs = new Hashtable<String, Job>();
-    this.successfulJobs = new Hashtable<String, Job>();
-    this.failedJobs = new Hashtable<String, Job>();
-    this.nextJobID = -1;
-    this.groupName = groupName;
-    this.runnerState = JobControl.READY;
+    super(groupName);
   }
-	
-  private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
-    ArrayList<Job> retv = new ArrayList<Job>();
-    synchronized (jobs) {
-      for (Job job : jobs.values()) {
-        retv.add(job);
-      }
+  
+  static ArrayList<Job> castToJobList(ArrayList<ControlledJob> cjobs) {
+    ArrayList<Job> ret = new ArrayList<Job>();
+    for (ControlledJob job : cjobs) {
+      ret.add((Job)job);
     }
-    return retv;
+    return ret;
   }
-	
+  
   /**
    * @return the jobs in the waiting state
    */
   public ArrayList<Job> getWaitingJobs() {
-    return JobControl.toArrayList(this.waitingJobs);
+    return castToJobList(super.getWaitingJobList());
   }
 	
   /**
    * @return the jobs in the running state
    */
   public ArrayList<Job> getRunningJobs() {
-    return JobControl.toArrayList(this.runningJobs);
+    return castToJobList(super.getRunningJobList());
   }
 	
   /**
    * @return the jobs in the ready state
    */
   public ArrayList<Job> getReadyJobs() {
-    return JobControl.toArrayList(this.readyJobs);
+    return castToJobList(super.getReadyJobsList());
   }
 	
   /**
    * @return the jobs in the success state
    */
   public ArrayList<Job> getSuccessfulJobs() {
-    return JobControl.toArrayList(this.successfulJobs);
+    return castToJobList(super.getSuccessfulJobList());
   }
 	
   public ArrayList<Job> getFailedJobs() {
-    return JobControl.toArrayList(this.failedJobs);
-  }
-	
-  private String getNextJobID() {
-    nextJobID += 1;
-    return this.groupName + this.nextJobID;
-  }
-	
-  private static void addToQueue(Job aJob, Map<String, Job> queue) {
-    synchronized(queue) {
-      queue.put(aJob.getJobID(), aJob);
-    }		
-  }
-	
-  private void addToQueue(Job aJob) {
-    Map<String, Job> queue = getQueue(aJob.getState());
-    addToQueue(aJob, queue);	
-  }
-	
-  private Map<String, Job> getQueue(int state) {
-    Map<String, Job> retv = null;
-    if (state == Job.WAITING) {
-      retv = this.waitingJobs;
-    } else if (state == Job.READY) {
-      retv = this.readyJobs;
-    } else if (state == Job.RUNNING) {
-      retv = this.runningJobs;
-    } else if (state == Job.SUCCESS) {
-      retv = this.successfulJobs;
-    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
-      retv = this.failedJobs;
-    } 
-    return retv;
+    return castToJobList(super.getFailedJobList());
   }
 
   /**
-   * Add a new job.
-   * @param aJob the new job
-   */
-  synchronized public String addJob(Job aJob) {
-    String id = this.getNextJobID();
-    aJob.setJobID(id);
-    aJob.setState(Job.WAITING);
-    this.addToQueue(aJob);
-    return id;	
-  }
-	
-  /**
    * Add a collection of jobs
    * 
    * @param jobs
    */
-  public void addJobs(Collection<Job> jobs) {
+  public void addJobs(Collection <Job> jobs) {
     for (Job job : jobs) {
       addJob(job);
     }
   }
-	
+
   /**
    * @return the thread state
    */
   public int getState() {
-    return this.runnerState;
-  }
-	
-  /**
-   * set the thread state to STOPPING so that the 
-   * thread will stop when it wakes up.
-   */
-  public void stop() {
-    this.runnerState = JobControl.STOPPING;
-  }
-	
-  /**
-   * suspend the running thread
-   */
-  public void suspend () {
-    if (this.runnerState == JobControl.RUNNING) {
-      this.runnerState = JobControl.SUSPENDED;
-    }
-  }
-	
-  /**
-   * resume the suspended thread
-   */
-  public void resume () {
-    if (this.runnerState == JobControl.SUSPENDED) {
-      this.runnerState = JobControl.RUNNING;
+    ThreadState state = super.getThreadState();
+    if (state == ThreadState.RUNNING) {
+      return 0;
+    } 
+    if (state == ThreadState.SUSPENDED) {
+      return 1;
     }
-  }
-	
-  synchronized private void checkRunningJobs() {
-		
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      int state = nextJob.checkState();
-      /*
-        if (state != Job.RUNNING) {
-        System.out.println("The state of the running job " +
-        nextJob.getJobName() + " has changed to: " + nextJob.getState());
-        }
-      */
-      this.addToQueue(nextJob);
+    if (state == ThreadState.STOPPED) {
+      return 2;
     }
-  }
-	
-  synchronized private void checkWaitingJobs() {
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      int state = nextJob.checkState();
-      /*
-        if (state != Job.WAITING) {
-        System.out.println("The state of the waiting job " +
-        nextJob.getJobName() + " has changed to: " + nextJob.getState());
-        }
-      */
-      this.addToQueue(nextJob);
+    if (state == ThreadState.STOPPING) {
+      return 3;
     }
-  }
-	
-  synchronized private void startReadyJobs() {
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
-      nextJob.submit();
-      //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
-      this.addToQueue(nextJob);
-    }	
-  }
-	
-  synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 &&
-      this.readyJobs.size() == 0 &&
-      this.runningJobs.size() == 0;
-  }
-	
-  /**
-   *  The main loop for the thread.
-   *  The loop does the following:
-   *  	Check the states of the running jobs
-   *  	Update the states of waiting jobs
-   *  	Submit the jobs in ready state
-   */
-  public void run() {
-    this.runnerState = JobControl.RUNNING;
-    while (true) {
-      while (this.runnerState == JobControl.SUSPENDED) {
-        try {
-          Thread.sleep(5000);
-        }
-        catch (Exception e) {
-					
-        }
-      }
-      checkRunningJobs();	
-      checkWaitingJobs();		
-      startReadyJobs();		
-      if (this.runnerState != JobControl.RUNNING && 
-          this.runnerState != JobControl.SUSPENDED) {
-        break;
-      }
-      try {
-        Thread.sleep(5000);
-      }
-      catch (Exception e) {
-				
-      }
-      if (this.runnerState != JobControl.RUNNING && 
-          this.runnerState != JobControl.SUSPENDED) {
-        break;
-      }
+    if (state == ThreadState.READY ) {
+      return 4;
     }
-    this.runnerState = JobControl.STOPPED;
+    return -1;
   }
 
 }
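
Together with the Job facade above, this keeps old drivers source-compatible while all scheduling work happens in org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl. A hedged usage sketch of the legacy surface (group name and job configuration are placeholders):

    import java.util.Arrays;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;

    // Sketch only: two dependent jobs run through the deprecated facade.
    public class JobControlDemo {
      public static void main(String[] args) throws Exception {
        Job first = new Job(new JobConf());  // configure paths, mapper, etc.
        Job second = new Job(new JobConf());
        second.addDependingJob(first);       // second waits for first's SUCCESS

        JobControl control = new JobControl("demo-group");
        control.addJobs(Arrays.asList(first, second));
        new Thread(control).start();         // the base class implements Runnable
        while (!control.allFinished()) {
          Thread.sleep(5000);                // poll, as the control thread does
        }
        control.stop();
      }
    }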

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java Mon Jun 15 13:23:44 2009
@@ -18,70 +18,71 @@
 
 package org.apache.hadoop.mapred.lib.db;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.lib.db.DBInputFormat.NullDBWritable;
 
 /**
- * A container for configuration property names for jobs with DB input/output. 
- * <br>
- * The job can be configured using the static methods in this class, 
- * {@link DBInputFormat}, and {@link DBOutputFormat}. 
- * <p> 
- * Alternatively, the properties can be set in the configuration with proper
- * values. 
- *   
- * @see DBConfiguration#configureDB(JobConf, String, String, String, String)
- * @see DBInputFormat#setInput(JobConf, Class, String, String)
- * @see DBInputFormat#setInput(JobConf, Class, String, String, String, String...)
- * @see DBOutputFormat#setOutput(JobConf, String, String...)
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.db.DBConfiguration} instead 
  */
-public class DBConfiguration {
-
+@Deprecated
+public class DBConfiguration extends 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration {
   /** The JDBC Driver class name */
-  public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";
+  public static final String DRIVER_CLASS_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.DRIVER_CLASS_PROPERTY;
   
   /** JDBC Database access URL */
-  public static final String URL_PROPERTY = "mapred.jdbc.url";
+  public static final String URL_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.URL_PROPERTY;
 
   /** User name to access the database */
-  public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+  public static final String USERNAME_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.USERNAME_PROPERTY;
   
   /** Password to access the database */
-  public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+  public static final String PASSWORD_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.PASSWORD_PROPERTY;
 
   /** Input table name */
-  public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";
+  public static final String INPUT_TABLE_NAME_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Input table */
-  public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";
+  public static final String INPUT_FIELD_NAMES_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_FIELD_NAMES_PROPERTY;
 
   /** WHERE clause in the input SELECT statement */
-  public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";
+  public static final String INPUT_CONDITIONS_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_CONDITIONS_PROPERTY;
   
   /** ORDER BY clause in the input SELECT statement */
-  public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";
+  public static final String INPUT_ORDER_BY_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_ORDER_BY_PROPERTY;
   
   /** Whole input query, excluding LIMIT...OFFSET */
-  public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+  public static final String INPUT_QUERY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_QUERY;
   
   /** Input query to get the count of records */
-  public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";
+  public static final String INPUT_COUNT_QUERY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_COUNT_QUERY;
   
   /** Class name implementing DBWritable which will hold input tuples */
-  public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+  public static final String INPUT_CLASS_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_CLASS_PROPERTY;
 
   /** Output table name */
-  public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";
+  public static final String OUTPUT_TABLE_NAME_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Output table */
-  public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";  
+  public static final String OUTPUT_FIELD_NAMES_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY;  
 
   /** Number of fields in the Output table */
-  public static final String OUTPUT_FIELD_COUNT_PROPERTY = "mapred.jdbc.output.field.count";  
+  public static final String OUTPUT_FIELD_COUNT_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY;
+
   
   /**
    * Sets the DB access related fields in the JobConf.  
@@ -112,115 +113,8 @@
     configureDB(job, driverClass, dbUrl, null, null);
   }
 
-  private JobConf job;
-
   DBConfiguration(JobConf job) {
-    this.job = job;
-  }
-
-  /** Returns a connection object o the DB 
-   * @throws ClassNotFoundException 
-   * @throws SQLException */
-  Connection getConnection() throws ClassNotFoundException, SQLException{
-
-    Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
-
-    if(job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
-      return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
-    } else {
-      return DriverManager.getConnection(
-          job.get(DBConfiguration.URL_PROPERTY), 
-          job.get(DBConfiguration.USERNAME_PROPERTY), 
-          job.get(DBConfiguration.PASSWORD_PROPERTY));
-    }
-  }
-
-  String getInputTableName() {
-    return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
-  }
-
-  void setInputTableName(String tableName) {
-    job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  String[] getInputFieldNames() {
-    return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  void setInputFieldNames(String... fieldNames) {
-    job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  String getInputConditions() {
-    return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
-  }
-
-  void setInputConditions(String conditions) {
-    if (conditions != null && conditions.length() > 0)
-      job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
-  }
-
-  String getInputOrderBy() {
-    return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
-  }
-  
-  void setInputOrderBy(String orderby) {
-    if(orderby != null && orderby.length() >0) {
-      job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
-    }
-  }
-  
-  String getInputQuery() {
-    return job.get(DBConfiguration.INPUT_QUERY);
-  }
-  
-  void setInputQuery(String query) {
-    if(query != null && query.length() >0) {
-      job.set(DBConfiguration.INPUT_QUERY, query);
-    }
-  }
-  
-  String getInputCountQuery() {
-    return job.get(DBConfiguration.INPUT_COUNT_QUERY);
-  }
-  
-  void setInputCountQuery(String query) {
-    if(query != null && query.length() >0) {
-      job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
-    }
-  }
-  
-  
-  Class<?> getInputClass() {
-    return job.getClass(DBConfiguration.INPUT_CLASS_PROPERTY, NullDBWritable.class);
-  }
-
-  void setInputClass(Class<? extends DBWritable> inputClass) {
-    job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
-  }
-
-  String getOutputTableName() {
-    return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
-  }
-
-  void setOutputTableName(String tableName) {
-    job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  String[] getOutputFieldNames() {
-    return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  void setOutputFieldNames(String... fieldNames) {
-    job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  void setOutputFieldCount(int fieldCount) {
-    job.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
-  }
-  
-  int getOutputFieldCount() {
-    return job.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
+    super(job);
   }
   
 }
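
With its accessors removed, DBConfiguration is now an alias layer for the property names plus a pass-through constructor; jobs are still configured through the static configureDB overloads visible in the surviving context lines. A minimal sketch, with a placeholder driver class, URL, and credentials:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;

    // Sketch only: the deprecated facade still sets the same properties,
    // which now resolve to the mapreduce.lib.db constants aliased above.
    class DBConfDemo {
      static JobConf configureDemoJob() {
        JobConf job = new JobConf();
        DBConfiguration.configureDB(job,
            "org.hsqldb.jdbcDriver",             // placeholder JDBC driver
            "jdbc:hsqldb:hsql://localhost/demo", // placeholder database URL
            "sa", "");                           // placeholder credentials
        return job;
      }
    }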

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java Mon Jun 15 13:23:44 2009
@@ -18,14 +18,9 @@
 
 package org.apache.hadoop.mapred.lib.db;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -35,100 +30,31 @@
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
- * A InputFormat that reads input data from an SQL table.
- * <p>
- * DBInputFormat emits LongWritables containing the record number as 
- * key and DBWritables as value. 
- * 
- * The SQL query, and input class can be using one of the two 
- * setInput methods.
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat} instead.
  */
+@Deprecated
 public class DBInputFormat<T  extends DBWritable>
-  implements InputFormat<LongWritable, T>, JobConfigurable {
+    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T> 
+    implements InputFormat<LongWritable, T>, JobConfigurable {
   /**
    * A RecordReader that reads records from a SQL table.
    * Emits LongWritables containing the record number as 
    * key and DBWritables as value.  
    */
-  protected class DBRecordReader implements
-  RecordReader<LongWritable, T> {
-    private ResultSet results;
-
-    private Statement statement;
-
-    private Class<T> inputClass;
-
-    private JobConf job;
-
-    private DBInputSplit split;
-
-    private long pos = 0;
-
+  protected class DBRecordReader extends
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>.DBRecordReader
+      implements RecordReader<LongWritable, T> {
     /**
      * @param split The InputSplit to read data for
      * @throws SQLException 
      */
-    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) throws SQLException {
-      this.inputClass = inputClass;
-      this.split = split;
-      this.job = job;
-      
-      statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-
-      //statement.setFetchSize(Integer.MIN_VALUE);
-      results = statement.executeQuery(getSelectQuery());
-    }
-
-    /** Returns the query for selecting the records, 
-     * subclasses can override this for custom behaviour.*/
-    protected String getSelectQuery() {
-      StringBuilder query = new StringBuilder();
-      
-      if(dbConf.getInputQuery() == null) {
-        query.append("SELECT ");
-
-        for (int i = 0; i < fieldNames.length; i++) {
-          query.append(fieldNames[i]);
-          if(i != fieldNames.length -1) {
-            query.append(", ");
-          }
-        }
-
-        query.append(" FROM ").append(tableName);
-        query.append(" AS ").append(tableName); //in hsqldb this is necessary
-        if (conditions != null && conditions.length() > 0)
-          query.append(" WHERE (").append(conditions).append(")");
-        String orderBy = dbConf.getInputOrderBy();
-        if(orderBy != null && orderBy.length() > 0) {
-          query.append(" ORDER BY ").append(orderBy);
-        }
-      }
-      else {
-        query.append(dbConf.getInputQuery());
-      }
-
-      try {
-        query.append(" LIMIT ").append(split.getLength());
-        query.append(" OFFSET ").append(split.getStart());
-      }
-      catch (IOException ex) {
-        //ignore, will not throw
-      }
-      return query.toString();
-    }
-
-    /** {@inheritDoc} */
-    public void close() throws IOException {
-      try {
-        connection.commit();
-        results.close();
-        statement.close();
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
+    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, 
+      JobConf job) throws SQLException {
+     super(split, inputClass, job);
     }
 
     /** {@inheritDoc} */
@@ -138,59 +64,32 @@
 
     /** {@inheritDoc} */
     public T createValue() {
-      return ReflectionUtils.newInstance(inputClass, job);
+      return super.createValue();
     }
 
-    /** {@inheritDoc} */
     public long getPos() throws IOException {
-      return pos;
-    }
-
-    /** {@inheritDoc} */
-    public float getProgress() throws IOException {
-      return pos / (float)split.getLength();
+      return super.getPos();
     }
 
     /** {@inheritDoc} */
     public boolean next(LongWritable key, T value) throws IOException {
-      try {
-        if (!results.next())
-          return false;
-
-        // Set the key field value as the output key value
-        key.set(pos + split.getStart());
-
-        value.readFields(results);
-
-        pos ++;
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
-      return true;
+      return super.next(key, value);
     }
   }
 
   /**
    * A Class that does nothing, implementing DBWritable
    */
-  public static class NullDBWritable implements DBWritable, Writable {
-    @Override
-    public void readFields(DataInput in) throws IOException { }
-    @Override
-    public void readFields(ResultSet arg0) throws SQLException { }
-    @Override
-    public void write(DataOutput out) throws IOException { }
-    @Override
-    public void write(PreparedStatement arg0) throws SQLException { }
+  public static class NullDBWritable extends 
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable 
+      implements DBWritable, Writable {
   }
   /**
    * A InputSplit that spans a set of rows
    */
-  protected static class DBInputSplit implements InputSplit {
-
-    private long end = 0;
-    private long start = 0;
-
+  protected static class DBInputSplit extends 
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit 
+      implements InputSplit {
     /**
      * Default Constructor
      */
@@ -203,77 +102,13 @@
      * @param end the index of the last row to select
      */
     public DBInputSplit(long start, long end) {
-      this.start = start;
-      this.end = end;
-    }
-
-    /** {@inheritDoc} */
-    public String[] getLocations() throws IOException {
-      // TODO Add a layer to enable SQL "sharding" and support locality
-      return new String[] {};
-    }
-
-    /**
-     * @return The index of the first row to select
-     */
-    public long getStart() {
-      return start;
-    }
-
-    /**
-     * @return The index of the last row to select
-     */
-    public long getEnd() {
-      return end;
-    }
-
-    /**
-     * @return The total row count in this split
-     */
-    public long getLength() throws IOException {
-      return end - start;
-    }
-
-    /** {@inheritDoc} */
-    public void readFields(DataInput input) throws IOException {
-      start = input.readLong();
-      end = input.readLong();
-    }
-
-    /** {@inheritDoc} */
-    public void write(DataOutput output) throws IOException {
-      output.writeLong(start);
-      output.writeLong(end);
+      super(start, end);
     }
   }
 
-  private String conditions;
-
-  private Connection connection;
-
-  private String tableName;
-
-  private String[] fieldNames;
-
-  private DBConfiguration dbConf;
-
   /** {@inheritDoc} */
   public void configure(JobConf job) {
-
-    dbConf = new DBConfiguration(job);
-
-    try {
-      this.connection = dbConf.getConnection();
-      this.connection.setAutoCommit(false);
-      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-    }
-    catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-
-    tableName = dbConf.getInputTableName();
-    fieldNames = dbConf.getInputFieldNames();
-    conditions = dbConf.getInputConditions();
+    super.setConf(job);
   }
 
   /** {@inheritDoc} */
@@ -281,7 +116,7 @@
   public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
       JobConf job, Reporter reporter) throws IOException {
 
-    Class inputClass = dbConf.getInputClass();
+    Class inputClass = super.getDBConf().getInputClass();
     try {
       return new DBRecordReader((DBInputSplit) split, inputClass, job);
     }
@@ -292,63 +127,16 @@
 
   /** {@inheritDoc} */
   public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
-
-	ResultSet results = null;  
-	Statement statement = null;
-    try {
-      statement = connection.createStatement();
-
-      results = statement.executeQuery(getCountQuery());
-      results.next();
-
-      long count = results.getLong(1);
-      long chunkSize = (count / chunks);
-
-      results.close();
-      statement.close();
-
-      InputSplit[] splits = new InputSplit[chunks];
-
-      // Split the rows into n-number of chunks and adjust the last chunk
-      // accordingly
-      for (int i = 0; i < chunks; i++) {
-        DBInputSplit split;
-
-        if ((i + 1) == chunks)
-          split = new DBInputSplit(i * chunkSize, count);
-        else
-          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
-              + chunkSize);
-
-        splits[i] = split;
-      }
-
-      return splits;
-    } catch (SQLException e) {
-      try {
-        if (results != null) { results.close(); }
-      } catch (SQLException e1) {}
-      try {
-        if (statement != null) { statement.close(); }
-      } catch (SQLException e1) {}
-      throw new IOException(e.getMessage());
+    List<org.apache.hadoop.mapreduce.InputSplit> newSplits = 
+      super.getSplits(new Job(job));
+    InputSplit[] ret = new InputSplit[newSplits.size()];
+    int i = 0;
+    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split = 
+    	(org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
+      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
     }
-  }
-
-  /** Returns the query for getting the total number of rows, 
-   * subclasses can override this for custom behaviour.*/
-  protected String getCountQuery() {
-    
-    if(dbConf.getInputCountQuery() != null) {
-      return dbConf.getInputCountQuery();
-    }
-    
-    StringBuilder query = new StringBuilder();
-    query.append("SELECT COUNT(*) FROM " + tableName);
-
-    if (conditions != null && conditions.length() > 0)
-      query.append(" WHERE " + conditions);
-    return query.toString();
+    return ret;
   }
 
   /**

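From a caller's perspective the input side is unchanged; only the plumbing now delegates, with getSplits re-wrapping the new API's split list into the old array form. A hedged usage sketch, assuming the setInput overload this class has historically exposed is retained beyond the truncated hunk (MyRecord and the table/field names are placeholders):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBInputFormat;

    // Sketch only: reading rows through the deprecated input format.
    // MyRecord is a placeholder implementing the old DBWritable interface.
    class DBInputDemo {
      static JobConf configureInput() {
        JobConf job = new JobConf();
        job.setInputFormat(DBInputFormat.class);
        DBConfiguration.configureDB(job,
            "org.hsqldb.jdbcDriver", "jdbc:hsqldb:hsql://localhost/demo");
        DBInputFormat.setInput(job, MyRecord.class,
            "employees",   // input table name (placeholder)
            null,          // WHERE conditions (none)
            "id",          // ORDER BY clause
            "id", "name"); // fields to select
        return job;
      }
    }
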
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java Mon Jun 15 13:23:44 2009
@@ -23,120 +23,41 @@
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
 
 /**
- * A OutputFormat that sends the reduce output to a SQL table.
- * <p> 
- * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where 
- * key has a type extending DBWritable. Returned {@link RecordWriter} 
- * writes <b>only the key</b> to the database with a batch SQL query.  
- * 
+ * @deprecated Use org.apache.hadoop.mapreduce.lib.db.DBOutputFormat instead
  */
+@Deprecated
 public class DBOutputFormat<K  extends DBWritable, V> 
-implements OutputFormat<K,V> {
-
-  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+    extends org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>
+    implements OutputFormat<K, V> {
 
   /**
    * A RecordWriter that writes the reduce output to a SQL table
    */
-  protected class DBRecordWriter 
-  implements RecordWriter<K, V> {
-
-    private Connection connection;
-    private PreparedStatement statement;
-
-    protected DBRecordWriter(Connection connection
-        , PreparedStatement statement) throws SQLException {
-      this.connection = connection;
-      this.statement = statement;
-      this.connection.setAutoCommit(false);
+  protected class DBRecordWriter extends 
+      org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>.DBRecordWriter
+      implements RecordWriter<K, V> {
+
+    protected DBRecordWriter(Connection connection, 
+      PreparedStatement statement) throws SQLException {
+      super(connection, statement);
     }
 
     /** {@inheritDoc} */
     public void close(Reporter reporter) throws IOException {
-      try {
-        statement.executeBatch();
-        connection.commit();
-      } catch (SQLException e) {
-        try {
-          connection.rollback();
-        }
-        catch (SQLException ex) {
-          LOG.warn(StringUtils.stringifyException(ex));
-        }
-        throw new IOException(e.getMessage());
-      } finally {
-        try {
-          statement.close();
-          connection.close();
-        }
-        catch (SQLException ex) {
-          throw new IOException(ex.getMessage());
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public void write(K key, V value) throws IOException {
-      try {
-        key.write(statement);
-        statement.addBatch();
-      } catch (SQLException e) {
-        e.printStackTrace();
-      }
+      super.close(null);
     }
   }
 
-  /**
-   * Constructs the query used as the prepared statement to insert data.
-   * 
-   * @param table
-   *          the table to insert into
-   * @param fieldNames
-   *          the fields to insert into. If field names are unknown, supply an
-   *          array of nulls.
-   */
-  protected String constructQuery(String table, String[] fieldNames) {
-    if(fieldNames == null) {
-      throw new IllegalArgumentException("Field names may not be null");
-    }
-
-    StringBuilder query = new StringBuilder();
-    query.append("INSERT INTO ").append(table);
-
-    if (fieldNames.length > 0 && fieldNames[0] != null) {
-      query.append(" (");
-      for (int i = 0; i < fieldNames.length; i++) {
-        query.append(fieldNames[i]);
-        if (i != fieldNames.length - 1) {
-          query.append(",");
-        }
-      }
-      query.append(")");
-    }
-    query.append(" VALUES (");
-
-    for (int i = 0; i < fieldNames.length; i++) {
-      query.append("?");
-      if(i != fieldNames.length - 1) {
-        query.append(",");
-      }
-    }
-    query.append(");");
-
-    return query.toString();
-  }
-
   /** {@inheritDoc} */
   public void checkOutputSpecs(FileSystem filesystem, JobConf job)
   throws IOException {
@@ -146,24 +67,15 @@
   /** {@inheritDoc} */
   public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
       JobConf job, String name, Progressable progress) throws IOException {
-
-    DBConfiguration dbConf = new DBConfiguration(job);
-    String tableName = dbConf.getOutputTableName();
-    String[] fieldNames = dbConf.getOutputFieldNames();
-    
-    if(fieldNames == null) {
-      fieldNames = new String[dbConf.getOutputFieldCount()];
-    }
-    
+    org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
+      new TaskAttemptContext(job, 
+            TaskAttemptID.forName(job.get("mapred.task.id"))));
+    org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer = 
+     (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
     try {
-      Connection connection = dbConf.getConnection();
-      PreparedStatement statement = null;
-  
-      statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
-      return new DBRecordWriter(connection, statement);
-    }
-    catch (Exception ex) {
-      throw new IOException(ex.getMessage());
+      return new DBRecordWriter(writer.getConnection(), writer.getStatement());
+    } catch(SQLException se) {
+      throw new IOException(se);
     }
   }
 

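The output side mirrors this: getRecordWriter above borrows the new API's writer, rebuilding a TaskAttemptContext from "mapred.task.id", and re-wraps its connection and statement. A hedged usage sketch, assuming the setOutput helper is retained elsewhere in the file (table and field names are placeholders):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.db.DBConfiguration;
    import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

    // Sketch only: writing reduce output to a table through the deprecated
    // output format; output keys must implement the old DBWritable.
    class DBOutputDemo {
      static JobConf configureOutput() {
        JobConf job = new JobConf();
        job.setOutputFormat(DBOutputFormat.class);
        DBConfiguration.configureDB(job,
            "org.hsqldb.jdbcDriver", "jdbc:hsqldb:hsql://localhost/demo");
        DBOutputFormat.setOutput(job, "employees", "id", "name");
        return job;
      }
    }
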
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java Mon Jun 15 13:23:44 2009
@@ -1,75 +1,11 @@
 package org.apache.hadoop.mapred.lib.db;
 
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.hadoop.io.Writable;
-
 /**
- * Objects that are read from/written to a database should implement
- * <code>DBWritable</code>. DBWritable, is similar to {@link Writable} 
- * except that the {@link #write(PreparedStatement)} method takes a 
- * {@link PreparedStatement}, and {@link #readFields(ResultSet)} 
- * takes a {@link ResultSet}. 
- * <p>
- * Implementations are responsible for writing the fields of the object 
- * to PreparedStatement, and reading the fields of the object from the 
- * ResultSet. 
- * 
- * <p>Example:</p>
- * If we have the following table in the database :
- * <pre>
- * CREATE TABLE MyTable (
- *   counter        INTEGER NOT NULL,
- *   timestamp      BIGINT  NOT NULL,
- * );
- * </pre>
- * then we can read/write the tuples from/to the table with :
- * <p><pre>
- * public class MyWritable implements Writable, DBWritable {
- *   // Some data     
- *   private int counter;
- *   private long timestamp;
- *       
- *   //Writable#write() implementation
- *   public void write(DataOutput out) throws IOException {
- *     out.writeInt(counter);
- *     out.writeLong(timestamp);
- *   }
- *       
- *   //Writable#readFields() implementation
- *   public void readFields(DataInput in) throws IOException {
- *     counter = in.readInt();
- *     timestamp = in.readLong();
- *   }
- *       
- *   public void write(PreparedStatement statement) throws SQLException {
- *     statement.setInt(1, counter);
- *     statement.setLong(2, timestamp);
- *   }
- *       
- *   public void readFields(ResultSet resultSet) throws SQLException {
- *     counter = resultSet.getInt(1);
- *     timestamp = resultSet.getLong(2);
- *   } 
- * }
- * </pre></p>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} instead
  */
-public interface DBWritable {
-
-  /**
-   * Sets the fields of the object in the {@link PreparedStatement}.
-   * @param statement the statement that the fields are put into.
-   * @throws SQLException
-   */
-	public void write(PreparedStatement statement) throws SQLException;
-	
-	/**
-	 * Reads the fields of the object from the {@link ResultSet}. 
-	 * @param resultSet the {@link ResultSet} to get the fields from.
-	 * @throws SQLException
-	 */
-	public void readFields(ResultSet resultSet) throws SQLException ; 
+@Deprecated
+public interface DBWritable 
+    extends org.apache.hadoop.mapreduce.lib.db.DBWritable {
 	
 }
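
The worked example deleted from the javadoc above still compiles against the slimmed-down interface, since the old DBWritable now simply extends the new one. For reference, the DB-facing half of that removed example:

    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.hadoop.mapred.lib.db.DBWritable;

    // The MyWritable sketch from the deleted javadoc: one int and one long
    // column, written to a PreparedStatement and read from a ResultSet.
    public class MyWritable implements DBWritable {
      private int counter;
      private long timestamp;

      public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, counter);
        statement.setLong(2, timestamp);
      }

      public void readFields(ResultSet resultSet) throws SQLException {
        counter = resultSet.getInt(1);
        timestamp = resultSet.getLong(2);
      }
    }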

Propchange: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/db/
------------------------------------------------------------------------------
    svn:mergeinfo = 

Propchange: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java Mon Jun 15 13:23:44 2009
@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * We need to provide the ability to the code in fs/kfs without really
  * having a KFS deployment.  For this purpose, use the LocalFileSystem

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Mon Jun 15 13:23:44 2009
@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Unit tests for testing the KosmosFileSystem API implementation.
  */

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/findbugsExcludeFile.xml?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/findbugsExcludeFile.xml Mon Jun 15 13:23:44 2009
@@ -141,6 +141,17 @@
        <Method name="getSplits" />
        <Bug pattern="DLS_DEAD_LOCAL_STORE" />
      </Match>
+
+     <Match>
+       <Class name="org.apache.hadoop.mapred.lib.db.DBWritable" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+     </Match>
+
+     <Match>
+       <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat$DBRecordReader" />
+       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+     </Match>
+     
     <!--
       This is a spurious warning. Just ignore
     -->

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/hdfs/org/apache/hadoop/hdfs/TestModTime.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/hdfs/org/apache/hadoop/hdfs/TestModTime.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/hdfs/org/apache/hadoop/hdfs/TestModTime.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/hdfs/org/apache/hadoop/hdfs/TestModTime.java Mon Jun 15 13:23:44 2009
@@ -31,7 +31,6 @@
 
 /**
  * This class tests the decommissioning of nodes.
- * @author Dhruba Borthakur
  */
 public class TestModTime extends TestCase {
   static final long seed = 0xDEADBEEFL;

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Mon Jun 15 13:23:44 2009
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -41,4 +44,28 @@
     assertTrue("Job failed", job.isSuccessful());
     assertOwnerShip(outDir);
   }
+  
+  public void testEnvironment() throws IOException {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    TestMiniMRChildTask childTask = new TestMiniMRChildTask();
+    Path inDir = new Path("input1");
+    Path outDir = new Path("output1");
+    try {
+      childTask.runTestTaskEnv(getClusterConf(), inDir, outDir);
+    } catch (IOException e) {
+      fail("IOException thrown while running enviroment test."
+          + e.getMessage());
+    } finally {
+      FileSystem outFs = outDir.getFileSystem(getClusterConf());
+      if (outFs.exists(outDir)) {
+        assertOwnerShip(outDir);
+        outFs.delete(outDir, true);
+      } else {
+        fail("Output directory does not exist" + outDir.toString());
+      }
+    }
+  }
 }

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java Mon Jun 15 13:23:44 2009
@@ -23,17 +23,22 @@
 import java.io.IOException;
 import java.util.Random;
 import java.util.Iterator;
+import java.util.StringTokenizer;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.TestProcfsBasedProcessTree;
 
 import org.apache.commons.logging.Log;
@@ -49,11 +54,11 @@
             .getLog(TestKillSubProcesses.class);
 
   private static String TEST_ROOT_DIR = new File(System.getProperty(
-      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+      "test.build.data", "/tmp"), "killjob").toURI().toString().replace(' ', '+');
 
   private static JobClient jobClient = null;
 
-  private static MiniMRCluster mr = null;
+  static MiniMRCluster mr = null;
   private static Path scriptDir = null;
   private static String scriptDirName = null;
   private static String pid = null;
@@ -70,7 +75,7 @@
     conf.setJobName("testkilljobsubprocesses");
     conf.setMapperClass(KillingMapperWithChildren.class);
     
-    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    scriptDir = new Path(TEST_ROOT_DIR , "script");
     RunningJob job = runJobAndSetProcessHandle(jt, conf);
 
     // kill the job now
@@ -181,9 +186,8 @@
           }
         }
         LOG.info("pid of map task is " + pid);
-
-        // Checking if the map task is alive
-        assertTrue(ProcessTree.isAlive(pid));
+        // Checking if the map task is alive
+        assertTrue("Map task is no longer alive", isAlive(pid));
         LOG.info("The map task is alive before Job completion, as expected.");
       }
     }
@@ -216,7 +220,7 @@
                  " is " + childPid);
         assertTrue("Unexpected: The subprocess at level " + i +
                    " in the subtree is not alive before Job completion",
-                   ProcessTree.isAlive(childPid));
+                   isAlive(childPid));
       }
     }
     return job;
@@ -250,10 +254,10 @@
                  " is " + childPid);
         assertTrue("Unexpected: The subprocess at level " + i +
                    " in the subtree is alive after Job completion",
-                   !ProcessTree.isAlive(childPid));
+                   !isAlive(childPid));
       }
     }
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.getLocal(mr.createJobConf());
     if(fs.exists(scriptDir)) {
       fs.delete(scriptDir, true);
     }
@@ -261,10 +265,23 @@
   
   private static RunningJob runJob(JobConf conf) throws IOException {
 
-    final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
-    final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
+    final Path inDir;
+    final Path outDir;
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileSystem tempFs = FileSystem.get(conf);
+    // Check if the test is run against HDFS or the local file system.
+    // If the local file system is used, prepend TEST_ROOT_DIR; otherwise
+    // the killjob folder would be created in the workspace root.
+    if (!tempFs.getUri().toASCIIString().equals(
+        fs.getUri().toASCIIString())) {
+      inDir = new Path("killjob/input");
+      outDir = new Path("killjob/output");
+    } else {
+      inDir = new Path(TEST_ROOT_DIR, "input");
+      outDir = new Path(TEST_ROOT_DIR, "output");
+    }
 
-    FileSystem fs = FileSystem.get(conf);
+    
     if(fs.exists(scriptDir)) {
       fs.delete(scriptDir, true);
     }
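
The URI comparison in the hunk above decides whether the configured default
file system is the local one. A minimal standalone sketch of that check,
assuming a stock Configuration (the class name is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class LocalFsCheckSketch {
      // True when the default FS is the local FS; in that case test paths
      // should be anchored under the test root directory.
      static boolean runsOnLocalFs(Configuration conf) throws IOException {
        return FileSystem.get(conf).getUri().toASCIIString()
            .equals(FileSystem.getLocal(conf).getUri().toASCIIString());
      }

      public static void main(String[] args) throws IOException {
        System.out.println(runsOnLocalFs(new Configuration()));
      }
    }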
@@ -290,9 +307,7 @@
       // run the TCs
       conf = mr.createJobConf();
       JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
-      runKillingJobAndValidate(jt, conf);
-      runFailingJobAndValidate(jt, conf);
-      runSuccessfulJobAndValidate(jt, conf);
+      runTests(conf, jt);
     } finally {
       if (mr != null) {
         mr.shutdown();
@@ -300,12 +315,25 @@
     }
   }
 
+  void runTests(JobConf conf, JobTracker jt) throws IOException {
+    FileSystem fs = FileSystem.getLocal(mr.createJobConf());
+    Path rootDir = new Path(TEST_ROOT_DIR);
+    if(!fs.exists(rootDir)) {
+      fs.mkdirs(rootDir);
+    }
+    fs.setPermission(rootDir, 
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    runKillingJobAndValidate(jt, conf);
+    runFailingJobAndValidate(jt, conf);
+    runSuccessfulJobAndValidate(jt, conf);
+  }
+
   /**
    * Creates signal file
    */
   private static void signalTask(String signalFile, JobConf conf) {
     try {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = FileSystem.getLocal(conf);
       fs.createNewFile(new Path(signalFile));
     } catch(IOException e) {
       LOG.warn("Unable to create signal file. " + e);
@@ -317,10 +345,12 @@
    */
   private static void runChildren(JobConf conf) throws IOException {
     if (ProcessTree.isSetsidAvailable) {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = FileSystem.getLocal(conf);
       TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
-      scriptDir = new Path(TEST_ROOT_DIR + "/script");  
-    
+      scriptDir = new Path(TEST_ROOT_DIR + "/script");
+      if(fs.exists(scriptDir)){
+        fs.delete(scriptDir, true);
+      }
       // create shell script
       Random rm = new Random();
       Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
@@ -329,6 +359,7 @@
       String script =
         "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
         "echo hello\n" +
+        "trap 'echo got SIGTERM' 15 \n" +
         "if [ $1 != 0 ]\nthen\n" +
         " sh " + shellScript + " $(($1-1))\n" +
         "else\n" +
@@ -447,4 +478,46 @@
       throw new RuntimeException("failing map");
     }
   }
+  
+  /**
+   * Check whether the process with the given pid is currently alive.
+   * 
+   * @param pid pid of the process
+   * @return true if the process is alive, false otherwise.
+   */
+  private static boolean isAlive(String pid) throws IOException {
+    String commandString = "ps -o pid,command -e";
+    String[] args = new String[] {"bash", "-c", commandString};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(args);
+    try {
+      shExec.execute();
+    } catch (ExitCodeException e) {
+      return false;
+    } catch (IOException e) {
+      LOG.warn("IOExecption thrown while checking if process is alive" + 
+          StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    String output = shExec.getOutput();
+
+    // Parse the command output and check for the pid, ignoring commands
+    // that have ps or grep in them.
+    StringTokenizer strTok = new StringTokenizer(output, "\n");
+    boolean found = false;
+    while(strTok.hasMoreTokens()) {
+      StringTokenizer pidToken = new StringTokenizer(strTok.nextToken(), 
+          " ");
+      String pidStr = pidToken.nextToken();
+      String commandStr = pidToken.nextToken();
+      if(pid.equals(pidStr) && !(commandStr.contains("ps") 
+          || commandStr.contains("grep"))) {
+        found = true;
+        break;
+      }
+    }
+    return found; 
+  }
+  
 }
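
The isAlive() helper above shells out to ps and scans the output instead of
calling ProcessTree.isAlive(). A minimal sketch of the same shell-out
pattern with Hadoop's Shell.ShellCommandExecutor, printing the raw listing
rather than searching it (the class name is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.util.Shell.ExitCodeException;
    import org.apache.hadoop.util.Shell.ShellCommandExecutor;

    public class PsListingSketch {
      public static void main(String[] args) throws IOException {
        String[] cmd = new String[] {"bash", "-c", "ps -o pid,command -e"};
        ShellCommandExecutor shExec = new ShellCommandExecutor(cmd);
        try {
          shExec.execute();               // runs ps and captures its stdout
        } catch (ExitCodeException e) {
          System.out.println("ps exited non-zero");
          return;
        }
        // Each line of the output is "<pid> <command>"; the test walks
        // these lines looking for the target pid.
        System.out.println(shExec.getOutput());
      }
    }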

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java Mon Jun 15 13:23:44 2009
@@ -265,28 +265,11 @@
   public void testTaskEnv(){
     try {
       JobConf conf = mr.createJobConf();
-      
       // initialize input, output directories
       Path inDir = new Path("testing/wc/input1");
       Path outDir = new Path("testing/wc/output1");
-      String input = "The input";
-      
-      configure(conf, inDir, outDir, input, EnvCheckMapper.class);
-
       FileSystem outFs = outDir.getFileSystem(conf);
-      
-      // test 
-      //  - new SET of new var (MY_PATH)
-      //  - set of old var (HOME)
-      //  - append to an old var from modified env (LD_LIBRARY_PATH)
-      //  - append to an old var from tt's env (PATH)
-      //  - append to a new var (NEW_PATH)
-      conf.set("mapred.child.env", 
-               "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp,"
-               + "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
-      conf.set("path", System.getenv("PATH"));
-
-      JobClient.runJob(conf);
+      runTestTaskEnv(conf, inDir, outDir);
       outFs.delete(outDir, true);
     } catch(Exception e) {
       e.printStackTrace();
@@ -294,4 +277,22 @@
       tearDown();
     }
   }
+  
+  void runTestTaskEnv(JobConf conf, Path inDir, Path outDir) throws IOException {
+    String input = "The input";
+    configure(conf, inDir, outDir, input, EnvCheckMapper.class);
+    // test 
+    //  - new SET of new var (MY_PATH)
+    //  - set of old var (HOME)
+    //  - append to an old var from modified env (LD_LIBRARY_PATH)
+    //  - append to an old var from tt's env (PATH)
+    //  - append to a new var (NEW_PATH)
+    conf.set("mapred.child.env", 
+             "MY_PATH=/tmp,HOME=/tmp,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp,"
+             + "PATH=$PATH:/tmp,NEW_PATH=$NEW_PATH:/tmp");
+    conf.set("path", System.getenv("PATH"));
+    RunningJob job = JobClient.runJob(conf);
+    assertTrue("The environment checker job failed.", job.isSuccessful());
+  }
+  
 }
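
For reference, mapred.child.env takes a comma-separated list of NAME=VALUE
pairs, and a $NAME on the right-hand side expands to the tasktracker's
existing value, which is how the test distinguishes setting a variable from
appending to one. A minimal sketch (the class and variable names are
illustrative):

    import org.apache.hadoop.mapred.JobConf;

    public class ChildEnvSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // MY_PATH is set outright; PATH gets /tmp appended, because $PATH
        // expands to the value already present in the task's environment.
        conf.set("mapred.child.env", "MY_PATH=/tmp,PATH=$PATH:/tmp");
        System.out.println(conf.get("mapred.child.env"));
      }
    }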

Modified: hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Jun 15 13:23:44 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
 import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
@@ -310,7 +311,7 @@
     fs.delete(rFile,false);
     
     // start the jobtracker
-    LOG.info("Stopping jobtracker with system files deleted");
+    LOG.info("Starting jobtracker with system files deleted");
     mr.startJobTracker();
     
     UtilsForTests.waitForJobTracker(jc);
@@ -394,8 +395,58 @@
     LOG.info("Starting jobtracker with fs errors");
     mr.startJobTracker();
     JobTrackerRunner runner = mr.getJobTrackerRunner();
-    assertFalse("Restart count for new job is incorrect", runner.isActive());
+    assertFalse("JobTracker is still alive", runner.isActive());
 
     mr.shutdown();
   } 
+
+  /**
+   * Test if the jobtracker waits for the info file to be created before 
+   * starting.
+   */
+  public void testJobTrackerInfoCreation() throws Exception {
+    LOG.info("Testing jobtracker.info file");
+    MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+                      + (dfs.getFileSystem()).getUri().getPort();
+    // shut down the data nodes
+    dfs.shutdownDataNodes();
+
+    // start the jobtracker
+    JobConf conf = new JobConf();
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+
+    JobTracker jobtracker = new JobTracker(conf);
+
+    // now check if the update restart count works fine or not
+    boolean failed = false;
+    try {
+      jobtracker.recoveryManager.updateRestartCount();
+    } catch (IOException ioe) {
+      failed = true;
+    }
+    assertTrue("JobTracker created info files without datanodes!!!", failed);
+
+    Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
+    Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
+    FileSystem fs = dfs.getFileSystem();
+    assertFalse("Info file exists after update failure", 
+                fs.exists(restartFile));
+    assertFalse("Temporary restart-file exists after update failure", 
+                fs.exists(tmpRestartFile));
+
+    // start 1 data node
+    dfs.startDataNodes(conf, 1, true, null, null, null, null);
+    dfs.waitActive();
+
+    failed = false;
+    try {
+      jobtracker.recoveryManager.updateRestartCount();
+    } catch (IOException ioe) {
+      failed = true;
+    }
+    assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
+  }
 }
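
The jobtracker.info test above drives MiniDFSCluster through a datanode
outage to show that updateRestartCount() fails without datanodes and
succeeds once one is back. A minimal sketch of that bring-up and tear-down,
assuming the same constructor and datanode calls used in the hunk:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class MiniDfsOutageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null);
        try {
          dfs.shutdownDataNodes();  // namenode stays up, writes now fail
          dfs.startDataNodes(conf, 1, true, null, null, null, null);
          dfs.waitActive();         // datanode is back, writes succeed
        } finally {
          dfs.shutdown();
        }
      }
    }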

Propchange: hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/
------------------------------------------------------------------------------
    svn:mergeinfo = 

Propchange: hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/
------------------------------------------------------------------------------
    svn:mergeinfo = 


