hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r679845 [3/3] - in /hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce: ./ example/ lib/ lib/input/ lib/map/ lib/output/ lib/partition/ lib/reduce/
Date: Fri, 25 Jul 2008 15:57:42 GMT
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
(added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
Fri Jul 25 08:57:41 2008
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** An {@link OutputFormat} that writes plain text files. */
+public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+  protected static class LineRecordWriter<K, V>
+    implements RecordWriter<K, V> {
+    private static final String utf8 = "UTF-8";
+    private static final byte[] newline;
+    static {
+      try {
+        newline = "\n".getBytes(utf8);
+      } catch (UnsupportedEncodingException uee) {
+        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
+      }
+    }
+
+    protected DataOutputStream out;
+    private final byte[] keyValueSeparator;
+
+    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
+      this.out = out;
+      try {
+        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+      } catch (UnsupportedEncodingException uee) {
+        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
+      }
+    }
+
+    public LineRecordWriter(DataOutputStream out) {
+      this(out, "\t");
+    }
+
+    /**
+     * Write the object to the byte stream, handling Text as a special
+     * case to avoid re-encoding its already-UTF-8 bytes.
+     * @param o the object to write
+     * @throws IOException if the underlying stream throws
+     */
+    private void writeObject(Object o) throws IOException {
+      if (o instanceof Text) {
+        Text to = (Text) o;
+        out.write(to.getBytes(), 0, to.getLength());
+      } else {
+        out.write(o.toString().getBytes(utf8));
+      }
+    }
+
+    public synchronized void write(K key, V value)
+      throws IOException {
+
+      boolean nullKey = key == null || key instanceof NullWritable;
+      boolean nullValue = value == null || value instanceof NullWritable;
+      if (nullKey && nullValue) {
+        return;
+      }
+      if (!nullKey) {
+        writeObject(key);
+      }
+      if (!(nullKey || nullValue)) {
+        out.write(keyValueSeparator);
+      }
+      if (!nullValue) {
+        writeObject(value);
+      }
+      out.write(newline);
+    }
+
+    public synchronized void close(TaskAttemptContext context)
+      throws IOException {
+      out.close();
+    }
+  }
+
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+    throws IOException {
+    Configuration job = context.getConfiguration();
+    boolean isCompressed = getCompressOutput(job);
+    String keyValueSeparator = job.get("mapred.textoutputformat.separator", 
+                                       "\t");
+    Path file = FileOutputFormat.getTaskOutputPath(context);
+    if (!isCompressed) {
+      FileSystem fs = file.getFileSystem(job);
+      FSDataOutputStream fileOut = fs.create(file, context);
+      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
+    } else {
+      Class<? extends CompressionCodec> codecClass =
+        getOutputCompressorClass(job, GzipCodec.class);
+      // create the named codec
+      CompressionCodec codec = (CompressionCodec)
+        ReflectionUtils.newInstance(codecClass, job);
+      // build the filename including the extension
+      file = new Path(file + codec.getDefaultExtension());
+      FileSystem fs = file.getFileSystem(job);
+      FSDataOutputStream fileOut = fs.create(file, context);
+      return new LineRecordWriter<K, V>(new DataOutputStream
+                                        (codec.createOutputStream(fileOut)),
+                                        keyValueSeparator);
+    }
+  }
+}
+
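For context, a minimal driver sketch showing how this output format might be wired up under the new o.a.h.mapreduce API. This is illustrative only: the Job class comes from the companion parts of this commit series, the driver class name and paths are placeholders, and the FileOutputFormat setters are assumed to match the signatures this package stabilized on.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TextOutputDriver {                    // hypothetical driver class
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Override the default tab separator read by getRecordWriter() above.
    conf.set("mapred.textoutputformat.separator", ",");
    Job job = new Job(conf, "text-output-example");
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Turning on compression takes the codec branch in getRecordWriter():
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Note that mapred.textoutputformat.separator is read in getRecordWriter(), so it must be set in the configuration before the job is submitted.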

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
(added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
Fri Jul 25 08:57:41 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/** Partition keys by their {@link Object#hashCode()}. */
+public class HashPartitioner<K, V> implements Partitioner<K, V> {
+
+  /** Use {@link Object#hashCode()} to partition. */
+  public int getPartition(K key, V value,
+                          int numReduceTasks) {
+    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+  }
+
+}
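The "& Integer.MAX_VALUE" mask clears the sign bit, so keys whose hashCode() is negative still map into [0, numReduceTasks). Math.abs() would not be a safe substitute, because Math.abs(Integer.MIN_VALUE) overflows and stays negative. A standalone check (hypothetical demo class, not part of the commit):

public class PartitionCheck {                      // hypothetical demo class
  public static void main(String[] args) {
    int numReduceTasks = 3;
    int h = Integer.MIN_VALUE;                     // worst-case hashCode
    // Math.abs(Integer.MIN_VALUE) overflows, so the remainder is negative:
    System.out.println(Math.abs(h) % numReduceTasks);             // prints -2
    // The sign-bit mask always yields a valid partition index:
    System.out.println((h & Integer.MAX_VALUE) % numReduceTasks); // prints 0
  }
}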

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java
(added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java
Fri Jul 25 08:57:41 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.reduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/** A {@link Reducer} that sums {@link IntWritable} values for each key. */
+public class IntSumReducer<Key> extends Reducer<Key, IntWritable,
+                                                Key, IntWritable> {
+  private IntWritable result = new IntWritable();
+
+  public void reduce(Key key, Iterable<IntWritable> values, 
+                     Context context) throws IOException, InterruptedException {
+    int sum = 0;
+    for (IntWritable val : values) {
+      sum += val.get();
+    }
+    result.set(sum);
+    context.write(key, result);
+  }
+
+}
\ No newline at end of file
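Because integer addition is associative and commutative, this reducer can double as a map-side combiner. A hypothetical word-count driver in the style of the WordCount example added under example/ in this commit; the driver class name and paths here are placeholders:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {                     // hypothetical driver class

  // Emits (word, 1) for every token in the input line.
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private Text word = new Text();
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "word count");
    job.setMapperClass(TokenizerMapper.class);
    // Sums partial counts map-side, then again as the final reducer:
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}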

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java
(added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java
Fri Jul 25 08:57:41 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.reduce;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/** A {@link Reducer} that sums {@link LongWritable} values for each key. */
+public class LongSumReducer<KEY> extends Reducer<KEY, LongWritable,
+                                                 KEY, LongWritable> {
+
+  private LongWritable result = new LongWritable();
+
+  public void reduce(KEY key, Iterable<LongWritable> values,
+                     Context context) throws IOException, InterruptedException {
+    long sum = 0;
+    for (LongWritable val : values) {
+      sum += val.get();
+    }
+    result.set(sum);
+    context.write(key, result);
+  }
+
+}
\ No newline at end of file


