hadoop-mapreduce-commits mailing list archives

From ste...@apache.org
Subject svn commit: r903227 [5/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/grid...
Date Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java Tue Jan 26 14:02:53 2010
@@ -18,11 +18,14 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
@@ -33,14 +36,49 @@
  * Class that simulates a job client. Its main functionality is to submit
  * jobs to the simulation engine, and to shut down the simulation engine if
  * the job producer runs out of jobs.
- * TODO: Change System.out.printXX to LOG.xxx.
  */
 public class SimulatorJobClient implements SimulatorEventListener {
+  protected static class JobSketchInfo {
+    protected int numMaps;
+    protected int numReduces;
+    JobSketchInfo(int numMaps, int numReduces) {
+      this.numMaps = numMaps;
+      this.numReduces = numReduces;
+    }
+  }
+  
   private final ClientProtocol jobTracker;
   private final JobStoryProducer jobStoryProducer;
-  private Set<JobID> runningJobs = new LinkedHashSet<JobID>();
+  private final SimulatorJobSubmissionPolicy submissionPolicy;
+  private static final int LOAD_PROB_INTERVAL_START = 1000;
+  private static final int LOAD_PROB_INTERVAL_MAX = 320000;
+  private int loadProbingInterval = LOAD_PROB_INTERVAL_START;
+  
+  /**
+   * The minimum ratio between pending+running map tasks (a.k.a. incomplete
+   * map tasks) and cluster map slot capacity for us to consider the cluster
+   * overloaded. Running maps are only counted partially: a 40%-completed map
+   * counts as 0.6 map tasks in our calculation.
+   */
+  private static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+  /**
+   * Keep track of the in-flight load-probing event.
+   */
+  private LoadProbingEvent inFlightLPE = null;
+  /**
+   * We do not have a handle to the SimulatorEventQueue, and thus cannot
+   * cancel events directly. Instead, we keep an identity map (it would be an
+   * identity set, except that the JDK does not provide one) to skip events
+   * that have been cancelled.
+   */
+  private Map<LoadProbingEvent, Boolean> cancelledLPE = 
+    new IdentityHashMap<LoadProbingEvent, Boolean>();
+  
+  private Map<JobID, JobSketchInfo> runningJobs = 
+    new LinkedHashMap<JobID, JobSketchInfo>();
   private boolean noMoreJobs = false;
-
+  private JobStory nextJob;
+  
   /**
    * Constructor.
    * 
@@ -49,64 +87,255 @@
    *          SimulatorJobClient} interacts with the JobTracker through the
    *          {@link ClientProtocol}.
    * @param jobStoryProducer
+   * @param submissionPolicy How should we submit jobs to the JobTracker?
    */
-  public SimulatorJobClient(ClientProtocol jobTracker, JobStoryProducer jobStoryProducer) {
+  public SimulatorJobClient(ClientProtocol jobTracker, 
+                            JobStoryProducer jobStoryProducer,
+                            SimulatorJobSubmissionPolicy submissionPolicy) {
     this.jobTracker = jobTracker;
     this.jobStoryProducer = jobStoryProducer;
+    this.submissionPolicy = submissionPolicy;
   }
   
+  /**
+   * Constructor.
+   * 
+   * @param jobTracker
+   *          The job tracker to which we submit jobs. Note that the {@link
+   *          SimulatorJobClient} interacts with the JobTracker through the
+   *          {@link ClientProtocol}.
+   * @param jobStoryProducer
+   */
+  public SimulatorJobClient(ClientProtocol jobTracker, 
+                            JobStoryProducer jobStoryProducer) {
+    this(jobTracker, jobStoryProducer,  SimulatorJobSubmissionPolicy.REPLAY);
+  }
+
   @Override
   public List<SimulatorEvent> init(long when) throws IOException {
     JobStory job = jobStoryProducer.getNextJob();
-    if (job.getSubmissionTime() != when) {
+    if (submissionPolicy == SimulatorJobSubmissionPolicy.REPLAY
+        && job.getSubmissionTime() != when) {
       throw new IOException("Inconsistent submission time for the first job: "
           + when + " != " + job.getSubmissionTime()+".");
     }
+    
     JobSubmissionEvent event = new JobSubmissionEvent(this, when, job);
-    return Collections.<SimulatorEvent> singletonList(event);
+    if (submissionPolicy != SimulatorJobSubmissionPolicy.STRESS) {
+      return Collections.<SimulatorEvent> singletonList(event);
+    } else {
+      ArrayList<SimulatorEvent> ret = new ArrayList<SimulatorEvent>(2);
+      ret.add(event);
+      inFlightLPE = new LoadProbingEvent(this, when + loadProbingInterval);
+      ret.add(inFlightLPE);
+      return ret;
+    }
   }
-  
-  @Override
-  public List<SimulatorEvent> accept(SimulatorEvent event)
-      throws IOException {
-    if (event instanceof JobSubmissionEvent) {
-      JobSubmissionEvent submitEvent = (JobSubmissionEvent)(event);
-  
-      // Submit job
-      JobStatus status = null;
-      try {
-        status = submitJob(submitEvent.getJob());
-      } catch (InterruptedException e) {
-        throw new IOException(e);
+
+  /**
+   * Adjusts the load-probing interval with exponential back-off, because
+   * load probing can be expensive when there are many pending jobs.
+   * 
+   * @param overloaded Is the job tracker currently overloaded?
+   */
+  private void adjustLoadProbingInterval(boolean overloaded) {
+    if (overloaded) {
+      /**
+       * Only extend the LPE interval when there is no in-flight LPE.
+       */
+      if (inFlightLPE == null) {
+        loadProbingInterval = Math.min(loadProbingInterval * 2,
+            LOAD_PROB_INTERVAL_MAX);
       }
-      runningJobs.add(status.getJobID());
-      System.out.println("Job " + status.getJobID() + 
-                         " is submitted at " + submitEvent.getTimeStamp());
+    } else {
+      loadProbingInterval = LOAD_PROB_INTERVAL_START;
+    }
+  }
+  
+  /**
+   * Uses a lightweight mechanism to estimate the cluster load.
+   * @return whether, from the job client's perspective, the cluster is
+   *         overloaded.
+   */
+  private boolean isOverloaded(long now) throws IOException {
+    try {
+      ClusterMetrics clusterMetrics = jobTracker.getClusterMetrics();
       
-      JobStory nextJob = jobStoryProducer.getNextJob();
-      if (nextJob == null) {
-        noMoreJobs = true;
-        return SimulatorEngine.EMPTY_EVENTS;
+      // If there are more running jobs than task trackers, we assume the
+      // cluster is overloaded. This bounds the memory usage of the simulator
+      // job tracker when jobs have a small number of map tasks and a large
+      // number of reduce tasks.
+      if (runningJobs.size() >= clusterMetrics.getTaskTrackerCount()) {
+        System.out.printf("%d Overloaded is %s: " +
+                "#runningJobs >= taskTrackerCount (%d >= %d)\n",
+                now, Boolean.TRUE.toString(),
+                runningJobs.size(), clusterMetrics.getTaskTrackerCount());
+        return true;    
       }
-      
-      return Collections.<SimulatorEvent>singletonList(
-          new JobSubmissionEvent(this, nextJob.getSubmissionTime(), nextJob));
+
+      float incompleteMapTasks = 0; // includes pending & running map tasks.
+      for (Map.Entry<JobID, JobSketchInfo> entry : runningJobs.entrySet()) {
+        org.apache.hadoop.mapreduce.JobStatus jobStatus = jobTracker
+            .getJobStatus(entry.getKey());
+        incompleteMapTasks += (1 - Math.min(jobStatus.getMapProgress(), 1.0))
+            * entry.getValue().numMaps;
+      }
+
+      boolean overloaded = incompleteMapTasks >
+          OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterMetrics.getMapSlotCapacity();
+      String relOp = (overloaded) ? ">" : "<=";
+      System.out.printf("%d Overloaded is %s: "
+          + "incompleteMapTasks %s %.1f*mapSlotCapacity (%.1f %s %.1f*%d)\n",
+          now, Boolean.toString(overloaded), relOp,
+          OVERLOAD_MAPTASK_MAPSLOT_RATIO, incompleteMapTasks, relOp,
+          OVERLOAD_MAPTASK_MAPSLOT_RATIO, clusterMetrics.getMapSlotCapacity());
+      return overloaded;
+    } catch (InterruptedException e) {
+      throw new IOException("InterruptedException", e);
+    }
+  }
+  
+  /**
+   * Handles a simulation event, which may be a JobSubmissionEvent, a
+   * JobCompleteEvent, or a LoadProbingEvent.
+   *
+   * @param event SimulatorEvent to respond to
+   * @return list of events generated in response
+   */
+  @Override
+  public List<SimulatorEvent> accept(SimulatorEvent event) throws IOException {
+    if (event instanceof JobSubmissionEvent) {
+      return processJobSubmissionEvent((JobSubmissionEvent) event);
     } else if (event instanceof JobCompleteEvent) {
-      JobCompleteEvent jobCompleteEvent = (JobCompleteEvent)event;
-      JobStatus jobStatus = jobCompleteEvent.getJobStatus();
-      System.out.println("Job " + jobStatus.getJobID() + 
-                         " completed at " + jobCompleteEvent.getTimeStamp() + 
-                         " with status: " + jobStatus.getState() +
-                         " runtime: " + 
-                         (jobCompleteEvent.getTimeStamp() - jobStatus.getStartTime()));
-      runningJobs.remove(jobCompleteEvent.getJobStatus().getJobID());
-      if (noMoreJobs && runningJobs.isEmpty()) {
-        jobCompleteEvent.getEngine().shutdown();
+      return processJobCompleteEvent((JobCompleteEvent) event);
+    } else if (event instanceof LoadProbingEvent) {
+      return processLoadProbingEvent((LoadProbingEvent) event);
+    } else {
+      throw new IllegalArgumentException("unknown event type: "
+          + event.getClass());
+    }
+  }
+
+  /**
+   * Responds to a job submission event by submitting the job to the
+   * job tracker. Under the SERIAL submission policy, the next job is
+   * submitted only after the current job completes.
+   * 
+   * @param submitEvent the submission event to respond to
+   */
+  private List<SimulatorEvent> processJobSubmissionEvent(
+      JobSubmissionEvent submitEvent) throws IOException {
+    // Submit job
+    JobStatus status = null;
+    JobStory story = submitEvent.getJob();
+    try {
+      status = submitJob(story);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    runningJobs.put(status.getJobID(), new JobSketchInfo(story.getNumberMaps(),
+        story.getNumberReduces()));
+    System.out.println("Job " + status.getJobID() + " is submitted at "
+        + submitEvent.getTimeStamp());
+
+    // Find the next job to submit
+    nextJob = jobStoryProducer.getNextJob();
+    if (nextJob == null) {
+      noMoreJobs = true;
+      return SimulatorEngine.EMPTY_EVENTS;
+    } else if (submissionPolicy == SimulatorJobSubmissionPolicy.REPLAY) {
+      // enqueue next submission event
+      return Collections
+          .<SimulatorEvent> singletonList(new JobSubmissionEvent(this, nextJob
+              .getSubmissionTime(), nextJob));
+    } else if (submissionPolicy == SimulatorJobSubmissionPolicy.STRESS) {
+      return checkLoadAndSubmitJob(submitEvent.getTimeStamp());
+    }
+    
+    return SimulatorEngine.EMPTY_EVENTS;
+  }
+  
+  /**
+   * Handles a job completion event. 
+   * 
+   * @param jobCompleteEvent the completion event to respond to
+   * @throws IOException 
+   */
+  private List<SimulatorEvent> processJobCompleteEvent(
+      JobCompleteEvent jobCompleteEvent) throws IOException {
+    JobStatus jobStatus = jobCompleteEvent.getJobStatus();
+    System.out.println("Job " + jobStatus.getJobID() + " completed at "
+        + jobCompleteEvent.getTimeStamp() + " with status: "
+        + jobStatus.getState() + " runtime: "
+        + (jobCompleteEvent.getTimeStamp() - jobStatus.getStartTime()));
+    runningJobs.remove(jobCompleteEvent.getJobStatus().getJobID());
+    if (noMoreJobs && runningJobs.isEmpty()) {
+      jobCompleteEvent.getEngine().shutdown();
+    }
+
+    if (!noMoreJobs) {
+      if (submissionPolicy == SimulatorJobSubmissionPolicy.SERIAL) {
+        long submissionTime = jobCompleteEvent.getTimeStamp() + 1;
+        JobStory story = new SimulatorJobStory(nextJob, submissionTime);
+        return Collections
+            .<SimulatorEvent> singletonList(new JobSubmissionEvent(this,
+                submissionTime, story));
+      } else if (submissionPolicy == SimulatorJobSubmissionPolicy.STRESS) {
+        return checkLoadAndSubmitJob(jobCompleteEvent.getTimeStamp());
       }
+    }
+    return SimulatorEngine.EMPTY_EVENTS;
+  }
+  
+  /**
+   * Checks whether the job tracker is overloaded. If not, submits the next
+   * job. Precondition: noMoreJobs == false.
+   * @return a list of {@link SimulatorEvent}s as the follow-up actions.
+   */
+  private List<SimulatorEvent> checkLoadAndSubmitJob(long now) throws IOException {
+    List<SimulatorEvent> ret = new ArrayList<SimulatorEvent>(2);
+    boolean overloaded = isOverloaded(now);
+    adjustLoadProbingInterval(overloaded);
+    
+    if (inFlightLPE != null
+        && inFlightLPE.getTimeStamp() > now + loadProbingInterval) {
+      cancelledLPE.put(inFlightLPE, Boolean.TRUE);
+      inFlightLPE = null;
+    }
+    
+    if (inFlightLPE == null) {
+      inFlightLPE = new LoadProbingEvent(this, now + loadProbingInterval);
+      ret.add(inFlightLPE);
+    }
+
+    if (!overloaded) {
+      long submissionTime = now + 1;
+      JobStory story = new SimulatorJobStory(nextJob, submissionTime);
+      ret.add(new JobSubmissionEvent(this, submissionTime, story));
+    }
+    
+    return ret;
+  }
+  
+  /**
+   * Handles a load probing event. If the cluster is not overloaded, submits
+   * a new job.
+   * 
+   * @param loadProbingEvent the load probing event
+   */
+  private List<SimulatorEvent> processLoadProbingEvent(
+      LoadProbingEvent loadProbingEvent) throws IOException {
+    if (cancelledLPE.containsKey(loadProbingEvent)) {
+      cancelledLPE.remove(loadProbingEvent);
       return SimulatorEngine.EMPTY_EVENTS;
-    } else {
-      throw new IllegalArgumentException("unknown event type: " + event.getClass());
     }
+    
+    assert(loadProbingEvent == inFlightLPE);
+    
+    inFlightLPE = null;
+    
+    if (noMoreJobs) {
+      return SimulatorEngine.EMPTY_EVENTS;
+    }
+    
+    return checkLoadAndSubmitJob(loadProbingEvent.getTimeStamp());
   }
 
   @SuppressWarnings("deprecation")
@@ -120,6 +349,6 @@
     }
     
     SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
-    return jobTracker.submitJob(jobId);
+    return jobTracker.submitJob(jobId, "dummy-path", null);
   }
 }
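
The STRESS-policy logic above combines two mechanisms: an overload test (each
running job contributes (1 - mapProgress) * numMaps incomplete map tasks, so a
10-map job at 40% map progress counts as 6; the cluster is overloaded once the
sum exceeds OVERLOAD_MAPTASK_MAPSLOT_RATIO times the map slot capacity, or
once there are at least as many running jobs as task trackers) and an
exponentially backed-off probing interval. Below is a minimal standalone
sketch of the back-off arithmetic, with a hypothetical class name, a boolean
standing in for the inFlightLPE reference, and intervals assumed to be
milliseconds of simulated time. (Incidentally, the identity set the javadoc
wishes for exists since Java 6 as Collections.newSetFromMap(new
IdentityHashMap<LoadProbingEvent, Boolean>()).)

    class LoadProbingBackoffSketch {
      static final int START_MS = 1000;    // mirrors LOAD_PROB_INTERVAL_START
      static final int MAX_MS = 320000;    // mirrors LOAD_PROB_INTERVAL_MAX
      int interval = START_MS;
      boolean probeInFlight = false;       // stands in for inFlightLPE != null

      void adjust(boolean overloaded) {
        if (overloaded) {
          // Double only while no probe is in flight: 1s, 2s, 4s, ..., 320s.
          if (!probeInFlight) {
            interval = Math.min(interval * 2, MAX_MS);
          }
        } else {
          interval = START_MS;  // reset as soon as the cluster recovers
        }
      }
    }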

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java Tue Jan 26 14:02:53 2010
@@ -28,11 +28,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.Node;
@@ -48,10 +48,10 @@
   // cache
   private final JobStory jobStory;
 
-  RawSplit[] splits;
+  TaskSplitMetaInfo[] taskSplitMetaInfo;
 
   @SuppressWarnings("deprecation")
-  public SimulatorJobInProgress(JobID jobid, JobTracker jobtracker,
+  public SimulatorJobInProgress(JobID jobid, String jobSubmitDir, JobTracker jobtracker,
       JobConf default_conf, JobStory jobStory) {
     super();
     // jobSetupCleanupNeeded set to false in parent cstr, though
@@ -63,7 +63,7 @@
     this.jobtracker = jobtracker;
     this.conf = jobStory.getJobConf();
     this.priority = conf.getJobPriority();
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobid);
+    Path jobDir = new Path(jobSubmitDir);
     this.jobFile = new Path(jobDir, "job.xml");
     this.status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f, JobStatus.PREP,
         priority, conf.getUser(), conf.getJobName(), jobFile.toString(), url);
@@ -127,16 +127,17 @@
     }
 
     final String jobFile = "default";
-    splits = getRawSplits(jobStory.getInputSplits());
+    taskSplitMetaInfo = createSplits(jobStory);
     if (loggingEnabled) {
       LOG.debug("(initTasks@SJIP) Created splits for job = " + jobId
-          + " number of splits = " + splits.length);
+          + " number of splits = " + taskSplitMetaInfo.length);
     }
 
-    createMapTasks(jobFile, splits);
+    createMapTasks(jobFile, taskSplitMetaInfo);
 
     if (numMapTasks > 0) {
-      nonRunningMapCache = createCache(splits, maxLevel);
+      nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
       if (loggingEnabled) {
         LOG.debug("initTasks:numMaps=" + numMapTasks
             + " Size of nonRunningMapCache=" + nonRunningMapCache.size()
@@ -167,25 +168,25 @@
     }
   }
 
-  RawSplit[] getRawSplits(InputSplit[] splits) throws IOException {
+  
+  TaskSplitMetaInfo[] createSplits(JobStory story) throws IOException {
+    InputSplit[] splits = story.getInputSplits();
     if (splits == null || splits.length != numMapTasks) {
       throw new IllegalArgumentException("Input split size mismatch: expected="
           + numMapTasks + ", actual=" + ((splits == null) ? -1 : splits.length));
     }
 
-    RawSplit rawSplits[] = new RawSplit[splits.length];
-    for (int i = 0; i < splits.length; i++) {
+    TaskSplitMetaInfo[] splitMetaInfo = 
+      new TaskSplitMetaInfo[story.getNumberMaps()];
+    int i = 0;
+    for (InputSplit split : splits) {
       try {
-        rawSplits[i] = new RawSplit();
-        rawSplits[i].setClassName(splits[i].getClass().getName());
-        rawSplits[i].setDataLength(splits[i].getLength());
-        rawSplits[i].setLocations(splits[i].getLocations());
+        splitMetaInfo[i++] = new TaskSplitMetaInfo(split, 0);
       } catch (InterruptedException ie) {
         throw new IOException(ie);
       }
     }
-
-    return rawSplits;
+    return splitMetaInfo;
   }
 
   /**
@@ -208,7 +209,8 @@
     assert (jobid == getJobID());
 
     // Get splits for the TaskAttempt
-    RawSplit split = splits[taskAttemptID.getTaskID().getId()];
+    TaskSplitMetaInfo split =
+        taskSplitMetaInfo[taskAttemptID.getTaskID().getId()];
     int locality = getClosestLocality(taskTracker, split);
 
     TaskID taskId = taskAttemptID.getTaskID();
@@ -232,7 +234,7 @@
     return taskAttemptInfo;
   }
 
-  private int getClosestLocality(TaskTracker taskTracker, RawSplit split) {
+  private int getClosestLocality(TaskTracker taskTracker, TaskSplitMetaInfo split) {
     int locality = 2;
 
     Node taskTrackerNode = jobtracker

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Tue Jan 26 14:02:53 2010
@@ -35,10 +35,11 @@
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.mapred.SimulatorJobInProgress;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 
 /**
  * {@link SimulatorJobTracker} extends {@link JobTracker}. It implements the
- * {@link JobSubmissionProtocol} and the {@link InterTrackerProtocol} protocols.
+ * {@link InterTrackerProtocol} protocol.
  */
 @SuppressWarnings("deprecation")
 public class SimulatorJobTracker extends JobTracker {
@@ -173,7 +174,9 @@
   }
 
   @Override
-  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+  public synchronized JobStatus submitJob(
+      JobID jobId, String jobSubmitDir, TokenStorage ts) 
+  throws IOException {
     boolean loggingEnabled = LOG.isDebugEnabled();
     if (loggingEnabled) {
       LOG.debug("submitJob for jobname = " + jobId);
@@ -191,7 +194,7 @@
     }
     validateAndSetClock(jobStory.getSubmissionTime());
     
-    SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, this,
+    SimulatorJobInProgress job = new SimulatorJobInProgress(jobId, jobSubmitDir, this,
                                                             this.conf, 
                                                             jobStory);
     return addJob(jobId, job);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Tue Jan 26 14:02:53 2010
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.security.TokenStorage;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.*;
 //
 // Mock jobtracker class that checks heartbeat() parameters and
 // sends responses based on a prepopulated table
@@ -76,7 +78,8 @@
   }
 
   @Override
-  public JobStatus submitJob(JobID jobId) throws IOException {
+  public JobStatus submitJob(
+      JobID jobId, String jobSubmitDir, TokenStorage ts) throws IOException {
     JobStatus status = new JobStatus(jobId, 0.0f, 0.0f, 0.0f, 0.0f,
         JobStatus.State.RUNNING, JobPriority.NORMAL, "", "", "", "");
     return status;
@@ -172,8 +175,8 @@
     final int numSlotsRequired = 1;
     org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi = 
         org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);        
-    Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, "dummysplitclass",
-                            null, numSlotsRequired);
+    Task task = new MapTask("dummyjobfile", taskIdOldApi, 0, new TaskSplitIndex(),
+                             numSlotsRequired);
     // all byte counters are 0
     TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); 
     MapTaskAttemptInfo taskAttemptInfo = 
@@ -302,6 +305,11 @@
   public String getSystemDir() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public String getStagingAreaDir() {
+    throw new UnsupportedOperationException();
+  }
   
   @Override
   public String getBuildVersion() {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java Tue Jan 26 14:02:53 2010
@@ -36,10 +36,12 @@
 public class TestSimulatorEndToEnd {
 
   public static final Log LOG = LogFactory.getLog(MockSimulatorEngine.class);
+  protected SimulatorJobSubmissionPolicy policy = SimulatorJobSubmissionPolicy.REPLAY;
   
   @Test
   public void testMain() throws Exception {
     final Configuration conf = new Configuration();
+    conf.set(SimulatorJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
     final FileSystem lfs = FileSystem.getLocal(conf);
     final Path rootInputDir = new Path(
         System.getProperty("src.test.data", "data")).makeQualified(lfs);
@@ -55,7 +57,7 @@
     MockSimulatorEngine mockMumak = new MockSimulatorEngine(numJobs, nTrackers);
 
     String[] args = { traceFile.toString(), topologyFile.toString() };
-    int res = ToolRunner.run(new Configuration(), mockMumak, args);
+    int res = ToolRunner.run(conf, mockMumak, args);
     Assert.assertEquals(res, 0);
   }
   

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java Tue Jan 26 14:02:53 2010
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -119,6 +120,7 @@
     private long[] times;
     private int index = 0;
     private List<MockJobStory> jobs = new ArrayList<MockJobStory>();
+    private Random random = new Random();
     
     public MockJobStoryProducer(long[] times, long relativeStartTime) {
       super();
@@ -127,7 +129,7 @@
       index = 0;
       
       for (long time: times) {
-        jobs.add(new MockJobStory(time - relativeStartTime));
+        jobs.add(new MockJobStory(random, time - relativeStartTime));
       }
     }
     
@@ -149,9 +151,11 @@
   }
   
   static class MockJobStory implements JobStory {
+    private Random random;
     private long submissionTime;
     
-    public MockJobStory(long submissionTime) {
+    public MockJobStory(Random random, long submissionTime) {
+      this.random = random;
       this.submissionTime = submissionTime;
     }
     
@@ -183,12 +187,12 @@
 
     @Override
     public int getNumberMaps() {
-      throw new UnsupportedOperationException();
+      return random.nextInt(10)+1;
     }
 
     @Override
     public int getNumberReduces() {
-      throw new UnsupportedOperationException();
+      return random.nextInt(5);
     }
 
     @Override

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java Tue Jan 26 14:02:53 2010
@@ -79,7 +79,7 @@
       FakeJobs job = new FakeJobs("job1", 0, numMaps, numReduces);
 
       SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job);
-      jobTracker.submitJob(jobId);
+      jobTracker.submitJob(jobId, "dummy-path", null);
     }
   }
 

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
 /hadoop/core/trunk/src/contrib/sqoop:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/sqoop:804974-885774
+/hadoop/mapreduce/trunk/src/contrib/sqoop:804974-903221

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/build.xml Tue Jan 26 14:02:53 2010
@@ -33,7 +33,8 @@
   <!-- Override with our own version so we can enforce build dependencies -->
   <!-- on compile-mapred-test for MiniMRCluster, and MRUnit.              -->
   <!-- ================================================================== -->
-  <target name="compile-test" depends="compile-examples" if="test.available">
+  <target name="compile-test" depends="compile-examples, ivy-retrieve-test"
+      if="test.available">
     <echo message="Compiling ${name} dependencies" />
     <!-- need top-level compile-mapred-test for MiniMRCluster -->
     <subant target="compile-mapred-test">
@@ -132,8 +133,6 @@
       <classpath>
         <path refid="test.classpath"/>
         <path refid="contrib-classpath"/>
-        <!-- tools.jar from Sun JDK also required to invoke javac. -->
-        <pathelement path="${env.JAVA_HOME}/lib/tools.jar" />
         <!-- need thirdparty JDBC drivers for thirdparty tests -->
         <fileset dir="${sqoop.thirdparty.lib.dir}"
             includes="*.jar" />

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/Sqoop-manpage.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/Sqoop-manpage.txt?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/Sqoop-manpage.txt (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/Sqoop-manpage.txt Tue Jan 26 14:02:53 2010
@@ -113,6 +113,12 @@
   When using direct mode, write to multiple files of
   approximately _size_ bytes each.
 
+Export control options
+~~~~~~~~~~~~~~~~~~~~~~
+
+--export-dir (dir)::
+  Export from an HDFS path into a table (set with
+  --table)
 
 Output line formatting options
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -145,6 +151,14 @@
 --package-name (package)::
   Puts auto-generated classes in the named Java package
 
+Library loading options
+~~~~~~~~~~~~~~~~~~~~~~~
+--jar-file (file)::
+  Disable code generation; use specified jar
+
+--class-name (name)::
+  The class within the jar that represents the table to import/export
+
 Additional commands
 ~~~~~~~~~~~~~~~~~~~
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/SqoopUserGuide.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/SqoopUserGuide.txt?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/SqoopUserGuide.txt (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/SqoopUserGuide.txt Tue Jan 26 14:02:53 2010
@@ -59,6 +59,8 @@
 
 include::hive.txt[]
 
+include::export.txt[]
+
 include::supported-dbs.txt[]
 
 include::api-reference.txt[]

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/api-reference.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/api-reference.txt?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/api-reference.txt (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/doc/api-reference.txt Tue Jan 26 14:02:53 2010
@@ -81,14 +81,14 @@
 rewrite the majority of +getColNames()+.
 
 +ConnManager+ implementations receive a lot of their configuration data from a
-Sqoop-specific class, +ImportOptions+. While +ImportOptions+ does not currently
-contain many setter methods, clients should not assume +ImportOptions+ are
-immutable. More setter methods may be added in the future.  +ImportOptions+ does
+Sqoop-specific class, +SqoopOptions+. While +SqoopOptions+ does not currently
+contain many setter methods, clients should not assume +SqoopOptions+ is
+immutable. More setter methods may be added in the future.  +SqoopOptions+ does
 not directly store specific per-manager options. Instead, it contains a
 reference to the +Configuration+ returned by +Tool.getConf()+ after parsing
 command-line arguments with the +GenericOptionsParser+. This allows extension
 arguments via "+-D any.specific.param=any.value+" without requiring any layering
-of options parsing or modification of +ImportOptions+.
+of options parsing or modification of +SqoopOptions+.
 
 All existing +ConnManager+ implementations are stateless. Thus, the system which
 instantiates +ConnManagers+ may implement multiple instances of the same
@@ -102,7 +102,7 @@
 +ManagerFactory+ implementation should be provided with the new ConnManager.
 +ManagerFactory+ has a single method of note, named +accept()+. This method will
 determine whether it can instantiate a +ConnManager+ for the user's
-+ImportOptions+. If so, it returns the +ConnManager+ instance. Otherwise, it
++SqoopOptions+. If so, it returns the +ConnManager+ instance. Otherwise, it
 returns +null+.
 
 The +ManagerFactory+ implementations used are governed by the
@@ -110,7 +110,7 @@
 libraries can install the 3rd-party library containing a new +ManagerFactory+
 and +ConnManager+(s), and configure sqoop-site.xml to use the new
 +ManagerFactory+.  The +DefaultManagerFactory+ principally discriminates between
-databases by parsing the connect string stored in +ImportOptions+.
+databases by parsing the connect string stored in +SqoopOptions+.
 
 Extension authors may make use of classes in the +org.apache.hadoop.sqoop.io+,
 +mapred+, +mapreduce+, and +util+ packages to facilitate their implementations.
@@ -124,7 +124,7 @@
 This section describes the internal architecture of Sqoop.
 
 The Sqoop program is driven by the +org.apache.hadoop.sqoop.Sqoop+ main class.
-A limited number of additional classes are in the same package; +ImportOptions+
+A limited number of additional classes are in the same package: +SqoopOptions+
 (described earlier) and +ConnFactory+ (which manipulates +ManagerFactory+
 instances).
 
@@ -135,11 +135,11 @@
 
 +org.apache.hadoop.sqoop.Sqoop+ is the main class and implements _Tool_. A new
 instance is launched with +ToolRunner+. It parses its arguments using the
-+ImportOptions+ class.  Within the +ImportOptions+, an +ImportAction+ will be
++SqoopOptions+ class.  Within the +SqoopOptions+, an +ImportAction+ will be
 chosen by the user. This may be import all tables, import one specific table,
 execute a SQL statement, or others.
 
-A +ConnManager+ is then instantiated based on the data in the +ImportOptions+.
+A +ConnManager+ is then instantiated based on the data in the +SqoopOptions+.
 The +ConnFactory+ is used to get a +ConnManager+ from a +ManagerFactory+; the
 mechanics of this were described in an earlier section.
 
@@ -161,7 +161,7 @@
 extended with additional parameters in the future, which optionally further
 direct the import operation. Similarly, the +exportTable()+ method receives an
 argument of type +ExportJobContext+. These classes contain the name of the table
-to import/export, a reference to the +ImportOptions+ object, and other related
+to import/export, a reference to the +SqoopOptions+ object, and other related
 data.
 
 Subpackages
@@ -207,8 +207,8 @@
   importers.
 * +Executor+ launches external processes and connects these to stream handlers
   generated by an AsyncSink (see more detail below).
-* +ExportError+ is thrown by +ConnManagers+ when exports fail.
-* +ImportError+ is thrown by +ConnManagers+ when imports fail.
+* +ExportException+ is thrown by +ConnManagers+ when exports fail.
+* +ImportException+ is thrown by +ConnManagers+ when imports fail.
 * +JdbcUrl+ handles parsing of connect strings, which are URL-like but not
   specification-conforming. (In particular, JDBC connect strings may have
   +multi:part:scheme://+ components.)
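
As an illustration of the +accept()+ contract described above, a hypothetical
third-party factory might look like the following. The +accept(SqoopOptions)+
signature is inferred from the prose (this patch does not show it), and
+MyConnManager+ is an invented class:

    // Return a ConnManager for connect strings we recognize; return null to
    // let another factory (e.g. DefaultManagerFactory) claim the options.
    public class MyManagerFactory extends ManagerFactory {
      @Override
      public ConnManager accept(SqoopOptions options) {
        String connectStr = options.getConnectString();
        if (connectStr != null && connectStr.startsWith("jdbc:mydb:")) {
          return new MyConnManager(options);
        }
        return null;
      }
    }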

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/ivy.xml Tue Jan 26 14:02:53 2010
@@ -41,20 +41,18 @@
   </publications>
   <dependencies>
      <dependency org="org.slf4j" name="slf4j-api" rev="${slf4j-api.version}" conf="common->master"/>
-     <dependency org="org.slf4j" name="slf4j-log4j12" rev="${slf4j-log4j12.version}" conf="common->master"/>
      <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}" conf="common->default"/>
      <dependency org="org.apache.hadoop" name="hadoop-core-test" rev="${hadoop-core.version}" conf="common->default"/>
      <dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="${hadoop-hdfs.version}" conf="common->default"/>
-     <dependency org="org.apache.hadoop" name="hadoop-hdfs-test" rev="${hadoop-hdfs.version}" conf="common->default"/>
+     <dependency org="org.apache.hadoop" name="hadoop-hdfs-test" rev="${hadoop-hdfs.version}" conf="test->default"/>
      <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="common->default"/>
      <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
-     <dependency org="org.mortbay.jetty" name="servlet-api-2.5" rev="${servlet-api-2.5.version}" conf="common->default"/>
      <dependency org="junit" name="junit" rev="${junit.version}" conf="common->default"/>
      <dependency org="commons-httpclient" name="commons-httpclient" rev="${commons-httpclient.version}" conf="common->default"/>
      <dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}" conf="common->default"/>
      <dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}" conf="common->default"/>
      <dependency org="org.apache.hadoop" name="avro" rev="${avro.version}" conf="common->default"/>
-     <dependency org="javax.servlet" name="servlet-api" rev="${servlet-api.version}" conf="common->master"/>
+     <dependency org="org.mortbay.jetty" name="servlet-api-2.5" rev="${servlet-api-2.5.version}" conf="common->master"/>
      <dependency org="org.mortbay.jetty" name="jetty" rev="${jetty.version}" conf="common->master"/>
      <dependency org="commons-io" name="commons-io" rev="${commons-io.version}" conf="common->default"/>
      <dependency org="org.mortbay.jetty" name="jetty-util" rev="${jetty-util.version}" conf="common->master"/>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java Tue Jan 26 14:02:53 2010
@@ -87,7 +87,7 @@
    * @return a ConnManager instance for the appropriate database
    * @throws IOException if it cannot find a ConnManager for this schema
    */
-  public ConnManager getManager(ImportOptions opts) throws IOException {
+  public ConnManager getManager(SqoopOptions opts) throws IOException {
     // Try all the available manager factories.
     for (ManagerFactory factory : factories) {
       LOG.debug("Trying ManagerFactory: " + factory.getClass().getName());

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Tue Jan 26 14:02:53 2010
@@ -19,6 +19,8 @@
 package org.apache.hadoop.sqoop;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,15 +31,17 @@
 
 import org.apache.hadoop.sqoop.hive.HiveImport;
 import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ExportJobContext;
 import org.apache.hadoop.sqoop.manager.ImportJobContext;
 import org.apache.hadoop.sqoop.orm.ClassWriter;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
-import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.ExportException;
+import org.apache.hadoop.sqoop.util.ImportException;
 
 /**
  * Main entry-point for Sqoop
  * Usage: hadoop jar (this_jar_name) org.apache.hadoop.sqoop.Sqoop (options)
- * See the ImportOptions class for options.
+ * See the SqoopOptions class for options.
  */
 public class Sqoop extends Configured implements Tool {
 
@@ -53,41 +57,58 @@
     Configuration.addDefaultResource("sqoop-site.xml");
   }
 
-  private ImportOptions options;
+  private SqoopOptions options;
   private ConnManager manager;
   private HiveImport hiveImport;
+  private List<String> generatedJarFiles;
 
   public Sqoop() {
+    generatedJarFiles = new ArrayList<String>();
   }
 
-  public ImportOptions getOptions() {
+  public SqoopOptions getOptions() {
     return options;
   }
 
   /**
+   * @return a list of jar files generated as part of this import/export process
+   */
+  public List<String> getGeneratedJarFiles() {
+    ArrayList<String> out = new ArrayList<String>(generatedJarFiles);
+    return out;
+  }
+
+  /**
    * Generate the .class and .jar files
    * @return the filename of the emitted jar file.
    * @throws IOException
    */
   private String generateORM(String tableName) throws IOException {
+    String existingJar = options.getExistingJarName();
+    if (existingJar != null) {
+      // The user has pre-specified a jar and class to use. Don't generate.
+      LOG.info("Using existing jar: " + existingJar);
+      return existingJar;
+    }
+
     LOG.info("Beginning code generation");
     CompilationManager compileMgr = new CompilationManager(options);
     ClassWriter classWriter = new ClassWriter(options, manager, tableName, compileMgr);
     classWriter.generate();
     compileMgr.compile();
     compileMgr.jar();
-    return compileMgr.getJarFilename();
+    String jarFile = compileMgr.getJarFilename();
+    this.generatedJarFiles.add(jarFile);
+    return jarFile;
   }
 
-  private void importTable(String tableName) throws IOException, ImportError {
+  private void importTable(String tableName) throws IOException, ImportException {
     String jarFile = null;
 
     // Generate the ORM code for the tables.
-    // TODO(aaron): Allow this to be bypassed if the user has already generated code,
-    // or if they're using a non-MapReduce import method (e.g., mysqldump).
     jarFile = generateORM(tableName);
 
-    if (options.getAction() == ImportOptions.ControlAction.FullImport) {
+    if (options.getAction() == SqoopOptions.ControlAction.FullImport) {
       // Proceed onward to do the import.
       ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
       manager.importTable(context);
@@ -99,17 +120,26 @@
     }
   }
 
+  private void exportTable(String tableName) throws ExportException, IOException {
+    String jarFile = null;
+
+    // Generate the ORM code for the tables.
+    jarFile = generateORM(tableName);
+
+    ExportJobContext context = new ExportJobContext(tableName, jarFile, options);
+    manager.exportTable(context);
+  }
 
   /**
    * Actual main entry-point for the program
    */
   public int run(String [] args) {
-    options = new ImportOptions();
+    options = new SqoopOptions();
     options.setConf(getConf());
     try {
       options.parse(args);
       options.validate();
-    } catch (ImportOptions.InvalidOptionsException e) {
+    } catch (SqoopOptions.InvalidOptionsException e) {
       // display the error msg
       System.err.println(e.getMessage());
       return 1; // exit on exception here
@@ -131,8 +161,8 @@
       hiveImport = new HiveImport(options, manager, getConf());
     }
 
-    ImportOptions.ControlAction action = options.getAction();
-    if (action == ImportOptions.ControlAction.ListTables) {
+    SqoopOptions.ControlAction action = options.getAction();
+    if (action == SqoopOptions.ControlAction.ListTables) {
       String [] tables = manager.listTables();
       if (null == tables) {
         System.err.println("Could not retrieve tables list from server");
@@ -143,7 +173,7 @@
           System.out.println(tbl);
         }
       }
-    } else if (action == ImportOptions.ControlAction.ListDatabases) {
+    } else if (action == SqoopOptions.ControlAction.ListDatabases) {
       String [] databases = manager.listDatabases();
       if (null == databases) {
         System.err.println("Could not retrieve database list from server");
@@ -154,10 +184,29 @@
           System.out.println(db);
         }
       }
-    } else if (action == ImportOptions.ControlAction.DebugExec) {
+    } else if (action == SqoopOptions.ControlAction.DebugExec) {
       // just run a SQL statement for debugging purposes.
       manager.execAndPrint(options.getDebugSqlCmd());
       return 0;
+    } else if (action == SqoopOptions.ControlAction.Export) {
+      // Export a table.
+      try {
+        exportTable(options.getTableName());
+      } catch (IOException ioe) {
+        LOG.error("Encountered IOException running export job: " + ioe.toString());
+        if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+          throw new RuntimeException(ioe);
+        } else {
+          return 1;
+        }
+      } catch (ExportException ee) {
+        LOG.error("Error during export: " + ee.toString());
+        if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
+          throw new RuntimeException(ee);
+        } else {
+          return 1;
+        }
+      }
     } else {
       // This is either FullImport or GenerateOnly.
 
@@ -184,7 +233,7 @@
         } else {
           return 1;
         }
-      } catch (ImportError ie) {
+      } catch (ImportException ie) {
         LOG.error("Error during import: " + ie.toString());
         if (System.getProperty(SQOOP_RETHROW_PROPERTY) != null) {
           throw new RuntimeException(ie);

Copied: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/SqoopOptions.java (from r888837, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/SqoopOptions.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/SqoopOptions.java?p2=hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/SqoopOptions.java&p1=hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/SqoopOptions.java&r1=888837&r2=903227&rev=903227&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/SqoopOptions.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/SqoopOptions.java Tue Jan 26 14:02:53 2010
@@ -675,6 +675,10 @@
     return connectString;
   }
 
+  public void setConnectString(String connectStr) {
+    this.connectString = connectStr;
+  }
+
   public String getTableName() {
     return tableName;
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Tue Jan 26 14:02:53 2010
@@ -32,7 +32,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.util.Executor;
 import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
@@ -46,11 +46,11 @@
 
   public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
 
-  private ImportOptions options;
+  private SqoopOptions options;
   private ConnManager connManager;
   private Configuration configuration;
 
-  public HiveImport(final ImportOptions opts, final ConnManager connMgr, final Configuration conf) {
+  public HiveImport(final SqoopOptions opts, final ConnManager connMgr, final Configuration conf) {
     this.options = opts;
     this.connManager = connMgr;
     this.configuration = conf;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java Tue Jan 26 14:02:53 2010
@@ -22,9 +22,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 import org.apache.hadoop.sqoop.manager.ConnManager;
-import org.apache.hadoop.sqoop.hive.HiveTypes;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,7 +46,7 @@
 
   public static final Log LOG = LogFactory.getLog(TableDefWriter.class.getName());
 
-  private ImportOptions options;
+  private SqoopOptions options;
   private ConnManager connManager;
   private Configuration configuration;
   private String tableName;
@@ -62,7 +61,7 @@
    * @param withComments if true, then tables will be created with a
    *        timestamp comment.
    */
-  public TableDefWriter(final ImportOptions opts, final ConnManager connMgr,
+  public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
       final String table, final Configuration config, final boolean withComments) {
     this.options = opts;
     this.connManager = connMgr;
@@ -95,9 +94,9 @@
       first = false;
 
       Integer colType = columnTypes.get(col);
-      String hiveColType = HiveTypes.toHiveType(colType);
+      String hiveColType = connManager.toHiveType(colType);
       if (null == hiveColType) {
-        throw new IOException("Hive does not support the SQL type for column " + col);  
+        throw new IOException("Hive does not support the SQL type for column " + col);
       }
 
       sb.append(col + " " + hiveColType);
@@ -115,10 +114,10 @@
       sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
     }
 
-    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\0");
-    sb.append(Integer.toOctalString((int) options.getOutputFieldDelim()));
-    sb.append("' LINES TERMINATED BY '\\0");
-    sb.append(Integer.toOctalString((int) options.getOutputRecordDelim()));
+    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
+    sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
+    sb.append("' LINES TERMINATED BY '");
+    sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
     sb.append("' STORED AS TEXTFILE");
 
     LOG.debug("Create statement: " + sb.toString());
@@ -170,5 +169,28 @@
     LOG.debug("Load statement: " + sb.toString());
     return sb.toString();
   }
+
+  /**
+   * Return a string identifying the character to use as a delimiter
+   * in Hive, in octal representation.
+   * Hive can specify delimiter characters in the form '\ooo' where
+   * ooo is a three-digit octal number between 000 and 177. Values
+   * may not be truncated ('\12' is wrong; '\012' is ok) nor may they
+   * be zero-prefixed (e.g., '\0177' is wrong).
+   *
+   * @param charNum the character to use as a delimiter
+   * @return a string of the form "\ooo" where ooo is an octal number
+   * in [000, 177].
+   * @throws IllegalArgumentException if charNum &gt; 0177.
+   */
+  static String getHiveOctalCharCode(int charNum)
+      throws IllegalArgumentException {
+    if (charNum > 0177) {
+      throw new IllegalArgumentException(
+          "Character " + charNum + " is an out-of-range delimiter");
+    }
+
+    return String.format("\\%03o", charNum);
+  }
 }
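
A few spot checks of the octal rules in that javadoc, assuming a caller in the
same package (the method is package-private); the expected results are shown
in comments:

    TableDefWriter.getHiveOctalCharCode((int) '\t'); // "\011" -- never "\11"
    TableDefWriter.getHiveOctalCharCode(1);          // "\001"
    TableDefWriter.getHiveOctalCharCode(0177);       // "\177" -- largest legal value
    TableDefWriter.getHiveOctalCharCode(0200);       // throws IllegalArgumentException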
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/lib/FieldFormatter.java Tue Jan 26 14:02:53 2010
@@ -57,6 +57,10 @@
     boolean escapingLegal = (null != escape && escape.length() > 0 && !escape.equals("\000"));
     String withEscapes;
 
+    if (null == str) {
+      return null;
+    }
+
     if (escapingLegal) {
       // escaping is legal. Escape any instances of the escape char itself
       withEscapes = str.replace(escape, escape + escape);
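
Only a fragment of FieldFormatter appears in this hunk, so the following is a hedged, self-contained reduction of the escaping step it guards; the class and method names are hypothetical, and the real method takes additional enclosing-related arguments not shown here:

    public class FieldEscapeDemo {
      // Hypothetical reduction: double the escape string wherever it
      // occurs, and pass nulls through (the new guard from the patch).
      static String escapeFragment(String str, String escape) {
        if (null == str) {
          return null;
        }
        boolean escapingLegal =
            (null != escape && escape.length() > 0 && !escape.equals("\000"));
        if (escapingLegal) {
          return str.replace(escape, escape + escape);
        }
        return str;
      }

      public static void main(String[] args) {
        System.out.println(escapeFragment("a\\b", "\\"));  // prints a\\b
        System.out.println(escapeFragment(null, "\\"));    // prints null
      }
    }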

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Tue Jan 26 14:02:53 2010
@@ -26,7 +26,8 @@
 
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.ExportException;
+import org.apache.hadoop.sqoop.util.ImportException;
 
 /**
  * Abstract interface that manages connections to a database.
@@ -56,6 +57,20 @@
   public abstract String getPrimaryKey(String tableName);
 
   /**
+   * Return the Java type for a SQL type.
+   * @param sqlType     sql type
+   * @return            java type
+   */
+  public abstract String toJavaType(int sqlType);
+
+  /**
+   * Return the Hive type for a SQL type.
+   * @param sqlType   sql type
+   * @return          hive type
+   */
+  public abstract String toHiveType(int sqlType);
+
+  /**
    * Return an unordered mapping from colname to sqltype for
    * all columns in a table.
    *
@@ -91,11 +106,43 @@
    * Perform an import of a table from the database into HDFS
    */
   public abstract void importTable(ImportJobContext context)
-      throws IOException, ImportError;
+      throws IOException, ImportException;
+
+  /**
+   * When using a column name in a generated SQL query, how (if at all)
+   * should we escape that column name? e.g., a column named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param colName the column name as provided by the user, etc.
+   * @return how the column name should be rendered in the sql text.
+   */
+  public String escapeColName(String colName) {
+    return colName;
+  }
+
+  /**
+   * When using a table name in a generated SQL query, how (if at all)
+   * should we escape that table name? e.g., a table named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param tableName the table name as provided by the user, etc.
+   * @return how the table name should be rendered in the sql text.
+   */
+  public String escapeTableName(String tableName) {
+    return tableName;
+  }
 
   /**
    * Perform any shutdown operations on the connection.
    */
   public abstract void close() throws SQLException;
+
+  /**
+   * Export data stored in HDFS into a table in a database
+   */
+  public void exportTable(ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("This database does not support exports");
+  }
 }
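
To illustrate the semantics of the new escaping hooks (identity escaping by default, with databases like MySQL overriding to quote identifiers, as its hunk below shows), here is a self-contained sketch; it deliberately does not extend the real ConnManager, and ToyEscapingDemo is an invented name:

    public class ToyEscapingDemo {
      // Default behavior from the patch: identifiers pass through unchanged.
      static String escapeColName(String colName) {
        return colName;
      }

      // A MySQL-style override wraps names in backticks instead:
      static String mysqlEscapeColName(String colName) {
        return (null == colName) ? null : "`" + colName + "`";
      }

      public static void main(String[] args) {
        // A column named after a reserved word is only safe once escaped:
        System.out.println("SELECT " + escapeColName("table"));       // SELECT table
        System.out.println("SELECT " + mysqlEscapeColName("table"));  // SELECT `table`
      }
    }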
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java Tue Jan 26 14:02:53 2010
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.sqoop.manager;
 
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,7 +31,7 @@
 
   public static final Log LOG = LogFactory.getLog(DefaultManagerFactory.class.getName());
 
-  public ConnManager accept(ImportOptions options) {
+  public ConnManager accept(SqoopOptions options) {
     String manualDriver = options.getDriverClassName();
     if (manualDriver != null) {
       // User has manually specified JDBC implementation with --driver.

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Tue Jan 26 14:02:53 2010
@@ -34,14 +34,14 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.util.AsyncSink;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
 import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.Executor;
-import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.ImportException;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
 import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 import org.apache.hadoop.sqoop.util.PerfCounters;
@@ -53,7 +53,7 @@
 public class DirectPostgresqlManager extends PostgresqlManager {
   public static final Log LOG = LogFactory.getLog(DirectPostgresqlManager.class.getName());
 
-  public DirectPostgresqlManager(final ImportOptions opts) {
+  public DirectPostgresqlManager(final SqoopOptions opts) {
     // Inform superclass that we're overriding import method via alt. constructor.
     super(opts, true);
   }
@@ -66,9 +66,9 @@
   static class PostgresqlAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
-    private final ImportOptions options;
+    private final SqoopOptions options;
 
-    PostgresqlAsyncSink(final SplittableBufferedWriter w, final ImportOptions opts,
+    PostgresqlAsyncSink(final SplittableBufferedWriter w, final SqoopOptions opts,
         final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
@@ -85,11 +85,11 @@
 
       private final SplittableBufferedWriter writer;
       private final InputStream stream;
-      private final ImportOptions options;
+      private final SqoopOptions options;
       private final PerfCounters counters;
 
       PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
-          final ImportOptions opts, final PerfCounters ctrs) {
+          final SqoopOptions opts, final PerfCounters ctrs) {
         this.stream = is;
         this.writer = w;
         this.options = opts;
@@ -278,15 +278,15 @@
    * via COPY FILE TO STDOUT.
    */
   public void importTable(ImportJobContext context)
-    throws IOException, ImportError {
+    throws IOException, ImportException {
 
     String tableName = context.getTableName();
     String jarFile = context.getJarFile();
-    ImportOptions options = context.getOptions();
+    SqoopOptions options = context.getOptions();
 
     LOG.info("Beginning psql fast path import");
 
-    if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
+    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
       // TODO(aaron): Support SequenceFile-based load-in
       LOG.warn("File import layout" + options.getFileLayout()
           + " is not supported by");
@@ -323,7 +323,7 @@
       int port = JdbcUrl.getPort(connectString);
 
       if (null == databaseName) {
-        throw new ImportError("Could not determine database name");
+        throw new ImportException("Could not determine database name");
       }
 
       LOG.info("Performing import of table " + tableName + " from database " + databaseName);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Tue Jan 26 14:02:53 2010
@@ -24,7 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 
 /**
  * Database manager that connects to a generic JDBC-compliant
@@ -38,7 +38,7 @@
   private String jdbcDriverClass;
   private Connection connection;
 
-  public GenericJdbcManager(final String driverClass, final ImportOptions opts) {
+  public GenericJdbcManager(final String driverClass, final SqoopOptions opts) {
     super(opts);
 
     this.jdbcDriverClass = driverClass;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java Tue Jan 26 14:02:53 2010
@@ -21,7 +21,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 
 /**
  * Manages connections to hsqldb databases.
@@ -38,7 +38,7 @@
   // "PUBLIC";
   private static final String HSQL_SCHEMA_NAME = "PUBLIC";
 
-  public HsqldbManager(final ImportOptions opts) {
+  public HsqldbManager(final SqoopOptions opts) {
     super(DRIVER_CLASS, opts);
   }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java Tue Jan 26 14:02:53 2010
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.sqoop.manager;
 
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 
 /**
  * A set of parameters describing an import operation; this is passed to
@@ -28,9 +28,9 @@
 
   private String tableName;
   private String jarFile;
-  private ImportOptions options;
+  private SqoopOptions options;
 
-  public ImportJobContext(final String table, final String jar, final ImportOptions opts) {
+  public ImportJobContext(final String table, final String jar, final SqoopOptions opts) {
     this.tableName = table;
     this.jarFile = jar;
     this.options = opts;
@@ -48,8 +48,8 @@
     return jarFile;
   }
 
-  /** @return the ImportOptions configured by the user */
-  public ImportOptions getOptions() {
+  /** @return the SqoopOptions configured by the user */
+  public SqoopOptions getOptions() {
     return options;
   }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Tue Jan 26 14:02:53 2010
@@ -34,7 +34,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
@@ -42,7 +42,7 @@
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
 import org.apache.hadoop.sqoop.util.ErrorableThread;
-import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.ImportException;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
 import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 import org.apache.hadoop.sqoop.util.PerfCounters;
@@ -154,11 +154,11 @@
    */
   static class ReparsingAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
-    private final ImportOptions options;
+    private final SqoopOptions options;
     private final PerfCounters counters;
 
     ReparsingAsyncSink(final SplittableBufferedWriter w,
-        final ImportOptions opts, final PerfCounters ctrs) {
+        final SqoopOptions opts, final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
       this.counters = ctrs;
@@ -174,12 +174,12 @@
           ReparsingStreamThread.class.getName());
 
       private final SplittableBufferedWriter writer;
-      private final ImportOptions options;
+      private final SqoopOptions options;
       private final InputStream stream;
       private final PerfCounters counters;
 
       ReparsingStreamThread(final InputStream is,
-          final SplittableBufferedWriter w, final ImportOptions opts,
+          final SplittableBufferedWriter w, final SqoopOptions opts,
           final PerfCounters ctrs) {
         this.writer = w;
         this.options = opts;
@@ -291,7 +291,7 @@
   }
 
 
-  public LocalMySQLManager(final ImportOptions options) {
+  public LocalMySQLManager(final SqoopOptions options) {
     super(options, false);
   }
 
@@ -343,15 +343,15 @@
    * the database and upload the files directly to HDFS.
    */
   public void importTable(ImportJobContext context)
-      throws IOException, ImportError {
+      throws IOException, ImportException {
 
     String tableName = context.getTableName();
     String jarFile = context.getJarFile();
-    ImportOptions options = context.getOptions();
+    SqoopOptions options = context.getOptions();
 
     LOG.info("Beginning mysqldump fast path import");
 
-    if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
+    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
       // TODO(aaron): Support SequenceFile-based load-in
       LOG.warn("File import layout " + options.getFileLayout()
           + " is not supported by");
@@ -370,7 +370,7 @@
     int port = JdbcUrl.getPort(connectString);
 
     if (null == databaseName) {
-      throw new ImportError("Could not determine database name");
+      throw new ImportException("Could not determine database name");
     }
 
     LOG.info("Performing import of table " + tableName + " from database " + databaseName);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java Tue Jan 26 14:02:53 2010
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.sqoop.manager;
 
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 
 /**
  * Interface for factory classes for ConnManager implementations.
@@ -28,6 +28,6 @@
  * one such call returns a non-null ConnManager instance.
  */
 public abstract class ManagerFactory {
-  public abstract ConnManager accept(ImportOptions options);
+  public abstract ConnManager accept(SqoopOptions options);
 }
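
As a usage sketch, a hypothetical third-party factory honoring the accept() contract (return a manager to claim the options, null to decline) could look like the following; HsqldbOnlyManagerFactory and its jdbc:hsqldb: prefix check are invented for illustration, while the imported classes come from the patched tree:

    import org.apache.hadoop.sqoop.SqoopOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.HsqldbManager;
    import org.apache.hadoop.sqoop.manager.ManagerFactory;

    // Hypothetical factory: claims hsqldb connect strings, declines the rest.
    public class HsqldbOnlyManagerFactory extends ManagerFactory {
      public ConnManager accept(SqoopOptions options) {
        String connectStr = options.getConnectString();
        if (connectStr != null && connectStr.startsWith("jdbc:hsqldb:")) {
          return new HsqldbManager(options);
        }
        return null; // let the next factory in the chain try
      }
    }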
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Tue Jan 26 14:02:53 2010
@@ -19,6 +19,8 @@
 package org.apache.hadoop.sqoop.manager;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -28,8 +30,8 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.sqoop.ImportOptions;
-import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.util.ImportException;
 
 /**
  * Manages connections to MySQL databases
@@ -44,11 +46,11 @@
   // set to true after we warn the user that we can use direct fastpath.
   private static boolean warningPrinted = false;
 
-  public MySQLManager(final ImportOptions opts) {
+  public MySQLManager(final SqoopOptions opts) {
     super(DRIVER_CLASS, opts);
   }
 
-  protected MySQLManager(final ImportOptions opts, boolean ignored) {
+  protected MySQLManager(final SqoopOptions opts, boolean ignored) {
     // constructor used by subclasses to avoid the --direct warning.
     super(DRIVER_CLASS, opts);
   }
@@ -56,7 +58,7 @@
   @Override
   protected String getColNamesQuery(String tableName) {
     // Use mysql-specific hints and LIMIT to return fast
-    return "SELECT t.* FROM " + tableName + " AS t LIMIT 1";
+    return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t LIMIT 1";
   }
 
   @Override
@@ -93,7 +95,7 @@
 
   @Override
   public void importTable(ImportJobContext context)
-        throws IOException, ImportError {
+        throws IOException, ImportException {
 
     // Check that we're not doing a MapReduce from localhost. If we are, point
     // out that we could use mysqldump.
@@ -113,11 +115,63 @@
       }
     }
 
+    checkDateTimeBehavior(context);
+
     // Then run the normal importTable() method.
     super.importTable(context);
   }
 
   /**
+   * MySQL allows TIMESTAMP fields to have the value '0000-00-00 00:00:00',
+   * which causes errors in import. If the user has not set the
+   * zeroDateTimeBehavior property already, we set it for them to coerce
+   * the type to null.
+   */
+  private void checkDateTimeBehavior(ImportJobContext context) {
+    final String zeroBehaviorStr = "zeroDateTimeBehavior";
+    final String convertToNull = "=convertToNull";
+
+    String connectStr = context.getOptions().getConnectString();
+    if (connectStr.indexOf("jdbc:") != 0) {
+      // This connect string doesn't have the prefix we expect.
+      // We can't parse the rest of it here.
+      return;
+    }
+
+    // This starts with 'jdbc:mysql://' ... let's remove the 'jdbc:'
+    // prefix so that java.net.URI can parse the rest of the line.
+    String uriComponent = connectStr.substring(5);
+    try {
+      URI uri = new URI(uriComponent);
+      String query = uri.getQuery(); // get the part after a '?'
+
+      // If they haven't set the zeroBehavior option, set it to
+      // squash-null for them.
+      if (null == query) {
+        connectStr = connectStr + "?" + zeroBehaviorStr + convertToNull;
+        LOG.info("Setting zero DATETIME behavior to convertToNull (mysql)");
+      } else if (query.length() == 0) {
+        connectStr = connectStr + zeroBehaviorStr + convertToNull;
+        LOG.info("Setting zero DATETIME behavior to convertToNull (mysql)");
+      } else if (query.indexOf(zeroBehaviorStr) == -1) {
+        if (!connectStr.endsWith("&")) {
+          connectStr = connectStr + "&";
+        }
+        connectStr = connectStr + zeroBehaviorStr + convertToNull;
+        LOG.info("Setting zero DATETIME behavior to convertToNull (mysql)");
+      }
+
+      LOG.debug("Rewriting connect string to " + connectStr);
+      context.getOptions().setConnectString(connectStr);
+    } catch (URISyntaxException use) {
+      // Just ignore this. If we can't parse the URI, don't attempt
+      // to add any extra flags to it.
+      LOG.debug("mysql: Couldn't parse connect str in checkDateTimeBehavior: "
+          + use);
+    }
+  }
+
+  /**
    * Executes an arbitrary SQL statement. Sets mysql-specific parameter
    * to ensure the entire table is not buffered in RAM before reading
    * any rows. A consequence of this is that every ResultSet returned
@@ -141,5 +195,35 @@
     LOG.info("Executing SQL statement: " + stmt);
     return statement.executeQuery();
   }
+
+  /**
+   * When using a column name in a generated SQL query, how (if at all)
+   * should we escape that column name? e.g., a column named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param colName the column name as provided by the user, etc.
+   * @return how the column name should be rendered in the sql text.
+   */
+  public String escapeColName(String colName) {
+    if (null == colName) {
+      return null;
+    }
+    return "`" + colName + "`";
+  }
+
+  /**
+   * When using a table name in a generated SQL query, how (if at all)
+   * should we escape that table name? e.g., a table named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param tableName the table name as provided by the user, etc.
+   * @return how the table name should be rendered in the sql text.
+   */
+  public String escapeTableName(String tableName) {
+    if (null == tableName) {
+      return null;
+    }
+    return "`" + tableName + "`";
+  }
 }
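
For reference, the connect-string rewriting in checkDateTimeBehavior() can be exercised standalone; the logic below is copied from the hunk above, with only the demo class name and main() driver invented:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class ZeroDateTimeDemo {
      // Rewriting logic copied from checkDateTimeBehavior() above.
      static String fixConnectString(String connectStr) {
        final String zeroBehaviorStr = "zeroDateTimeBehavior";
        final String convertToNull = "=convertToNull";
        if (connectStr.indexOf("jdbc:") != 0) {
          return connectStr; // unexpected prefix; leave it alone
        }
        try {
          String query = new URI(connectStr.substring(5)).getQuery();
          if (null == query) {
            return connectStr + "?" + zeroBehaviorStr + convertToNull;
          } else if (query.length() == 0) {
            return connectStr + zeroBehaviorStr + convertToNull;
          } else if (query.indexOf(zeroBehaviorStr) == -1) {
            if (!connectStr.endsWith("&")) {
              connectStr = connectStr + "&";
            }
            return connectStr + zeroBehaviorStr + convertToNull;
          }
        } catch (URISyntaxException use) {
          // unparseable URI: make no changes, as the patch does
        }
        return connectStr;
      }

      public static void main(String[] args) {
        System.out.println(fixConnectString("jdbc:mysql://db.example.com/sales"));
        // -> jdbc:mysql://db.example.com/sales?zeroDateTimeBehavior=convertToNull
        System.out.println(fixConnectString(
            "jdbc:mysql://db.example.com/sales?useUnicode=true"));
        // -> ...?useUnicode=true&zeroDateTimeBehavior=convertToNull
      }
    }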
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Tue Jan 26 14:02:53 2010
@@ -23,15 +23,18 @@
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
+import java.util.TimeZone;
+import java.lang.reflect.Method;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.SqoopOptions;
 import org.apache.hadoop.sqoop.mapred.ImportJob;
-import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.ImportException;
 
 /**
  * Manages connections to Oracle databases.
@@ -44,13 +47,13 @@
   // driver class to ensure is loaded when making db connection.
   private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
 
-  public OracleManager(final ImportOptions opts) {
+  public OracleManager(final SqoopOptions opts) {
     super(DRIVER_CLASS, opts);
   }
 
   protected String getColNamesQuery(String tableName) {
     // SqlManager uses "tableName AS t" which doesn't work in Oracle.
-    return "SELECT t.* FROM " + tableName + " t";
+    return "SELECT t.* FROM " + escapeTableName(tableName) + " t";
   }
 
   /**
@@ -83,19 +86,61 @@
     // We only use this for metadata queries. Loosest semantics are okay.
     connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
 
+    // Setting session time zone
+    setSessionTimeZone(connection);
+
     return connection;
   }
 
   /**
+   * Set the session time zone on the given connection.
+   * @param conn    Connection object
+   * @throws SQLException if the session time zone cannot be set
+   */
+  private void setSessionTimeZone(Connection conn) throws SQLException {
+    // need to use reflection to call the method setSessionTimeZone on the OracleConnection class
+    // because oracle specific java libraries are not accessible in this context
+    Method method;
+    try {
+      method = conn.getClass().getMethod(
+              "setSessionTimeZone", new Class [] {String.class});
+    } catch (Exception ex) {
+      LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(), ex);
+      // wrap and rethrow as SQLException
+      throw new SQLException(ex);
+    }
+
+    // Need to set the time zone in order for Java
+    // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE"
+    String clientTimeZone = TimeZone.getDefault().getID();
+    try {
+      method.setAccessible(true);
+      method.invoke(conn, clientTimeZone);
+      LOG.info("Time zone has been set");
+    } catch (Exception ex) {
+      LOG.warn("Time zone " + clientTimeZone +
+               " could not be set on oracle database.");
+      LOG.info("Setting default time zone: UTC");
+      try {
+        method.invoke(conn, "UTC");
+      } catch (Exception ex2) {
+        LOG.error("Could not set time zone for oracle connection", ex2);
+        // wrap and rethrow as SQLException
+        throw new SQLException(ex2);
+      }
+    }
+  }
+
+  /**
    * This importTable() implementation continues to use the older DBInputFormat
    * because DataDrivenDBInputFormat does not currently work with Oracle.
    */
   public void importTable(ImportJobContext context)
-      throws IOException, ImportError {
+      throws IOException, ImportException {
 
     String tableName = context.getTableName();
     String jarFile = context.getJarFile();
-    ImportOptions options = context.getOptions();
+    SqoopOptions options = context.getOptions();
     ImportJob importer = new ImportJob(options);
     String splitCol = options.getSplitByCol();
     if (null == splitCol) {
@@ -105,11 +150,123 @@
 
     if (null == splitCol) {
       // Can't infer a primary key.
-      throw new ImportError("No primary key could be found for table " + tableName
+      throw new ImportException("No primary key could be found for table " + tableName
           + ". Please specify one with --split-by.");
     }
 
     importer.runImport(tableName, jarFile, splitCol, options.getConf());
   }
+
+  /**
+   * Resolve a database-specific type to the Java type that should contain it.
+   * @param sqlType
+   * @return the name of a Java type to hold the sql datatype, or null if none.
+   */
+  public String toJavaType(int sqlType) {
+    String defaultJavaType = super.toJavaType(sqlType);
+    return (defaultJavaType == null) ? dbToJavaType(sqlType) : defaultJavaType;
+  }
+
+  /**
+   * Attempt to map a SQL type to a Java type.
+   * @param sqlType     sql type
+   * @return            java type
+   */
+  private String dbToJavaType(int sqlType) {
+    // load class oracle.jdbc.OracleTypes
+    // need to use reflection because oracle specific libraries
+    // are not accessible in this context
+    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+    // check if it is TIMESTAMPTZ
+    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+    if (sqlType == dbType) {
+      return "java.sql.Timestamp";
+    }
+
+    // check if it is TIMESTAMPLTZ
+    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+    if (sqlType == dbType) {
+      return "java.sql.Timestamp";
+    }
+
+    // return null if no java type was found for sqlType
+    return null;
+  }
+
+  /**
+   * Attempt to map a SQL type to a Hive type.
+   * @param sqlType     sql data type
+   * @return            hive data type
+   */
+  public String toHiveType(int sqlType) {
+    String defaultHiveType = super.toHiveType(sqlType);
+    return (defaultHiveType == null) ? dbToHiveType(sqlType) : defaultHiveType;
+  }
+
+  /**
+   * Resolve a database-specific type to a Hive type.
+   * @param sqlType     sql type
+   * @return            hive type
+   */
+  private String dbToHiveType(int sqlType) {
+    // load class oracle.jdbc.OracleTypes
+    // need to use reflection because oracle specific libraries
+    // are not accessible in this context
+    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+    // check if it is TIMESTAMPTZ
+    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+    if (sqlType == dbType) {
+      return "STRING";
+    }
+
+    // check if it is TIMESTAMPLTZ
+    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+    if (sqlType == dbType) {
+      return "STRING";
+    }
+
+    // return null if no hive type was found for sqlType
+    return null;
+  }
+
+  /**
+   * Get database type
+   * @param clazz         oracle class representing sql types
+   * @param fieldName     field name
+   * @return              value of database type constant
+   */
+  private int getDatabaseType(Class clazz, String fieldName) {
+    // need to use reflection to extract constant values
+    // because the database specific java libraries are not accessible in this context
+    int value = -1;
+    if (null == clazz) {
+      return value;   // type class failed to load; treat as no match
+    }
+    try {
+      java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+      value = field.getInt(null);
+    } catch (NoSuchFieldException ex) {
+      LOG.error("Could not retrieve value for field " + fieldName, ex);
+    } catch (IllegalAccessException ex) {
+      LOG.error("Could not retrieve value for field " + fieldName, ex);
+    }
+    return value;
+  }
+
+  /**
+   * Load class by name
+   * @param className     class name
+   * @return              class instance
+   */
+  private Class getTypeClass(String className) {
+    // need to use reflection to load class
+    // because the database specific java libraries are not accessible in this context
+    Class typeClass = null;
+    try {
+      typeClass = Class.forName(className);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load class " + className, ex);
+    }
+    return typeClass;
+  }
 }
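
The reflective constant lookup added here can be exercised without the Oracle driver by pointing it at java.sql.Types; a self-contained sketch (the demo class name is invented, and logging is replaced by System.err):

    import java.lang.reflect.Field;

    public class TypeConstantDemo {
      // Same reflective lookup as OracleManager.getDatabaseType(), but
      // demonstrated against java.sql.Types so it runs without ojdbc.
      static int getDatabaseType(Class clazz, String fieldName) {
        int value = -1;
        if (null == clazz) {
          return value;
        }
        try {
          Field field = clazz.getDeclaredField(fieldName);
          value = field.getInt(null);
        } catch (NoSuchFieldException ex) {
          System.err.println("Could not retrieve value for field " + fieldName);
        } catch (IllegalAccessException ex) {
          System.err.println("Could not retrieve value for field " + fieldName);
        }
        return value;
      }

      static Class getTypeClass(String className) {
        try {
          return Class.forName(className);
        } catch (ClassNotFoundException ex) {
          return null;   // OracleManager logs and returns null the same way
        }
      }

      public static void main(String[] args) {
        Class typeClass = getTypeClass("java.sql.Types");
        // java.sql.Types.TIMESTAMP == 93
        System.out.println(getDatabaseType(typeClass, "TIMESTAMP"));
      }
    }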
 


