hadoop-mapreduce-commits mailing list archives

From ste...@apache.org
Subject svn commit: r885145 [24/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Date Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java Sat Nov 28 20:26:01 2009
@@ -86,10 +86,7 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
- * @deprecated 
- * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainReducer} instead
  */
-@Deprecated
 public class ChainReducer implements Reducer {
 
   /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java Sat Nov 28 20:26:01 2009
@@ -145,9 +145,9 @@
                             {split, jc, reporter, Integer.valueOf(idx)});
 
       // setup some helper config variables.
-      jc.set("map.input.file", split.getPath(idx).toString());
-      jc.setLong("map.input.start", split.getOffset(idx));
-      jc.setLong("map.input.length", split.getLength(idx));
+      jc.set(JobContext.MAP_INPUT_FILE, split.getPath(idx).toString());
+      jc.setLong(JobContext.MAP_INPUT_START, split.getOffset(idx));
+      jc.setLong(JobContext.MAP_INPUT_PATH, split.getLength(idx));
     } catch (Exception e) {
       throw new RuntimeException (e);
     }
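
For context, the helper variables set above can be read back from the JobConf inside an old-API mapper. A minimal sketch, assuming an illustrative mapper class that is not part of this patch:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapreduce.JobContext;

    public class InputFileAwareMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {
      private String inputFile;

      public void configure(JobConf job) {
        // Populated per chunk by CombineFileRecordReader above.
        inputFile = job.get(JobContext.MAP_INPUT_FILE);
      }

      public void map(LongWritable key, Text value,
          OutputCollector<Text, LongWritable> out, Reporter reporter)
          throws IOException {
        // Tag each record with the file it came from.
        out.collect(new Text(inputFile), new LongWritable(1));
      }
    }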

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java Sat Nov 28 20:26:01 2009
@@ -44,9 +44,9 @@
  * fields are from the value only. Otherwise, the fields are the union of those
  * from the key and those from the value.
  * 
- * The field separator is under attribute "mapred.data.field.separator"
+ * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
  * 
- * The map output field list spec is under attribute "map.output.key.value.fields.spec".
+ * The map output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec".
  * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
  * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
  * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
@@ -57,7 +57,7 @@
  * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
  * and use fields 6,5,1,2,3,7 and above for values.
  * 
- * The reduce output field list spec is under attribute "reduce.output.key.value.fields.spec".
+ * The reduce output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec".
  * 
  * The reducer extracts output key/value pairs in a similar manner, except that
  * the key is never ignored.
@@ -156,13 +156,13 @@
   }
 
   public void configure(JobConf job) {
-    this.fieldSeparator = job.get("mapred.data.field.separator", "\t");
-    this.mapOutputKeyValueSpec = job.get("map.output.key.value.fields.spec",
+    this.fieldSeparator = job.get("mapreduce.fieldsel.data.field.separator", "\t");
+    this.mapOutputKeyValueSpec = job.get("mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec",
         "0-:");
     this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
         job.getInputFormat().getClass().getCanonicalName());
     this.reduceOutputKeyValueSpec = job.get(
-        "reduce.output.key.value.fields.spec", "0-:");
+        "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec", "0-:");
     parseOutputKeyValueSpec();
     LOG.info(specToString());
   }
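
A minimal driver-side sketch for the class above, using the key strings exactly as configure() reads them in this revision and the example spec from the javadoc; the setup class itself is illustrative:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.FieldSelectionMapReduce;

    public class FieldSelJobSetup {
      public static JobConf configureFieldSelection(JobConf job) {
        job.setInputFormat(TextInputFormat.class);
        job.setMapperClass(FieldSelectionMapReduce.class);
        job.setReducerClass(FieldSelectionMapReduce.class);
        // Fields are separated by tabs unless overridden here.
        job.set("mapreduce.fieldsel.data.field.separator", "\t");
        // Keys = fields 4,3,0,1; values = fields 6,5,1-3 and 7 onwards.
        job.set("mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec",
            "4,3,0,1:6,5,1-3,7-");
        job.set("mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec",
            "0-:");
        return job;
      }
    }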

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java Sat Nov 28 20:26:01 2009
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapreduce.JobContext;
 
 /**
  * This comparator implementation provides a subset of the features provided
@@ -33,8 +34,8 @@
  *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
  *  of the field); if omitted from pos2, it defaults to 0 (the end of the
  *  field). opts are ordering options (any of 'nr' as described above). 
- * We assume that the fields in the key are separated by 
- * map.output.key.field.separator.
+ * We assume that the fields in the key are separated by
+ * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR} 
  * @deprecated Use 
  * {@link org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator} 
  * instead
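
For reference, a sketch of how the comparator above is typically wired up through JobConf's key-field helper methods; the option string is just an example:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;

    public class SortBySecondFieldSetup {
      public static void configure(JobConf job) {
        job.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
        // Sort on field 2 only, numerically, in reverse order; fields in the
        // key are split on the configured separator (tab by default).
        job.setKeyFieldComparatorOptions("-k2,2nr");
      }
    }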

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/LazyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/LazyOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/LazyOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/LazyOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -44,7 +44,7 @@
   public static void  setOutputFormatClass(JobConf job, 
       Class<? extends OutputFormat> theClass) {
       job.setOutputFormat(LazyOutputFormat.class);
-      job.setClass("mapred.lazy.output.format", theClass, OutputFormat.class);
+      job.setClass("mapreduce.output.lazyoutputformat.outputformat", theClass, OutputFormat.class);
   }
 
   @Override
@@ -68,7 +68,7 @@
   @SuppressWarnings("unchecked")
   private void getBaseOutputFormat(JobConf job) throws IOException {
     baseOut = ReflectionUtils.newInstance(
-        job.getClass("mapred.lazy.output.format", null, OutputFormat.class), 
+        job.getClass("mapreduce.output.lazyoutputformat.outputformat", null, OutputFormat.class), 
         job); 
     if (baseOut == null) {
       throw new IOException("Ouput format not set for LazyOutputFormat");
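
A short usage sketch for the renamed property: callers go through setOutputFormatClass(), which records the wrapped format under the new key; the choice of TextOutputFormat here is illustrative:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.LazyOutputFormat;

    public class LazyOutputSetup {
      public static void configure(JobConf job) {
        // Stores the real format under "mapreduce.output.lazyoutputformat.outputformat"
        // and installs LazyOutputFormat, so empty part files are not created.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
      }
    }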

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java Sat Nov 28 20:26:01 2009
@@ -48,8 +48,8 @@
 
     String inputFormatMapping = path.toString() + ";"
        + inputFormatClass.getName();
-    String inputFormats = conf.get("mapred.input.dir.formats");
-    conf.set("mapred.input.dir.formats",
+    String inputFormats = conf.get("mapreduce.input.multipleinputs.dir.formats");
+    conf.set("mapreduce.input.multipleinputs.dir.formats",
        inputFormats == null ? inputFormatMapping : inputFormats + ","
            + inputFormatMapping);
 
@@ -72,8 +72,8 @@
     addInputPath(conf, path, inputFormatClass);
 
     String mapperMapping = path.toString() + ";" + mapperClass.getName();
-    String mappers = conf.get("mapred.input.dir.mappers");
-    conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+    String mappers = conf.get("mapreduce.input.multipleinputs.dir.mappers");
+    conf.set("mapreduce.input.multipleinputs.dir.mappers", mappers == null ? mapperMapping
        : mappers + "," + mapperMapping);
 
     conf.setMapperClass(DelegatingMapper.class);
@@ -89,7 +89,7 @@
    */
   static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
     Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
-    String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+    String[] pathMappings = conf.get("mapreduce.input.multipleinputs.dir.formats").split(",");
     for (String pathMapping : pathMappings) {
       String[] split = pathMapping.split(";");
       InputFormat inputFormat;
@@ -114,11 +114,11 @@
    */
   @SuppressWarnings("unchecked")
   static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
-    if (conf.get("mapred.input.dir.mappers") == null) {
+    if (conf.get("mapreduce.input.multipleinputs.dir.mappers") == null) {
       return Collections.emptyMap();
     }
     Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends Mapper>>();
-    String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+    String[] pathMappings = conf.get("mapreduce.input.multipleinputs.dir.mappers").split(",");
     for (String pathMapping : pathMappings) {
       String[] split = pathMapping.split(";");
       Class<? extends Mapper> mapClass;
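
For context, a sketch of the client-side calls that populate the two renamed properties; paths and classes are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.KeyValueTextInputFormat;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.MultipleInputs;

    public class MultiInputSetup {
      public static void configure(JobConf job) {
        // Appends to "mapreduce.input.multipleinputs.dir.formats".
        MultipleInputs.addInputPath(job, new Path("/logs/raw"), TextInputFormat.class);
        // Also appends to "mapreduce.input.multipleinputs.dir.mappers"
        // and installs DelegatingMapper as the job's mapper class.
        MultipleInputs.addInputPath(job, new Path("/logs/kv"),
            KeyValueTextInputFormat.class, IdentityMapper.class);
      }
    }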

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -47,7 +48,10 @@
  * Case three: This class is used for a map only job. The job wants to use an
  * output file name that depends on both the keys and the input file name,
  * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public abstract class MultipleOutputFormat<K, V>
 extends FileOutputFormat<K, V> {
 
@@ -171,7 +175,7 @@
 
   /**
    * Generate the outfile name based on a given anme and the input file name. If
-   * the map input file does not exists (i.e. this is not for a map only job),
+   * the {@link JobContext#MAP_INPUT_FILE} does not exists (i.e. this is not for a map only job),
    * the given name is returned unchanged. If the config value for
    * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
    * name is returned unchanged. Otherwise, return a file name consisting of the
@@ -185,9 +189,10 @@
    * @return the outfile name based on a given anme and the input file name.
    */
   protected String getInputFileBasedOutputFileName(JobConf job, String name) {
-    String infilepath = job.get("map.input.file");
+    String infilepath = job.get(JobContext.MAP_INPUT_FILE);
     if (infilepath == null) {
-      // if the map input file does not exists, then return the given name
+      // if the {@link JobContext#MAP_INPUT_FILE} does not exists,
+      // then return the given name
       return name;
     }
     int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java Sat Nov 28 20:26:01 2009
@@ -112,7 +112,10 @@
  *
  * }
  * </pre>
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public class MultipleOutputs {
 
   private static final String NAMED_OUTPUTS = "mo.namedOutputs";

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -29,7 +29,10 @@
 /**
  * This class extends the MultipleOutputFormat, allowing to write the output data 
  * to different output files in sequence file output format. 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public class MultipleSequenceFileOutputFormat <K,V>
 extends MultipleOutputFormat<K, V> {
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -29,7 +29,10 @@
 /**
  * This class extends the MultipleOutputFormat, allowing to write the output
  * data to different output files in Text output format.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MultipleOutputs} instead
  */
+@Deprecated
 public class MultipleTextOutputFormat<K, V>
     extends MultipleOutputFormat<K, V> {
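
For reference, a minimal old-API subclass of the format being deprecated here, splitting output files by key; the subclass name and naming scheme are illustrative:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

    public class KeyedTextOutputFormat extends MultipleTextOutputFormat<Text, Text> {
      @Override
      protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // Route each record to a file named after its key, e.g. "us/part-00000".
        return key.toString() + "/" + name;
      }
    }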
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Sat Nov 28 20:26:01 2009
@@ -67,7 +67,7 @@
   @SuppressWarnings("unchecked")
   public void configure(JobConf jobConf) {
     int numberOfThreads =
-      jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
+      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                 " to use " + numberOfThreads + " threads");

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java Sat Nov 28 20:26:01 2009
@@ -87,6 +87,6 @@
   }
 
   public void configure(JobConf conf) {
-    N = conf.getInt("mapred.line.input.format.linespermap", 1);
+    N = conf.getInt("mapreduce.input.lineinputformat.linespermap", 1);
   }
 }
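
A one-line driver-side sketch for the renamed property; the value 10 is just an example:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.NLineInputFormat;

    public class NLineSetup {
      public static void configure(JobConf job) {
        job.setInputFormat(NLineInputFormat.class);
        // Each map task now receives 10 input lines instead of the default 1.
        job.setInt("mapreduce.input.lineinputformat.linespermap", 10);
      }
    }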

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java Sat Nov 28 20:26:01 2009
@@ -42,8 +42,10 @@
   private int group;
 
   public void configure(JobConf job) {
-    pattern = Pattern.compile(job.get("mapred.mapper.regex"));
-    group = job.getInt("mapred.mapper.regex.group", 0);
+    pattern = Pattern.compile(job.get(org.apache.hadoop.mapreduce.lib.map.
+                RegexMapper.PATTERN));
+    group = job.getInt(org.apache.hadoop.mapreduce.lib.map.
+              RegexMapper.GROUP, 0);
   }
 
   public void map(K key, Text value,
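
A driver-side sketch matching the constants now referenced in configure(); the regex pattern itself is illustrative:

    import org.apache.hadoop.mapred.JobConf;

    public class GrepSetup {
      public static void configure(JobConf job) {
        job.setMapperClass(org.apache.hadoop.mapred.lib.RegexMapper.class);
        // PATTERN and GROUP are the constants read by configure() above,
        // shared with the new-API RegexMapper.
        job.set(org.apache.hadoop.mapreduce.lib.map.RegexMapper.PATTERN, "ERROR (\\w+)");
        job.setInt(org.apache.hadoop.mapreduce.lib.map.RegexMapper.GROUP, 1);
      }
    }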

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -28,8 +28,10 @@
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -68,8 +70,8 @@
   public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
       JobConf job, String name, Progressable progress) throws IOException {
     org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
-      new TaskAttemptContext(job, 
-            TaskAttemptID.forName(job.get("mapred.task.id"))));
+      new TaskAttemptContextImpl(job, 
+            TaskAttemptID.forName(job.get(JobContext.TASK_ATTEMPT_ID))));
     org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer = 
      (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
     try {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java Sat Nov 28 20:26:01 2009
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.mapred.lib.db;
 
 /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/package.html
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/package.html?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/package.html (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/package.html Sat Nov 28 20:26:01 2009
@@ -179,9 +179,9 @@
     grepJob.setCombinerClass(GrepReducer.class);
     grepJob.setReducerClass(GrepReducer.class);
 
-    grepJob.set("mapred.mapper.regex", args[2]);
+    grepJob.set("mapreduce.mapper.regex", args[2]);
     if (args.length == 4)
-      grepJob.set("mapred.mapper.regex.group", args[3]);
+      grepJob.set("mapreduce.mapper.regexmapper..group", args[3]);
 
     grepJob.setOutputFormat(SequenceFileOutputFormat.class);
     grepJob.setOutputKeyClass(Text.class);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.FloatWritable;
@@ -80,19 +81,24 @@
     Map<String, String> env = new HashMap<String,String>();
     // add TMPDIR environment variable with the value of java.io.tmpdir
     env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
-    env.put("hadoop.pipes.command.port", 
+    env.put(Submitter.PORT, 
             Integer.toString(serverSocket.getLocalPort()));
     List<String> cmd = new ArrayList<String>();
-    String interpretor = conf.get("hadoop.pipes.executable.interpretor");
+    String interpretor = conf.get(Submitter.INTERPRETOR);
     if (interpretor != null) {
       cmd.add(interpretor);
     }
 
     String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
-    FileUtil.chmod(executable, "a+x");
+    if (!new File(executable).canExecute()) {
+      // LinuxTaskController sets +x permissions on all distcache files already.
+      // In case of DefaultTaskController, set permissions here.
+      FileUtil.chmod(executable, "u+x");
+    }
     cmd.add(executable);
     // wrap the command in a stdout/stderr capture
-    TaskAttemptID taskid = TaskAttemptID.forName(conf.get("mapred.task.id"));
+    TaskAttemptID taskid = 
+      TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID));
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Sat Nov 28 20:26:01 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.mapreduce.JobContext;
 
 /**
  * An adaptor to run a C++ mapper.
@@ -76,7 +77,7 @@
     boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
     downlink.runMap(reporter.getInputSplit(), 
                     job.getNumReduceTasks(), isJavaInput);
-    boolean skipping = job.getBoolean("mapred.skip.on", false);
+    boolean skipping = job.getBoolean(JobContext.SKIP_RECORDS, false);
     try {
       if (isJavaInput) {
         // allocate key & value instances that are re-used for all entries

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesNonJavaInputFormat.java Sat Nov 28 20:26:01 2009
@@ -37,7 +37,7 @@
  * The only useful thing this does is set up the Map-Reduce job to get the
  * {@link PipesDummyRecordReader}, everything else left for the 'actual'
  * InputFormat specified by the user which is given by 
- * <i>mapred.pipes.user.inputformat</i>.
+ * <i>mapreduce.pipes.inputformat</i>.
  */
 class PipesNonJavaInputFormat 
 implements InputFormat<FloatWritable, NullWritable> {
@@ -51,7 +51,7 @@
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     // Delegate the generation of input splits to the 'original' InputFormat
     return ReflectionUtils.newInstance(
-        job.getClass("mapred.pipes.user.inputformat", 
+        job.getClass(Submitter.INPUT_FORMAT, 
                      TextInputFormat.class, 
                      InputFormat.class), job).getSplits(job, numSplits);
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.mapreduce.JobContext;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -49,7 +50,7 @@
     //disable the auto increment of the counter. For pipes, no of processed 
     //records could be different(equal or less) than the no of records input.
     SkipBadRecords.setAutoIncrReducerProcCount(job, false);
-    skipping = job.getBoolean("mapred.skip.on", false);
+    skipping = job.getBoolean(JobContext.SKIP_RECORDS, false);
   }
 
   /**

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Sat Nov 28 20:26:01 2009
@@ -32,8 +32,6 @@
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.Parser;
@@ -41,6 +39,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -68,6 +67,18 @@
 public class Submitter extends Configured implements Tool {
 
   protected static final Log LOG = LogFactory.getLog(Submitter.class);
+  public static final String PRESERVE_COMMANDFILE = 
+    "mapreduce.pipes.commandfile.preserve";
+  public static final String EXECUTABLE = "mapreduce.pipes.executable";
+  public static final String INTERPRETOR = 
+    "mapreduce.pipes.executable.interpretor";
+  public static final String IS_JAVA_MAP = "mapreduce.pipes.isjavamapper";
+  public static final String IS_JAVA_RR = "mapreduce.pipes.isjavarecordreader";
+  public static final String IS_JAVA_RW = "mapreduce.pipes.isjavarecordwriter";
+  public static final String IS_JAVA_REDUCE = "mapreduce.pipes.isjavareducer";
+  public static final String PARTITIONER = "mapreduce.pipes.partitioner";
+  public static final String INPUT_FORMAT = "mapreduce.pipes.inputformat";
+  public static final String PORT = "mapreduce.pipes.command.port";
   
   public Submitter() {
     this(new Configuration());
@@ -83,9 +94,9 @@
    * @return the URI where the application's executable is located
    */
   public static String getExecutable(JobConf conf) {
-    return conf.get("hadoop.pipes.executable");
+    return conf.get(Submitter.EXECUTABLE);
   }
-  
+
   /**
    * Set the URI for the application's executable. Normally this is a hdfs: 
    * location.
@@ -93,7 +104,7 @@
    * @param executable The URI of the application's executable.
    */
   public static void setExecutable(JobConf conf, String executable) {
-    conf.set("hadoop.pipes.executable", executable);
+    conf.set(Submitter.EXECUTABLE, executable);
   }
 
   /**
@@ -102,7 +113,7 @@
    * @param value the new value
    */
   public static void setIsJavaRecordReader(JobConf conf, boolean value) {
-    conf.setBoolean("hadoop.pipes.java.recordreader", value);
+    conf.setBoolean(Submitter.IS_JAVA_RR, value);
   }
 
   /**
@@ -111,7 +122,7 @@
    * @return is it a Java RecordReader?
    */
   public static boolean getIsJavaRecordReader(JobConf conf) {
-    return conf.getBoolean("hadoop.pipes.java.recordreader", false);
+    return conf.getBoolean(Submitter.IS_JAVA_RR, false);
   }
 
   /**
@@ -120,7 +131,7 @@
    * @param value the new value
    */
   public static void setIsJavaMapper(JobConf conf, boolean value) {
-    conf.setBoolean("hadoop.pipes.java.mapper", value);
+    conf.setBoolean(Submitter.IS_JAVA_MAP, value);
   }
 
   /**
@@ -129,7 +140,7 @@
    * @return is it a Java Mapper?
    */
   public static boolean getIsJavaMapper(JobConf conf) {
-    return conf.getBoolean("hadoop.pipes.java.mapper", false);
+    return conf.getBoolean(Submitter.IS_JAVA_MAP, false);
   }
 
   /**
@@ -138,7 +149,7 @@
    * @param value the new value
    */
   public static void setIsJavaReducer(JobConf conf, boolean value) {
-    conf.setBoolean("hadoop.pipes.java.reducer", value);
+    conf.setBoolean(Submitter.IS_JAVA_REDUCE, value);
   }
 
   /**
@@ -147,7 +158,7 @@
    * @return is it a Java Reducer?
    */
   public static boolean getIsJavaReducer(JobConf conf) {
-    return conf.getBoolean("hadoop.pipes.java.reducer", false);
+    return conf.getBoolean(Submitter.IS_JAVA_REDUCE, false);
   }
 
   /**
@@ -156,7 +167,7 @@
    * @param value the new value to set
    */
   public static void setIsJavaRecordWriter(JobConf conf, boolean value) {
-    conf.setBoolean("hadoop.pipes.java.recordwriter", value);
+    conf.setBoolean(Submitter.IS_JAVA_RW, value);
   }
 
   /**
@@ -165,7 +176,7 @@
    * @return true, if the output of the job will be written by Java
    */
   public static boolean getIsJavaRecordWriter(JobConf conf) {
-    return conf.getBoolean("hadoop.pipes.java.recordwriter", false);
+    return conf.getBoolean(Submitter.IS_JAVA_RW, false);
   }
 
   /**
@@ -187,7 +198,7 @@
    * @param cls the user's partitioner class
    */
   static void setJavaPartitioner(JobConf conf, Class cls) {
-    conf.set("hadoop.pipes.partitioner", cls.getName());
+    conf.set(Submitter.PARTITIONER, cls.getName());
   }
   
   /**
@@ -196,7 +207,7 @@
    * @return the class that the user submitted
    */
   static Class<? extends Partitioner> getJavaPartitioner(JobConf conf) {
-    return conf.getClass("hadoop.pipes.partitioner", 
+    return conf.getClass(Submitter.PARTITIONER, 
                          HashPartitioner.class,
                          Partitioner.class);
   }
@@ -209,12 +220,12 @@
    * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from
    * being deleted.
    * To run using the data file, set the environment variable 
-   * "hadoop.pipes.command.file" to point to the file.
+   * "mapreduce.pipes.commandfile" to point to the file.
    * @param conf the configuration to check
    * @return will the framework save the command file?
    */
   public static boolean getKeepCommandFile(JobConf conf) {
-    return conf.getBoolean("hadoop.pipes.command-file.keep", false);
+    return conf.getBoolean(Submitter.PRESERVE_COMMANDFILE, false);
   }
 
   /**
@@ -223,7 +234,7 @@
    * @param keep the new value
    */
   public static void setKeepCommandFile(JobConf conf, boolean keep) {
-    conf.setBoolean("hadoop.pipes.command-file.keep", keep);
+    conf.setBoolean(Submitter.PRESERVE_COMMANDFILE, keep);
   }
 
   /**
@@ -279,15 +290,15 @@
       }
     }
     String textClassname = Text.class.getName();
-    setIfUnset(conf, "mapred.mapoutput.key.class", textClassname);
-    setIfUnset(conf, "mapred.mapoutput.value.class", textClassname);
-    setIfUnset(conf, "mapred.output.key.class", textClassname);
-    setIfUnset(conf, "mapred.output.value.class", textClassname);
+    setIfUnset(conf, JobContext.MAP_OUTPUT_KEY_CLASS, textClassname);
+    setIfUnset(conf, JobContext.MAP_OUTPUT_VALUE_CLASS, textClassname);
+    setIfUnset(conf, JobContext.OUTPUT_KEY_CLASS, textClassname);
+    setIfUnset(conf, JobContext.OUTPUT_VALUE_CLASS, textClassname);
     
     // Use PipesNonJavaInputFormat if necessary to handle progress reporting
     // from C++ RecordReaders ...
     if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
-      conf.setClass("mapred.pipes.user.inputformat", 
+      conf.setClass(Submitter.INPUT_FORMAT, 
                     conf.getInputFormat().getClass(), InputFormat.class);
       conf.setInputFormat(PipesNonJavaInputFormat.class);
     }
@@ -302,8 +313,8 @@
       DistributedCache.createSymlink(conf);
       // set default gdb commands for map and reduce task 
       String defScript = "$HADOOP_HOME/src/c++/pipes/debug/pipes-default-script";
-      setIfUnset(conf,"mapred.map.task.debug.script",defScript);
-      setIfUnset(conf,"mapred.reduce.task.debug.script",defScript);
+      setIfUnset(conf, JobContext.MAP_DEBUG_SCRIPT,defScript);
+      setIfUnset(conf, JobContext.REDUCE_DEBUG_SCRIPT,defScript);
     }
     URI[] fileCache = DistributedCache.getCacheFiles(conf);
     if (fileCache == null) {
@@ -371,7 +382,7 @@
                                           JobConf conf, 
                                           Class<InterfaceType> cls
                                          ) throws ClassNotFoundException {
-    return conf.getClassByName((String) cl.getOptionValue(key)).asSubclass(cls);
+    return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls);
   }
 
   @Override
@@ -409,15 +420,14 @@
       JobConf job = new JobConf(getConf());
       
       if (results.hasOption("input")) {
-        FileInputFormat.setInputPaths(job, 
-                          (String) results.getOptionValue("input"));
+        FileInputFormat.setInputPaths(job, results.getOptionValue("input"));
       }
       if (results.hasOption("output")) {
         FileOutputFormat.setOutputPath(job, 
-          new Path((String) results.getOptionValue("output")));
+          new Path(results.getOptionValue("output")));
       }
       if (results.hasOption("jar")) {
-        job.setJar((String) results.getOptionValue("jar"));
+        job.setJar(results.getOptionValue("jar"));
       }
       if (results.hasOption("inputformat")) {
         setIsJavaRecordReader(job, true);
@@ -440,7 +450,7 @@
         job.setReducerClass(getClass(results, "reduce", job, Reducer.class));
       }
       if (results.hasOption("reduces")) {
-        job.setNumReduceTasks(Integer.parseInt((String) 
+        job.setNumReduceTasks(Integer.parseInt( 
                                            results.getOptionValue("reduces")));
       }
       if (results.hasOption("writer")) {
@@ -450,18 +460,18 @@
       }
       
       if (results.hasOption("lazyOutput")) {
-        if (Boolean.parseBoolean((String)results.getOptionValue("lazyOutput"))) {
+        if (Boolean.parseBoolean(results.getOptionValue("lazyOutput"))) {
           LazyOutputFormat.setOutputFormatClass(job,
               job.getOutputFormat().getClass());
         }
       }
       
       if (results.hasOption("program")) {
-        setExecutable(job, (String) results.getOptionValue("program"));
+        setExecutable(job, results.getOptionValue("program"));
       }
       if (results.hasOption("jobconf")) {
         LOG.warn("-jobconf option is deprecated, please use -D instead.");
-        String options = (String)results.getOptionValue("jobconf");
+        String options = results.getOptionValue("jobconf");
         StringTokenizer tokenizer = new StringTokenizer(options, ",");
         while (tokenizer.hasMoreTokens()) {
           String keyVal = tokenizer.nextToken().trim();
@@ -502,7 +512,8 @@
    * @param args
    */
   public static void main(String[] args) throws Exception {
-    new Submitter().run(args);
+    int exitCode =  new Submitter().run(args);
+    System.exit(exitCode);
   }
 
 }
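
A compact sketch of a pipes job driver using the setters whose keys were renamed above; the executable path and input/output paths are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.pipes.Submitter;

    public class PipesDriver {
      public static RunningJob run(JobConf job) throws Exception {
        // Stored under "mapreduce.pipes.executable".
        Submitter.setExecutable(job, "hdfs:///apps/wordcount-pipes");
        // Stored under the new mapreduce.pipes.isjava* keys.
        Submitter.setIsJavaRecordReader(job, true);
        Submitter.setIsJavaRecordWriter(job, true);
        FileInputFormat.setInputPaths(job, new Path("in"));
        FileOutputFormat.setOutputPath(job, new Path("out"));
        return Submitter.runJob(job);
      }
    }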

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counter.java Sat Nov 28 20:26:01 2009
@@ -50,6 +50,17 @@
     this.displayName = displayName;
   }
   
+  /** Create a counter.
+   * @param name the name within the group's enum.
+   * @param displayName a name to be displayed.
+   * @param value the counter value.
+   */
+  public Counter(String name, String displayName, long value) {
+    this.name = name;
+    this.displayName = displayName;
+    this.value = value;
+  }
+  
   @Deprecated
   protected synchronized void setDisplayName(String displayName) {
     this.displayName = displayName;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/CounterGroup.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/CounterGroup.java Sat Nov 28 20:26:01 2009
@@ -60,7 +60,11 @@
     displayName = localize("CounterGroupName", name);
   }
   
-  protected CounterGroup(String name, String displayName) {
+  /** Create a CounterGroup.
+   * @param name the name of the group's enum.
+   * @param displayName a name to be displayed for the group.
+   */
+  public CounterGroup(String name, String displayName) {
     this.name = name;
     this.displayName = displayName;
   }
@@ -81,17 +85,18 @@
     return displayName;
   }
 
-  synchronized void addCounter(Counter counter) {
+  /** Add a counter to this group. */
+  public synchronized void addCounter(Counter counter) {
     counters.put(counter.getName(), counter);
   }
 
   /**
-   * Internal to find a counter in a group.
+   * Find a counter in a group.
    * @param counterName the name of the counter
    * @param displayName the display name of the counter
    * @return the counter that was found or added
    */
-  protected Counter findCounter(String counterName, String displayName) {
+  public Counter findCounter(String counterName, String displayName) {
     Counter result = counters.get(counterName);
     if (result == null) {
       result = new Counter(counterName, displayName);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java Sat Nov 28 20:26:01 2009
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.mapreduce;
 
 import java.io.DataInput;
@@ -25,7 +42,12 @@
   public Counters() {
   }
   
-  Counters(org.apache.hadoop.mapred.Counters counters) {
+  /**
+   * Utility method to  create a Counters object from the 
+   * org.apache.hadoop.mapred counters
+   * @param counters
+   */
+  public Counters(org.apache.hadoop.mapred.Counters counters) {
     for(org.apache.hadoop.mapred.Counters.Group group: counters) {
       String name = group.getName();
       CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());
@@ -36,6 +58,11 @@
     }
   }
 
+  /** Add a group. */
+  public void addGroup(CounterGroup group) {
+    groups.put(group.getName(), group);
+  }
+
   public Counter findCounter(String groupName, String counterName) {
     CounterGroup grp = getGroup(groupName);
     return grp.findCounter(counterName);
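
The newly public constructors and add methods above let counters be assembled outside the framework; a small sketch with illustrative group and counter names:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Counters;

    public class CounterAssembly {
      public static Counters build() {
        Counters counters = new Counters();
        // Uses the CounterGroup(String, String) constructor made public above.
        CounterGroup group = new CounterGroup("MyApp", "My Application");
        // Uses the new Counter(String, String, long) constructor and public addCounter().
        group.addCounter(new Counter("RECORDS_SEEN", "Records seen", 42L));
        counters.addGroup(group);
        return counters;
      }
    }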

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/InputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/InputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/InputFormat.java Sat Nov 28 20:26:01 2009
@@ -50,8 +50,8 @@
  * bytes, of the input files. However, the {@link FileSystem} blocksize of  
  * the input files is treated as an upper bound for input splits. A lower bound 
  * on the split size can be set via 
- * <a href="{@docRoot}/../mapred-default.html#mapred.min.split.size">
- * mapred.min.split.size</a>.</p>
+ * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+ * mapreduce.input.fileinputformat.split.minsize</a>.</p>
  * 
  * <p>Clearly, logical splits based on input-size is insufficient for many 
  * applications since record boundaries are to respected. In such cases, the

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Job.java Sat Nov 28 20:26:01 2009
@@ -18,43 +18,153 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Arrays;
+import java.net.URI;
 
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * The job submitter's view of the Job. It allows the user to configure the
+ * The job submitter's view of the Job.
+ * 
+ * <p>It allows the user to configure the
  * job, submit it, control its execution, and query the state. The set methods
  * only work until the job is submitted, afterwards they will throw an 
- * IllegalStateException.
+ * IllegalStateException. </p>
+ * 
+ * <p>
+ * Normally the user creates the application, describes various facets of the
+ * job via {@link Job} and then submits the job and monitor its progress.</p>
+ * 
+ * <p>Here is an example on how to submit a job:</p>
+ * <p><blockquote><pre>
+ *     // Create a new Job
+ *     Job job = new Job(new Configuration());
+ *     job.setJarByClass(MyJob.class);
+ *     
+ *     // Specify various job-specific parameters     
+ *     job.setJobName("myjob");
+ *     
+ *     job.setInputPath(new Path("in"));
+ *     job.setOutputPath(new Path("out"));
+ *     
+ *     job.setMapperClass(MyJob.MyMapper.class);
+ *     job.setReducerClass(MyJob.MyReducer.class);
+ *
+ *     // Submit the job, then poll for progress until the job is complete
+ *     job.waitForCompletion(true);
+ * </pre></blockquote></p>
+ * 
+ * 
  */
-public class Job extends JobContext {  
+public class Job extends JobContextImpl implements JobContext {  
+  private static final Log LOG = LogFactory.getLog(Job.class);
+
   public static enum JobState {DEFINE, RUNNING};
-  private JobState state = JobState.DEFINE;
-  private JobClient jobClient;
-  private RunningJob info;
+  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
+  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
+  /** Key in mapred-*.xml that sets completionPollInvervalMillis */
+  public static final String COMPLETION_POLL_INTERVAL_KEY = 
+    "mapreduce.client.completion.pollinterval";
+  
+  /** Default completionPollIntervalMillis is 5000 ms. */
+  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
+  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
+  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
+    "mapreduce.client.progressmonitor.pollinterval";
+  /** Default progMonitorPollIntervalMillis is 1000 ms. */
+  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
+
+  public static final String USED_GENERIC_PARSER = 
+    "mapreduce.client.genericoptionsparser.used";
+  public static final String SUBMIT_REPLICATION = 
+    "mapreduce.client.submit.file.replication";
+
+  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
+
+  static {
+    ConfigUtil.loadResources();
+  }
 
+  private JobState state = JobState.DEFINE;
+  private JobStatus status;
+  private long statustime;
+  private Cluster cluster;
+  
+  @Deprecated
   public Job() throws IOException {
     this(new Configuration());
   }
 
+  @Deprecated
   public Job(Configuration conf) throws IOException {
-    super(conf, null);
-    jobClient = new JobClient((JobConf) getConfiguration());
+    this(new Cluster(conf), conf);
   }
 
+  @Deprecated
   public Job(Configuration conf, String jobName) throws IOException {
     this(conf);
     setJobName(jobName);
   }
 
+  Job(Cluster cluster) throws IOException {
+    this(cluster, new Configuration());
+  }
+
+  Job(Cluster cluster, Configuration conf) throws IOException {
+    super(conf, null);
+    this.cluster = cluster;
+  }
+
+  Job(Cluster cluster, JobStatus status,
+             Configuration conf) throws IOException {
+    this(cluster, conf);
+    state = JobState.RUNNING;
+    this.status = status;
+  }
+  
+  public static Job getInstance(Cluster cluster) throws IOException {
+     return new Job(cluster);
+  }
+  
+  public static Job getInstance(Cluster cluster, Configuration conf) 
+      throws IOException {
+    return new Job(cluster, conf);
+  }
+  
+  public static Job getInstance(Cluster cluster, JobStatus status, 
+      Configuration conf) throws IOException {
+    return new Job(cluster, status, conf);
+  }
+  
   private void ensureState(JobState state) throws IllegalStateException {
     if (state != this.state) {
       throw new IllegalStateException("Job in state "+ this.state + 
@@ -63,6 +173,351 @@
   }
 
   /**
+   * Some methods rely on having a recent job status object.  Refresh
+   * it, if necessary
+   */
+  synchronized void ensureFreshStatus() 
+      throws IOException, InterruptedException {
+    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
+      updateStatus();
+    }
+  }
+    
+  /** Some methods need to update status immediately. So, refresh
+   * immediately
+   * @throws IOException
+   */
+  synchronized void updateStatus() throws IOException, InterruptedException {
+    this.status = cluster.getClient().getJobStatus(status.getJobID());
+    if (this.status == null) {
+      throw new IOException("Job status not available ");
+    }
+    this.statustime = System.currentTimeMillis();
+  }
+  
+  public JobStatus getStatus() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status;
+  }
+  /**
+   * Get the job identifier.
+   * 
+   * @return the job identifier.
+   */
+  public JobID getID() {
+    ensureState(JobState.RUNNING);
+    return status.getJobID();
+  }
+
+  /**
+   * Returns the current state of the Job.
+   * 
+   * @return JobStatus#State
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public JobStatus.State getJobState() 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getState();
+  }
+  
+  /**
+   * Get the URL where some job progress information will be displayed.
+   * 
+   * @return the URL where some job progress information will be displayed.
+   */
+  public String getTrackingURL(){
+    ensureState(JobState.RUNNING);
+    return status.getTrackingUrl().toString();
+  }
+
+  /**
+   * Get the path of the submitted job configuration.
+   * 
+   * @return the path of the submitted job configuration.
+   */
+  public String getJobFile() {
+    ensureState(JobState.RUNNING);
+    return status.getJobFile();
+  }
+
+  /**
+   * Get start time of the job.
+   * 
+   * @return the start time of the job
+   */
+  public long getStartTime() {
+    ensureState(JobState.RUNNING);
+    return status.getStartTime();
+  }
+
+  /**
+   * Get finish time of the job.
+   * 
+   * @return the finish time of the job
+   */
+  public long getFinishTime() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getFinishTime();
+  }
+
+  /**
+   * Get scheduling info of the job.
+   * 
+   * @return the scheduling info of the job
+   */
+  public String getSchedulingInfo() {
+    ensureState(JobState.RUNNING);
+    return status.getSchedulingInfo();
+  }
+
+  /**
+   * Get scheduling info of the job.
+   * 
+   * @return the scheduling info of the job
+   */
+  public JobPriority getPriority() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getPriority();
+  }
+
+  /**
+   * The user-specified job name.
+   */
+  public String getJobName() {
+    if (state == JobState.DEFINE) {
+      return super.getJobName();
+    }
+    ensureState(JobState.RUNNING);
+    return status.getJobName();
+  }
+
+  public String getHistoryUrl() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getHistoryFile();
+  }
+
+  public boolean isRetired() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.isRetired();
+  }
+
+  /**
+   * Build a human-readable summary of the job's current status.
+   */
+  @Override
+  public String toString() {
+    ensureState(JobState.RUNNING);
+    try {
+      updateStatus();
+    } catch (IOException e) {
+      // ignore and fall back to the last known status
+    } catch (InterruptedException ie) {
+      // likewise; toString() must not propagate checked exceptions
+    }
+    StringBuffer sb = new StringBuffer();
+    sb.append("Job: ").append(status.getJobID()).append("\n");
+    sb.append("Job File: ").append(status.getJobFile()).append("\n");
+    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
+    sb.append("\n");
+    sb.append("map() completion: ");
+    sb.append(status.getMapProgress()).append("\n");
+    sb.append("reduce() completion: ");
+    sb.append(status.getReduceProgress()).append("\n");
+    sb.append("Job state: ");
+    sb.append(status.getState()).append("\n");
+    sb.append("history URL: ");
+    sb.append(status.getHistoryFile()).append("\n");
+    sb.append("retired: ").append(status.isRetired());
+    return sb.toString();
+  }
+      
+  /**
+   * Get the information of the current state of the tasks of a job.
+   * 
+   * @param type Type of the task
+   * @return the list of task reports for the given task type
+   * @throws IOException
+   */
+  public TaskReport[] getTaskReports(TaskType type) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getTaskReports(getID(), type);
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
+   * and 1.0.  When all map tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's map-tasks.
+   * @throws IOException
+   */
+  public float mapProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getMapProgress();
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
+   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's reduce-tasks.
+   * @throws IOException
+   */
+  public float reduceProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getReduceProgress();
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
+   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's cleanup-tasks.
+   * @throws IOException
+   */
+  public float cleanupProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getCleanupProgress();
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
+   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's setup-tasks.
+   * @throws IOException
+   */
+  public float setupProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getSetupProgress();
+  }
+
+  /**
+   * Check if the job is finished or not. 
+   * This is a non-blocking call.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isComplete() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.isJobComplete();
+  }
+
+  /**
+   * Check if the job completed successfully. 
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isSuccessful() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getState() == JobStatus.State.SUCCEEDED;
+  }
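// Illustrative usage sketch: a client-side polling loop built on the
// non-blocking checks above. `job` is a hypothetical, already-submitted Job;
// the enclosing method is assumed to declare IOException and
// InterruptedException.
while (!job.isComplete()) {                          // refreshes status each call
  System.out.printf("map %.0f%% reduce %.0f%%%n",
      job.mapProgress() * 100, job.reduceProgress() * 100);
  Thread.sleep(5000);                                // arbitrary poll interval
}
System.out.println(job.isSuccessful() ? "Job succeeded" : "Job failed");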
+
+  /**
+   * Kill the running job.  Blocks until all job tasks have been
+   * killed as well.  If the job is no longer running, it simply returns.
+   * 
+   * @throws IOException
+   */
+  public void killJob() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    cluster.getClient().killJob(getID());
+  }
+
+  /**
+   * Set the priority of a running job.
+   * @param priority the new priority for the job.
+   * @throws IOException
+   */
+  public void setPriority(JobPriority priority) 
+      throws IOException, InterruptedException {
+    if (state == JobState.DEFINE) {
+      conf.setJobPriority(
+        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
+    } else {
+      ensureState(JobState.RUNNING);
+      cluster.getClient().setJobPriority(getID(), priority.toString());
+    }
+  }
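// Illustrative usage sketch: the priority can be changed before submission
// (recorded in the configuration) or while the job is running (sent to the
// cluster). JobPriority.HIGH is just an example value.
job.setPriority(JobPriority.HIGH);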
+
+  /**
+   * Get events indicating completion (success/failure) of component tasks.
+   *  
+   * @param startFrom index to start fetching events from
+   * @param numEvents number of events to fetch
+   * @return an array of {@link TaskCompletionEvent}s
+   * @throws IOException
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom,
+      int numEvents) throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getTaskCompletionEvents(getID(),
+      startFrom, numEvents); 
+  }
+  
+  /**
+   * Kill indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @throws IOException
+   */
+  public boolean killTask(TaskAttemptID taskId) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().killTask(taskId, false);
+  }
+
+  /**
+   * Fail indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @throws IOException
+   */
+  public boolean failTask(TaskAttemptID taskId) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().killTask(taskId, true);
+  }
+
+  /**
+   * Gets the counters for this job.
+   * 
+   * @return the counters for this job.
+   * @throws IOException
+   */
+  public Counters getCounters() 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getJobCounters(getID());
+  }
+
+  /**
+   * Gets the diagnostic messages for a given task attempt.
+   * @param taskid the id of the task attempt
+   * @return the list of diagnostic messages for the task
+   * @throws IOException
+   */
+  public String[] getTaskDiagnostics(TaskAttemptID taskid) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getTaskDiagnostics(taskid);
+  }
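// Illustrative usage sketch: page through completion events and print the
// diagnostics of failed attempts, much as printTaskEvents() does later in
// this patch. `job` is a hypothetical, already-submitted Job.
int from = 0;
TaskCompletionEvent[] events;
while ((events = job.getTaskCompletionEvents(from, 10)).length > 0) {
  for (TaskCompletionEvent event : events) {
    if (event.getStatus() == TaskCompletionEvent.Status.FAILED) {
      String[] diagnostics = job.getTaskDiagnostics(event.getTaskAttemptId());
      if (diagnostics != null) {
        for (String diag : diagnostics) {
          System.err.println(diag);
        }
      }
    }
  }
  from += events.length;
}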
+
+  /**
    * Set the number of reduce tasks for the job.
    * @param tasks the number of reduce tasks
    * @throws IllegalStateException if the job is submitted
@@ -91,7 +546,8 @@
   public void setInputFormatClass(Class<? extends InputFormat> cls
                                   ) throws IllegalStateException {
     ensureState(JobState.DEFINE);
-    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
+    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
+                  InputFormat.class);
   }
 
   /**
@@ -102,7 +558,8 @@
   public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                    ) throws IllegalStateException {
     ensureState(JobState.DEFINE);
-    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
+    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
+                  OutputFormat.class);
   }
 
   /**
@@ -121,15 +578,26 @@
    * @param cls the example class
    */
   public void setJarByClass(Class<?> cls) {
+    ensureState(JobState.DEFINE);
     conf.setJarByClass(cls);
   }
-  
+
   /**
-   * Get the pathname of the job's jar.
-   * @return the pathname
+   * Set the job jar.
+   * @param jar the path to the job jar
    */
-  public String getJar() {
-    return conf.getJar();
+  public void setJar(String jar) {
+    ensureState(JobState.DEFINE);
+    conf.setJar(jar);
+  }
+
+  /**
+   * Set the reported username for this job.
+   * 
+   * @param user the username for this job.
+   */
+  public void setUser(String user) {
+    ensureState(JobState.DEFINE);
+    conf.setUser(user);
   }
 
   /**
@@ -162,7 +630,8 @@
   public void setPartitionerClass(Class<? extends Partitioner> cls
                                   ) throws IllegalStateException {
     ensureState(JobState.DEFINE);
-    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
+    conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
+                  Partitioner.class);
   }
 
   /**
@@ -298,133 +767,133 @@
    */
   public void setJobSetupCleanupNeeded(boolean needed) {
     ensureState(JobState.DEFINE);
-    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", needed);
+    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
   }
 
   /**
-   * Get the URL where some job progress information will be displayed.
-   * 
-   * @return the URL where some job progress information will be displayed.
+   * Set the given set of archives
+   * @param archives The list of archives that need to be localized
    */
-  public String getTrackingURL() {
-    ensureState(JobState.RUNNING);
-    return info.getTrackingURL();
+  public void setCacheArchives(URI[] archives) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.setCacheArchives(archives, conf);
   }
 
   /**
-   * Get the job identifier.
-   * 
-   * @return the job identifier.
+   * Set the given set of files
+   * @param files The list of files that need to be localized
    */
-  public JobID getID() {
-    ensureState(JobState.RUNNING);
-    return info.getID();
+  public void setCacheFiles(URI[] files) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.setCacheFiles(files, conf);
   }
-  
+
   /**
-   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
-   * and 1.0.  When all map tasks have completed, the function returns 1.0.
-   * 
-   * @return the progress of the job's map-tasks.
-   * @throws IOException
+   * Add an archive to be localized
+   * @param uri The uri of the cache to be localized
    */
-  public float mapProgress() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.mapProgress();
+  public void addCacheArchive(URI uri) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addCacheArchive(uri, conf);
   }
-
+  
   /**
-   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
-   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
-   * 
-   * @return the progress of the job's reduce-tasks.
-   * @throws IOException
+   * Add a file to be localized
+   * @param uri The uri of the cache to be localized
    */
-  public float reduceProgress() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.reduceProgress();
+  public void addCacheFile(URI uri) {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addCacheFile(uri, conf);
   }
 
   /**
-   * Check if the job is finished or not. 
-   * This is a non-blocking call.
+   * Add a file path to the current set of classpath entries. It adds the file
+   * to the cache as well.
    * 
-   * @return <code>true</code> if the job is complete, else <code>false</code>.
-   * @throws IOException
+   * @param file Path of the file to be added
    */
-  public boolean isComplete() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.isComplete();
+  public void addFileToClassPath(Path file)
+    throws IOException {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addFileToClassPath(file, conf);
   }
 
   /**
-   * Check if the job completed successfully. 
+   * Add an archive path to the current set of classpath entries. It adds the
+   * archive to cache as well.
    * 
-   * @return <code>true</code> if the job succeeded, else <code>false</code>.
-   * @throws IOException
+   * @param archive Path of the archive to be added
    */
-  public boolean isSuccessful() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.isSuccessful();
+  public void addArchiveToClassPath(Path archive)
+    throws IOException {
+    ensureState(JobState.DEFINE);
+    DistributedCache.addArchiveToClassPath(archive, conf);
   }
 
   /**
-   * Kill the running job.  Blocks until all job tasks have been
-   * killed as well.  If the job is no longer running, it simply returns.
+   * This method allows you to create symlinks in the current working directory
+   * of the task to all the cache files/archives
+   */
+  public void createSymlink() {
+    ensureState(JobState.DEFINE);
+    DistributedCache.createSymlink(conf);
+  }
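// Illustrative usage sketch: wiring up the distributed cache with the
// convenience methods above, while the job is still in the DEFINE state.
// The URIs and paths are hypothetical; the enclosing method is assumed to
// declare IOException and URISyntaxException.
job.addCacheFile(new URI("/user/alice/lookup.dat#lookup"));
job.addCacheArchive(new URI("/user/alice/dicts.zip"));
job.addFileToClassPath(new Path("/user/alice/libs/extra.jar"));
job.createSymlink();   // expose cached files under their fragment names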
+  
+  /** 
+   * Expert: Set the number of maximum attempts that will be made to run a
+   * map task.
    * 
-   * @throws IOException
+   * @param n the number of attempts per map task.
    */
-  public void killJob() throws IOException {
-    ensureState(JobState.RUNNING);
-    info.killJob();
+  public void setMaxMapAttempts(int n) {
+    ensureState(JobState.DEFINE);
+    conf.setMaxMapAttempts(n);
   }
-    
-  /**
-   * Get events indicating completion (success/failure) of component tasks.
-   *  
-   * @param startFrom index to start fetching events from
-   * @return an array of {@link TaskCompletionEvent}s
-   * @throws IOException
+
+  /** 
+   * Expert: Set the number of maximum attempts that will be made to run a
+   * reduce task.
+   * 
+   * @param n the number of attempts per reduce task.
    */
-  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
-                                                       ) throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.getTaskCompletionEvents(startFrom);
+  public void setMaxReduceAttempts(int n) {
+    ensureState(JobState.DEFINE);
+    conf.setMaxReduceAttempts(n);
   }
-  
+
   /**
-   * Kill indicated task attempt.
-   * 
-   * @param taskId the id of the task to be terminated.
-   * @throws IOException
+   * Set whether the system should collect profiler information for some of 
+   * the tasks in this job. The information is stored in the user log 
+   * directory.
+   * @param newValue true means it should be gathered
    */
-  public void killTask(TaskAttemptID taskId) throws IOException {
-    ensureState(JobState.RUNNING);
-    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
-                  false);
+  public void setProfileEnabled(boolean newValue) {
+    ensureState(JobState.DEFINE);
+    conf.setProfileEnabled(newValue);
   }
 
   /**
-   * Fail indicated task attempt.
-   * 
-   * @param taskId the id of the task to be terminated.
-   * @throws IOException
+   * Set the profiler configuration arguments. If the string contains a '%s' it
+   * will be replaced with the name of the profiling output file when the task
+   * runs.
+   *
+   * This value is passed to the task child JVM on the command line.
+   *
+   * @param value the configuration string
    */
-  public void failTask(TaskAttemptID taskId) throws IOException {
-    ensureState(JobState.RUNNING);
-    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
-                  true);
+  public void setProfileParams(String value) {
+    ensureState(JobState.DEFINE);
+    conf.setProfileParams(value);
   }
 
   /**
-   * Gets the counters for this job.
-   * 
-   * @return the counters for this job.
-   * @throws IOException
+   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
+   * must also be called.
+   * @param isMap whether the ranges apply to map tasks (true) or reduce tasks
+   * @param newValue a set of integer ranges of the task ids
    */
-  public Counters getCounters() throws IOException {
-    ensureState(JobState.RUNNING);
-    return new Counters(info.getCounters());
+  public void setProfileTaskRange(boolean isMap, String newValue) {
+    ensureState(JobState.DEFINE);
+    conf.setProfileTaskRange(isMap, newValue);
   }
 
   private void ensureNotSet(String attr, String msg) throws IOException {
@@ -455,12 +924,12 @@
       }      
     } else {
       String mode = "map compatability";
-      ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
-      ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
+      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
+      ensureNotSet(MAP_CLASS_ATTR, mode);
       if (numReduces != 0) {
-        ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
+        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
        } else {
-        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
+        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
       }
     }
     if (numReduces != 0) {
@@ -472,8 +941,8 @@
         ensureNotSet(oldReduceClass, mode);   
       } else {
         String mode = "reduce compatability";
-        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
-        ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);   
+        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
+        ensureNotSet(REDUCE_CLASS_ATTR, mode);   
       }
     }   
   }
@@ -486,7 +955,8 @@
                               ClassNotFoundException {
     ensureState(JobState.DEFINE);
     setUseNewAPI();
-    info = jobClient.submitJobInternal(conf);
+    status = new JobSubmitter(cluster.getFileSystem(),
+      cluster.getClient()).submitJobInternal(this);
     state = JobState.RUNNING;
    }
   
@@ -504,11 +974,343 @@
       submit();
     }
     if (verbose) {
-      jobClient.monitorAndPrintJob(conf, info);
+      monitorAndPrintJob();
     } else {
-      info.waitForCompletion();
+      // get the completion poll interval from the client.
+      int completionPollIntervalMillis = 
+        Job.getCompletionPollInterval(cluster.getConf());
+      while (!isComplete()) {
+        try {
+          Thread.sleep(completionPollIntervalMillis);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+    return isSuccessful();
+  }
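// Illustrative driver sketch showing the submit-and-wait pattern that
// waitForCompletion() supports. The mapper/reducer/driver class names are
// hypothetical placeholders.
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);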
+  
+  /**
+   * Monitor a job and print status in real-time as progress is made and tasks 
+   * fail.
+   * @return true if the job succeeded
+   * @throws IOException if communication to the JobTracker fails
+   */
+  public boolean monitorAndPrintJob() 
+      throws IOException, InterruptedException {
+    String lastReport = null;
+    Job.TaskStatusFilter filter;
+    Configuration clientConf = cluster.getConf();
+    filter = Job.getTaskOutputFilter(clientConf);
+    JobID jobId = getID();
+    LOG.info("Running job: " + jobId);
+    int eventCounter = 0;
+    boolean profiling = getProfileEnabled();
+    IntegerRanges mapRanges = getProfileTaskRange(true);
+    IntegerRanges reduceRanges = getProfileTaskRange(false);
+    int progMonitorPollIntervalMillis = 
+      Job.getProgressPollInterval(clientConf);
+    while (!isComplete()) {
+      Thread.sleep(progMonitorPollIntervalMillis);
+      String report = 
+        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
+            " reduce " + 
+            StringUtils.formatPercent(reduceProgress(), 0));
+      if (!report.equals(lastReport)) {
+        LOG.info(report);
+        lastReport = report;
+      }
+
+      TaskCompletionEvent[] events = 
+        getTaskCompletionEvents(eventCounter, 10); 
+      eventCounter += events.length;
+      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
+    }
+    LOG.info("Job complete: " + jobId);
+    Counters counters = getCounters();
+    if (counters != null) {
+      LOG.info(counters.toString());
     }
     return isSuccessful();
   }
+
+  private void printTaskEvents(TaskCompletionEvent[] events,
+      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
+      IntegerRanges reduceRanges) throws IOException, InterruptedException {
+    for (TaskCompletionEvent event : events) {
+      TaskCompletionEvent.Status status = event.getStatus();
+      if (profiling && 
+         (status == TaskCompletionEvent.Status.SUCCEEDED ||
+            status == TaskCompletionEvent.Status.FAILED) &&
+            (event.isMapTask() ? mapRanges : reduceRanges).
+              isIncluded(event.idWithinJob())) {
+        downloadProfile(event);
+      }
+      switch (filter) {
+      case NONE:
+        break;
+      case SUCCEEDED:
+        if (event.getStatus() == 
+          TaskCompletionEvent.Status.SUCCEEDED) {
+          LOG.info(event.toString());
+          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+        }
+        break; 
+      case FAILED:
+        if (event.getStatus() == 
+          TaskCompletionEvent.Status.FAILED) {
+          LOG.info(event.toString());
+          // Displaying the task diagnostic information
+          TaskAttemptID taskId = event.getTaskAttemptId();
+          String[] taskDiagnostics = getTaskDiagnostics(taskId); 
+          if (taskDiagnostics != null) {
+            for (String diagnostics : taskDiagnostics) {
+              System.err.println(diagnostics);
+            }
+          }
+          // Displaying the task logs
+          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+        }
+        break; 
+      case KILLED:
+        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
+          LOG.info(event.toString());
+        }
+        break; 
+      case ALL:
+        LOG.info(event.toString());
+        displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+        break;
+      }
+    }
+  }
+  
+  private void downloadProfile(TaskCompletionEvent e) throws IOException  {
+    URLConnection connection = new URL(
+      getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + 
+      "&filter=profile").openConnection();
+    InputStream in = connection.getInputStream();
+    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
+    IOUtils.copyBytes(in, out, 64 * 1024, true);
+  }
   
+  private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
+      throws IOException {
+    // The tasktracker for a 'failed/killed' job might not be around...
+    if (baseUrl != null) {
+      // Construct the url for the tasklogs
+      String taskLogUrl = getTaskLogURL(taskId, baseUrl);
+      
+      // Copy the task's stdout to stdout of the JobClient
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
+        
+      // Copy task's stderr to stderr of the JobClient 
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
+    }
+  }
+    
+  private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, 
+                           OutputStream out) {
+    try {
+      URLConnection connection = taskLogUrl.openConnection();
+      BufferedReader input = 
+        new BufferedReader(new InputStreamReader(connection.getInputStream()));
+      BufferedWriter output = 
+        new BufferedWriter(new OutputStreamWriter(out));
+      try {
+        String logData = null;
+        while ((logData = input.readLine()) != null) {
+          if (logData.length() > 0) {
+            output.write(taskId + ": " + logData + "\n");
+            output.flush();
+          }
+        }
+      } finally {
+        input.close();
+      }
+    } catch(IOException ioe) {
+      LOG.warn("Error reading task output" + ioe.getMessage()); 
+    }
+  }
+  
+  private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
+    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+  }
+
+  /**
+   * Set the UGI, user name and the group name for the job.
+   * 
+   * This method is called by job submission code while submitting the job.
+   * Internal to MapReduce project. 
+   * @throws IOException
+   */
+  public void setUGIAndUserGroupNames()
+      throws IOException {
+    UnixUserGroupInformation ugi = Job.getUGI(conf);
+    setUser(ugi.getUserName());
+    if (ugi.getGroupNames().length > 0) {
+      conf.set("group.name", ugi.getGroupNames()[0]);
+    }
+  }
+
+  /** The interval at which monitorAndPrintJob() prints status */
+  public static int getProgressPollInterval(Configuration conf) {
+    // Read progress monitor poll interval from config. Default is 1 second.
+    int progMonitorPollIntervalMillis = conf.getInt(
+      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
+    if (progMonitorPollIntervalMillis < 1) {
+      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
+        " has been set to an invalid value; "
+        + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
+      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
+    }
+    return progMonitorPollIntervalMillis;
+  }
+
+  /** The interval at which waitForCompletion() should check. */
+  public static int getCompletionPollInterval(Configuration conf) {
+    int completionPollIntervalMillis = conf.getInt(
+      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
+    if (completionPollIntervalMillis < 1) { 
+      LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
+       " has been set to an invalid value; "
+       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
+      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
+    }
+    return completionPollIntervalMillis;
+  }
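// Illustrative sketch: both poll intervals above come from the client-side
// Configuration. Assuming the *_POLL_INTERVAL_KEY constants are visible to
// client code, they could be tuned like this (500 ms is an arbitrary value):
Configuration clientConf = new Configuration();
clientConf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 500);
clientConf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 500);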
+
+  /**
+   * Get the task output filter.
+   * 
+   * @param conf the configuration.
+   * @return the filter level.
+   */
+  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
+    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
+  }
+
+  /**
+   * Modify the Configuration to set the task output filter.
+   * 
+   * @param conf the Configuration to modify.
+   * @param newValue the value to set.
+   */
+  public static void setTaskOutputFilter(Configuration conf, 
+      TaskStatusFilter newValue) {
+    conf.set(Job.OUTPUT_FILTER, newValue.toString());
+  }
+
+  public static UnixUserGroupInformation getUGI(Configuration job) 
+      throws IOException {
+    UnixUserGroupInformation ugi = null;
+    try {
+      ugi = UnixUserGroupInformation.login(job, true);
+    } catch (LoginException e) {
+      throw (IOException)(new IOException(
+        "Failed to get the current user's information.").initCause(e));
+    }
+    return ugi;
+  }
+
+  /**
+   * Read a splits file into a list of raw splits.
+   * 
+   * @param in the stream to read from
+   * @return the complete list of splits
+   * @throws IOException
+   */
+  public static RawSplit[] readSplitFile(DataInput in) throws IOException {
+    byte[] header = new byte[JobSubmitter.SPLIT_FILE_HEADER.length];
+    in.readFully(header);
+    if (!Arrays.equals(JobSubmitter.SPLIT_FILE_HEADER, header)) {
+      throw new IOException("Invalid header on split file");
+    }
+    int vers = WritableUtils.readVInt(in);
+    if (vers != JobSubmitter.CURRENT_SPLIT_FILE_VERSION) {
+      throw new IOException("Unsupported split version " + vers);
+    }
+    int len = WritableUtils.readVInt(in);
+    RawSplit[] result = new RawSplit[len];
+    for (int i=0; i < len; ++i) {
+      result[i] = new RawSplit();
+      result[i].readFields(in);
+    }
+    return result;
+  }
+
+  public static class RawSplit implements Writable {
+    private String splitClass;
+    private BytesWritable bytes = new BytesWritable();
+    private String[] locations;
+    long dataLength;
+
+    public RawSplit() {
+    }
+    
+    protected RawSplit(String splitClass, BytesWritable bytes,
+        String[] locations, long dataLength) {
+      this.splitClass = splitClass;
+      this.bytes = bytes;
+      this.locations = locations;
+      this.dataLength = dataLength;
+    }
+
+    public void setBytes(byte[] data, int offset, int length) {
+      bytes.set(data, offset, length);
+    }
+
+    public void setClassName(String className) {
+      splitClass = className;
+    }
+      
+    public String getClassName() {
+      return splitClass;
+    }
+      
+    public BytesWritable getBytes() {
+      return bytes;
+    }
+
+    public void clearBytes() {
+      bytes = null;
+    }
+      
+    public void setLocations(String[] locations) {
+      this.locations = locations;
+    }
+      
+    public String[] getLocations() {
+      return locations;
+    }
+
+    public long getDataLength() {
+      return dataLength;
+    }
+
+    public void setDataLength(long l) {
+      dataLength = l;
+    }
+      
+    public void readFields(DataInput in) throws IOException {
+      splitClass = Text.readString(in);
+      dataLength = in.readLong();
+      bytes.readFields(in);
+      int len = WritableUtils.readVInt(in);
+      locations = new String[len];
+      for (int i=0; i < len; ++i) {
+        locations[i] = Text.readString(in);
+      }
+    }
+      
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, splitClass);
+      out.writeLong(dataLength);
+      bytes.write(out);
+      WritableUtils.writeVInt(out, locations.length);
+      for (int i = 0; i < locations.length; i++) {
+        Text.writeString(out, locations[i]);
+      }        
+    }
+  }
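// Illustrative sketch: RawSplit is a plain Writable, so it round-trips
// through the usual DataOutput/DataInput machinery (java.io stream classes
// assumed imported). All values below are made up.
RawSplit split = new RawSplit();
split.setClassName("org.example.FakeSplit");          // hypothetical class name
split.setDataLength(1024L);
split.setBytes(new byte[] {1, 2, 3}, 0, 3);
split.setLocations(new String[] {"host1", "host2"});

ByteArrayOutputStream buffer = new ByteArrayOutputStream();
split.write(new DataOutputStream(buffer));

RawSplit copy = new RawSplit();
copy.readFields(new DataInputStream(
    new ByteArrayInputStream(buffer.toByteArray())));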
+
 }


