hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r899844 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/...
Date Fri, 15 Jan 2010 23:48:55 GMT
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Jan 15 23:48:54 2010
@@ -22,7 +22,6 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -363,6 +362,8 @@
 
       RawKeyValueIterator rIter = 
         Merger.merge(jobConf, rfs,
+                     (Class<K>)jobConf.getMapOutputKeyClass(),
+                     (Class<V>)jobConf.getMapOutputValueClass(),
                      inMemorySegments, inMemorySegments.size(),
                      new Path(reduceId.toString()),
                      (RawComparator<K>)jobConf.getOutputKeyComparator(),
@@ -417,7 +418,9 @@
                                                Task.MERGED_OUTPUT_PREFIX);
 
       Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath, true,
+        new Writer<K,V>(jobConf, rfs, outputPath,
+                        (Class<K>) jobConf.getMapOutputKeyClass(),
+                        (Class<V>) jobConf.getMapOutputValueClass(),
                         codec, null);
 
       RawKeyValueIterator rIter = null;
@@ -426,6 +429,8 @@
                  " segments...");
         
         rIter = Merger.merge(jobConf, rfs,
+                             (Class<K>)jobConf.getMapOutputKeyClass(),
+                             (Class<V>)jobConf.getMapOutputValueClass(),
                              inMemorySegments, inMemorySegments.size(),
                              new Path(reduceId.toString()),
                              (RawComparator<K>)jobConf.getOutputKeyComparator(),
@@ -494,12 +499,16 @@
         localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
             approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
       Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath,  true,
+        new Writer<K,V>(jobConf, rfs, outputPath, 
+                        (Class<K>) jobConf.getMapOutputKeyClass(), 
+                        (Class<V>) jobConf.getMapOutputValueClass(),
                         codec, null);
       RawKeyValueIterator iter  = null;
       Path tmpDir = new Path(reduceId.toString());
       try {
         iter = Merger.merge(jobConf, rfs,
+                            (Class<K>) jobConf.getMapOutputKeyClass(),
+                            (Class<V>) jobConf.getMapOutputValueClass(),
                             codec, inputs.toArray(new Path[inputs.size()]), 
                             true, ioSortFactor, tmpDir, 
                             (RawComparator<K>) jobConf.getOutputKeyComparator(), 
@@ -529,15 +538,13 @@
       Counters.Counter inCounter) throws IOException {
     JobConf job = jobConf;
     Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-    Map<String, String> keyMetadata =
-        job.getMapOutputKeySerializationMetadata();
-    Map<String, String> valueMetadata =
-        job.getMapOutputValueSerializationMetadata();
+    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
+    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
     RawComparator<K> comparator = 
       (RawComparator<K>)job.getOutputKeyComparator();
     try {
       CombineValuesIterator values = new CombineValuesIterator(
-          kvIter, comparator, keyMetadata, valueMetadata, job, Reporter.NULL,
+          kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
           inCounter);
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector,
@@ -631,6 +638,8 @@
     
 
     // merge config params
+    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
     boolean keepInputs = job.getKeepFailedTaskFiles();
     final Path tmpDir = new Path(reduceId.toString());
     final RawComparator<K> comparator =
@@ -663,11 +672,11 @@
                                              inMemToDiskBytes).suffix(
                                                  Task.MERGED_OUTPUT_PREFIX);
         final RawKeyValueIterator rIter = Merger.merge(job, fs,
-            memDiskSegments, numMemDiskSegments,
+            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
         final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
-            true, codec, null);
+            keyClass, valueClass, codec, null);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
           // add to list of final disk outputs.
@@ -737,7 +746,7 @@
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, diskSegments,
+          job, fs, keyClass, valueClass, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
@@ -747,7 +756,7 @@
       finalSegments.add(new Segment<K,V>(
             new RawKVIteratorReader(diskMerge, onDiskBytes), true));
     }
-    return Merger.merge(job, fs,
+    return Merger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
                  comparator, reporter, spilledRecordsCounter, null,
                  null);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Fri Jan 15 23:48:54 2010
@@ -25,7 +25,6 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 
 import java.io.*;
 
@@ -67,8 +66,8 @@
 
     conf.setInputFormat(TextInputFormat.class);
 
-    WritableJobData.setMapOutputKeyClass(conf, LongWritable.class);
-    WritableJobData.setMapOutputValueClass(conf, Text.class);
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
 
     conf.setOutputFormat(TextOutputFormat.class);
     conf.setOutputKeyClass(LongWritable.class);
@@ -102,4 +101,4 @@
 
   }
 
-}
+}
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Fri Jan 15 23:48:54 2010
@@ -104,6 +104,7 @@
 
     conf.setOutputKeyClass(String.class);
     conf.setOutputValueClass(Long.class);
+    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
 
     conf.setMapperClass(WordCountMapper.class);
     conf.setReducerClass(SumReducer.class);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Fri Jan 15 23:48:54 2010
@@ -29,7 +29,6 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -80,11 +79,8 @@
     FileSystem localFs = FileSystem.getLocal(conf);
     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
-    JobConf job = new JobConf(conf);
-    WritableJobData.setMapOutputKeyClass(job, Text.class);
-    WritableJobData.setMapOutputValueClass(job, Text.class);
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(job, rfs, path, true,
+      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
                                    codec, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
@@ -93,16 +89,14 @@
     
     @SuppressWarnings("unchecked")
     RawKeyValueIterator rawItr = 
-      Merger.merge(job, rfs, codec, new Path[]{path}, 
+      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
                    false, conf.getInt(JobContext.IO_SORT_FACTOR, 100), tmpDir, 
                    new Text.Comparator(), new NullProgress(), null, null, null);
     @SuppressWarnings("unchecked") // WritableComparators are not generic
     ReduceTask.ValuesIterator valItr = 
       new ReduceTask.ValuesIterator<Text,Text>(rawItr,
-          WritableComparator.get(Text.class),
-          job.getMapOutputKeySerializationMetadata(),
-          job.getMapOutputValueSerializationMetadata(),
-          job, new NullProgress());
+          WritableComparator.get(Text.class), Text.class, Text.class,
+          conf, new NullProgress());
     int i = 0;
     while (valItr.more()) {
       Object key = valItr.getKey();

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java Fri Jan 15 23:48:54 2010
@@ -63,7 +63,6 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
@@ -442,8 +441,8 @@
     conf.setOutputFormat(NullOutputFormat.class);
     conf.setMapperClass(HArchivesMapper.class);
     conf.setReducerClass(HArchivesReducer.class);
-    WritableJobData.setMapOutputKeyClass(conf, IntWritable.class);
-    WritableJobData.setMapOutputValueClass(conf, Text.class);
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(Text.class);
     conf.set(JobContext.HISTORY_LOCATION, "none");
     FileInputFormat.addInputPath(conf, jobDirectory);
     //make sure no speculative execution is done



Mime
View raw message