From: omalley@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r899844 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/...
Date: Fri, 15 Jan 2010 23:48:55 -0000
Message-Id: <20100115234855.92E9323889FF@eris.apache.org>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Fri Jan 15 23:48:54 2010
@@ -22,7 +22,6 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -363,6 +362,8 @@
       RawKeyValueIterator rIter = 
         Merger.merge(jobConf, rfs,
+                     (Class)jobConf.getMapOutputKeyClass(),
+                     (Class)jobConf.getMapOutputValueClass(),
                      inMemorySegments, inMemorySegments.size(),
                      new Path(reduceId.toString()),
                      (RawComparator)jobConf.getOutputKeyComparator(),
@@ -417,7 +418,9 @@
                                      Task.MERGED_OUTPUT_PREFIX);
 
       Writer writer = 
-        new Writer(jobConf, rfs, outputPath, true,
+        new Writer(jobConf, rfs, outputPath,
+                   (Class) jobConf.getMapOutputKeyClass(),
+                   (Class) jobConf.getMapOutputValueClass(),
                    codec, null);
 
       RawKeyValueIterator rIter = null;
@@ -426,6 +429,8 @@
                  " segments...");
 
         rIter = Merger.merge(jobConf, rfs,
+                             (Class)jobConf.getMapOutputKeyClass(),
+                             (Class)jobConf.getMapOutputValueClass(),
                              inMemorySegments, inMemorySegments.size(),
                              new Path(reduceId.toString()),
                              (RawComparator)jobConf.getOutputKeyComparator(),
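The pattern throughout these hunks is to read the intermediate key/value classes straight from the JobConf and pass them into Merger.merge and Writer, instead of carrying serialization-metadata maps. As a rough standalone illustration of that round trip (a hypothetical snippet, not part of this commit):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class MapOutputClassDemo {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Producers configure the intermediate (map output) types once...
    jobConf.setMapOutputKeyClass(IntWritable.class);
    jobConf.setMapOutputValueClass(Text.class);
    // ...and consumers such as the merge code recover them with a raw cast,
    // mirroring the (Class)jobConf.getMapOutputKeyClass() calls above.
    Class keyClass = (Class) jobConf.getMapOutputKeyClass();
    Class valueClass = (Class) jobConf.getMapOutputValueClass();
    System.out.println(keyClass.getName() + " / " + valueClass.getName());
  }
}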
@@ -494,12 +499,16 @@
         localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
             approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
       Writer writer = 
-        new Writer(jobConf, rfs, outputPath, true,
+        new Writer(jobConf, rfs, outputPath,
+                   (Class) jobConf.getMapOutputKeyClass(),
+                   (Class) jobConf.getMapOutputValueClass(),
                    codec, null);
       RawKeyValueIterator iter = null;
       Path tmpDir = new Path(reduceId.toString());
       try {
         iter = Merger.merge(jobConf, rfs,
+                            (Class) jobConf.getMapOutputKeyClass(),
+                            (Class) jobConf.getMapOutputValueClass(),
                             codec, inputs.toArray(new Path[inputs.size()]),
                             true, ioSortFactor, tmpDir,
                             (RawComparator) jobConf.getOutputKeyComparator(),
@@ -529,15 +538,13 @@
                                Counters.Counter inCounter) throws IOException {
     JobConf job = jobConf;
     Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-    Map keyMetadata = 
-      job.getMapOutputKeySerializationMetadata();
-    Map valueMetadata = 
-      job.getMapOutputValueSerializationMetadata();
+    Class keyClass = (Class) job.getMapOutputKeyClass();
+    Class valClass = (Class) job.getMapOutputValueClass();
     RawComparator comparator = 
       (RawComparator)job.getOutputKeyComparator();
     try {
       CombineValuesIterator values = new CombineValuesIterator(
-        kvIter, comparator, keyMetadata, valueMetadata, job, Reporter.NULL,
+        kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
         inCounter);
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector,
@@ -631,6 +638,8 @@
 
     // merge config params
+    Class keyClass = (Class)job.getMapOutputKeyClass();
+    Class valueClass = (Class)job.getMapOutputValueClass();
     boolean keepInputs = job.getKeepFailedTaskFiles();
     final Path tmpDir = new Path(reduceId.toString());
     final RawComparator comparator =
@@ -663,11 +672,11 @@
                                          inMemToDiskBytes).suffix(
                                              Task.MERGED_OUTPUT_PREFIX);
       final RawKeyValueIterator rIter = Merger.merge(job, fs,
-          memDiskSegments, numMemDiskSegments,
+          keyClass, valueClass, memDiskSegments, numMemDiskSegments,
           tmpDir, comparator, reporter, spilledRecordsCounter, null, 
           mergePhase);
       final Writer writer = new Writer(job, fs, outputPath,
-          true, codec, null);
+          keyClass, valueClass, codec, null);
       try {
         Merger.writeFile(rIter, writer, reporter, job);
         // add to list of final disk outputs.
@@ -737,7 +746,7 @@
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? 
           null : mergePhase;
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, diskSegments,
+          job, fs, keyClass, valueClass, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
@@ -747,7 +756,7 @@
       finalSegments.add(new Segment(
             new RawKVIteratorReader(diskMerge, onDiskBytes), true));
     }
-    return Merger.merge(job, fs, 
+    return Merger.merge(job, fs, keyClass, valueClass,
                         finalSegments, finalSegments.size(), tmpDir,
                         comparator, reporter, spilledRecordsCounter, null,
                         null);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Fri Jan 15 23:48:54 2010
@@ -25,7 +25,6 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 
 import java.io.*;
 
@@ -67,8 +66,8 @@
 
     conf.setInputFormat(TextInputFormat.class);
 
-    WritableJobData.setMapOutputKeyClass(conf, LongWritable.class);
-    WritableJobData.setMapOutputValueClass(conf, Text.class);
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
 
     conf.setOutputFormat(TextOutputFormat.class);
     conf.setOutputKeyClass(LongWritable.class);
@@ -102,4 +101,4 @@
 
   }
 
-}
+}
\ No newline at end of file
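After this change, jobs configure intermediate types with the JobConf setters directly rather than through the removed WritableJobData helpers. A minimal job setup in that style might look like the following (hypothetical snippet for illustration; the class name is made up):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class IntermediateTypeSetup {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    // Formerly: WritableJobData.setMapOutputKeyClass(conf, LongWritable.class);
    conf.setMapOutputKeyClass(LongWritable.class);
    // Formerly: WritableJobData.setMapOutputValueClass(conf, Text.class);
    conf.setMapOutputValueClass(Text.class);
    // Final output types are configured separately from the map output types.
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    return conf;
  }
}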
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Fri Jan 15 23:48:54 2010
@@ -104,6 +104,7 @@
 
     conf.setOutputKeyClass(String.class);
     conf.setOutputValueClass(Long.class);
+    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
 
     conf.setMapperClass(WordCountMapper.class);
     conf.setReducerClass(SumReducer.class);
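The added line matters because String is not a Writable, so no raw comparator is registered for it; keys serialized with Java serialization have to be deserialized before they can be compared. A rough sketch of the relevant configuration (hypothetical standalone snippet):

import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.JavaSerializationComparator;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapred.JobConf;

public class JavaSerializationJob {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    // Allow plain Serializable types alongside Writables.
    conf.setStrings("io.serializations",
        WritableSerialization.class.getName(),
        JavaSerialization.class.getName());
    conf.setOutputKeyClass(String.class);
    conf.setOutputValueClass(Long.class);
    // JavaSerializationComparator deserializes each key and calls compareTo,
    // since Java-serialized bytes are not directly byte-comparable.
    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
    return conf;
  }
}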
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Fri Jan 15 23:48:54 2010
@@ -29,7 +29,6 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -80,11 +79,8 @@
     FileSystem localFs = FileSystem.getLocal(conf);
     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
-    JobConf job = new JobConf(conf);
-    WritableJobData.setMapOutputKeyClass(job, Text.class);
-    WritableJobData.setMapOutputValueClass(job, Text.class);
     IFile.Writer writer = 
-      new IFile.Writer(job, rfs, path, true,
+      new IFile.Writer(conf, rfs, path, Text.class, Text.class,
                        codec, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
@@ -93,16 +89,14 @@
 
     @SuppressWarnings("unchecked")
     RawKeyValueIterator rawItr = 
-      Merger.merge(job, rfs, codec, new Path[]{path},
+      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path},
                    false, conf.getInt(JobContext.IO_SORT_FACTOR, 100), tmpDir,
                    new Text.Comparator(), new NullProgress(), null, null, null);
     @SuppressWarnings("unchecked") // WritableComparators are not generic
     ReduceTask.ValuesIterator valItr = 
       new ReduceTask.ValuesIterator(rawItr,
-          WritableComparator.get(Text.class),
-          job.getMapOutputKeySerializationMetadata(),
-          job.getMapOutputValueSerializationMetadata(),
-          job, new NullProgress());
+          WritableComparator.get(Text.class), Text.class, Text.class,
+          conf, new NullProgress());
     int i = 0;
     while (valItr.more()) {
       Object key = valItr.getKey();

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java Fri Jan 15 23:48:54 2010
@@ -63,7 +63,6 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
@@ -442,8 +441,8 @@
     conf.setOutputFormat(NullOutputFormat.class);
     conf.setMapperClass(HArchivesMapper.class);
     conf.setReducerClass(HArchivesReducer.class);
-    WritableJobData.setMapOutputKeyClass(conf, IntWritable.class);
-    WritableJobData.setMapOutputValueClass(conf, Text.class);
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(Text.class);
     conf.set(JobContext.HISTORY_LOCATION, "none");
     FileInputFormat.addInputPath(conf, jobDirectory);
     //make sure no speculative execution is done
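For reference, the new IFile.Writer call shape used in TestReduceTask above can be exercised on its own roughly like this (hypothetical snippet; the path is made up and the constructor signature is assumed from the diff):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;

public class IFileWriteSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    FileSystem rfs = ((LocalFileSystem) FileSystem.getLocal(conf)).getRaw();
    Path path = new Path("/tmp/data.in");  // hypothetical location
    // Key and value classes are passed to the writer directly; previously a
    // JobConf carrying serialization metadata was required.
    IFile.Writer<Text, Text> writer =
        new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
                                     null, null);  // no codec, no counter
    writer.append(new Text("key"), new Text("value"));
    writer.close();
  }
}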