hadoop-common-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shahab Yunus <shahab.yu...@gmail.com>
Subject Re: Change the output of Reduce function
Date Thu, 25 Jul 2013 19:27:59 GMT
If I understand your issue correctly, you want the final output of your
reduce to contain only the 'value' part, right?

If that is the case, you can do something like this in the reduce
function.

This outputs only the value, without any key part:

*output.collect(NullWritable.get(), value);*

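(or context.write(NullWritable.get(), value) in the new API)

TextOutputFormat leaves out the key and the tab separator when the key is a
NullWritable. The job's output key class should then be declared as
NullWritable, with the map output classes set separately
(setMapOutputKeyClass / setMapOutputValueClass), and the reducer can no
longer double as the combiner, because a combiner has to emit the map
output types.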

More explanation here:
http://books.google.com/books?id=Wu_xeGdU4G8C&pg=PA102&lpg=PA102&dq=example+nullwritable&source=bl&ots=i7AQWEOaSq&sig=Snzxo2SPNFdpKWK8fs5VkFYKlD8&hl=en&sa=X&ei=jXvxUfuyOIjO9QS6g4DACQ&ved=0CEkQ6AEwAw#v=onepage&q=example%20nullwritable&f=false


Regards,
Shahab


On Thu, Jul 25, 2013 at 3:18 PM, Felipe Gutierrez <
felipe.o.gutierrez@gmail.com> wrote:

> Sorry, I don't think I understood.
> Is NullWritable supposed to replace MyWritable? But that is my value; my key
> is a Text.
> Regards,
> Felipe
>
>
>
> On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus <shahab.yunus@gmail.com>wrote:
>
>> I think you can use NullWritable as the key.
>>
>> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html
>>
>>
>> Regards,
>> Shahab
>>
>>
>> On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> I wrote a MapReduce program that implements grep. I know there is a
>>> Grep example among the Hadoop examples, but I want to write my own Grep
>>> MapReduce so that I can explain it to others.
>>> My problem is that my output shows both the key and the value. I want to
>>> show only the value, since I stored the line number in the value. Example:
>>>
>>> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689]
>>> wlan0: associated ]
>>>
>>> Here is my code. Thanks,
>>> Felipe
>>>
>>> package grep;
>>>
>>> import java.io.File;
>>> import java.io.FileReader;
>>> import java.io.LineNumberReader;
>>>
>>> import org.apache.hadoop.fs.Path;
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.mapred.FileInputFormat;
>>> import org.apache.hadoop.mapred.FileOutputFormat;
>>> import org.apache.hadoop.mapred.JobClient;
>>> import org.apache.hadoop.mapred.JobConf;
>>> import org.apache.hadoop.mapred.TextInputFormat;
>>> import org.apache.hadoop.mapred.TextOutputFormat;
>>>
>>> public class Main {
>>>
>>>  public static void main(String[] args) throws Exception {
>>>
>>> if (args == null || args.length != 3) {
>>>  System.err.println("Usage: Main <in> <out> <regex>");
>>> System.exit(-1);
>>>  }
>>>
>>> JobConf conf = new JobConf(Main.class);
>>> conf.setJobName("grep");
>>>
>>> String input = args[0];
>>> String output = args[1];
>>> String regex = args[2];
>>>
>>> File arquivoLeitura = new File(input);
>>> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader(
>>>  arquivoLeitura));
>>> linhaLeitura.skip(arquivoLeitura.length());
>>> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1);
>>>  conf.set("grep.regex", regex);
>>> conf.set("grep.lines", lines);
>>>
>>> conf.setOutputKeyClass(Text.class);
>>>  conf.setOutputValueClass(MyWritable.class);
>>>
>>> conf.setMapperClass(GrepMapper.class);
>>> conf.setCombinerClass(GrepReducer.class);
>>>  conf.setReducerClass(GrepReducer.class);
>>>
>>> conf.setInputFormat(TextInputFormat.class);
>>>  conf.setOutputFormat(TextOutputFormat.class);
>>>
>>> FileInputFormat.setInputPaths(conf, new Path(input));
>>>  FileOutputFormat.setOutputPath(conf, new Path(output));
>>>
>>> JobClient.runJob(conf);
>>> }
>>> }
>>>
>>> package grep;
>>>
>>> import java.io.IOException;
>>> import java.text.DecimalFormat;
>>>
>>> import org.apache.hadoop.io.LongWritable;
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.mapred.JobConf;
>>> import org.apache.hadoop.mapred.MapReduceBase;
>>> import org.apache.hadoop.mapred.Mapper;
>>> import org.apache.hadoop.mapred.OutputCollector;
>>> import org.apache.hadoop.mapred.Reporter;
>>>
>>> public class GrepMapper extends MapReduceBase implements
>>> Mapper<LongWritable, Text, Text, MyWritable> {
>>>
>>> private static long line = 1;
>>> private static long n = 0;
>>> private static long divisor = 1;
>>>  private static long qtdLines = 0;
>>> private Text k = new Text();
>>>
>>> public void map(LongWritable key, Text value,
>>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>>> throws IOException {
>>> String str = value.toString();
>>>  MyWritable text = new MyWritable("line " + line + " : " + str);
>>> if ((line % divisor) == 0) {
>>>  n++;
>>> }
>>> k.set(customFormat("00000", n));
>>>  output.collect(k, text);
>>> line++;
>>> }
>>>
>>>  @Override
>>> public void configure(JobConf job) {
>>> qtdLines = Long.parseLong(job.get("grep.lines"));
>>>  if (qtdLines <= 500) {
>>> divisor = 10;
>>> } else if (qtdLines <= 1000) {
>>>  divisor = 20;
>>> } else if (qtdLines <= 1500) {
>>> divisor = 30;
>>>  } else if (qtdLines <= 2000) {
>>> divisor = 40;
>>> } else if (qtdLines <= 2500) {
>>>  divisor = 50;
>>> } else if (qtdLines <= 3000) {
>>> divisor = 60;
>>>  } else if (qtdLines <= 3500) {
>>> divisor = 70;
>>> } else if (qtdLines <= 4000) {
>>>  divisor = 80;
>>> } else if (qtdLines <= 4500) {
>>> divisor = 90;
>>>  } else if (qtdLines <= 5000) {
>>> divisor = 100;
>>> } else if (qtdLines <= 5500) {
>>>  divisor = 110;
>>> } else if (qtdLines <= 6000) {
>>> divisor = 120;
>>>  } else if (qtdLines <= 6500) {
>>> divisor = 130;
>>> } else if (qtdLines <= 7000) {
>>>  divisor = 140;
>>> }
>>> }
>>>
>>> static public String customFormat(String pattern, double value) {
>>>  DecimalFormat myFormatter = new DecimalFormat(pattern);
>>> return myFormatter.format(value);
>>> }
>>> }
>>>
>>> package grep;
>>>
>>> import java.io.IOException;
>>> import java.util.Iterator;
>>> import java.util.regex.Matcher;
>>> import java.util.regex.Pattern;
>>>
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.mapred.JobConf;
>>> import org.apache.hadoop.mapred.MapReduceBase;
>>> import org.apache.hadoop.mapred.OutputCollector;
>>> import org.apache.hadoop.mapred.Reducer;
>>> import org.apache.hadoop.mapred.Reporter;
>>>
>>> public class GrepReducer extends MapReduceBase implements
>>> Reducer<Text, MyWritable, Text, MyWritable> {
>>>
>>> private Pattern pattern;
>>>
>>> @Override
>>> public void configure(JobConf job) {
>>>  pattern = Pattern.compile(job.get("grep.regex"));
>>> }
>>>
>>> public void reduce(Text key, Iterator<MyWritable> values,
>>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>>> throws IOException {
>>>
>>>  while (values.hasNext()) {
>>> String text = (String) values.next().get();
>>> Matcher matcher = pattern.matcher(text);
>>>  while (matcher.find()) {
>>> output.collect(key, new MyWritable(text));
>>>  }
>>> }
>>> }
>>> }
>>>
>>> package grep;
>>>
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import java.lang.reflect.Array;
>>> import java.util.HashMap;
>>> import java.util.Map;
>>>
>>> import org.apache.hadoop.conf.Configurable;
>>> import org.apache.hadoop.conf.Configuration;
>>> import org.apache.hadoop.conf.Configured;
>>> import org.apache.hadoop.io.UTF8;
>>>  import org.apache.hadoop.io.Writable;
>>> import org.apache.hadoop.io.WritableFactories;
>>>
>>> public class MyWritable implements Writable, Configurable {
>>>
>>> private Class declaredClass;
>>>  private Object instance;
>>> private Configuration conf;
>>>
>>> public MyWritable() {
>>>  }
>>>
>>> public MyWritable(Object instance) {
>>> set(instance);
>>>  }
>>>
>>> public MyWritable(Class declaredClass, Object instance) {
>>> this.declaredClass = declaredClass;
>>>  this.instance = instance;
>>> }
>>>
>>> /** Return the instance, or null if none. */
>>>  public Object get() {
>>> return instance;
>>> }
>>>
>>>  /** Return the class this is meant to be. */
>>> public Class getDeclaredClass() {
>>> return declaredClass;
>>>  }
>>>
>>> /** Reset the instance. */
>>> public void set(Object instance) {
>>>  this.declaredClass = instance.getClass();
>>> this.instance = instance;
>>> }
>>>
>>> public String toString() {
>>> return "[ " + instance + " ]";
>>> }
>>>
>>> public void readFields(DataInput in) throws IOException {
>>> readObject(in, this, this.conf);
>>>  }
>>>
>>> public void write(DataOutput out) throws IOException {
>>> writeObject(out, instance, declaredClass, conf);
>>>  }
>>>
>>> private static final Map<String, Class<?>> PRIMITIVE_NAMES = new
>>> HashMap<String, Class<?>>();
>>>  static {
>>> PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
>>> PRIMITIVE_NAMES.put("byte", Byte.TYPE);
>>>  PRIMITIVE_NAMES.put("char", Character.TYPE);
>>> PRIMITIVE_NAMES.put("short", Short.TYPE);
>>>  PRIMITIVE_NAMES.put("int", Integer.TYPE);
>>> PRIMITIVE_NAMES.put("long", Long.TYPE);
>>>  PRIMITIVE_NAMES.put("float", Float.TYPE);
>>> PRIMITIVE_NAMES.put("double", Double.TYPE);
>>>  PRIMITIVE_NAMES.put("void", Void.TYPE);
>>> }
>>>
>>> private static class NullInstance extends Configured implements Writable
>>> {
>>>  private Class<?> declaredClass;
>>>
>>> public NullInstance() {
>>> super(null);
>>>  }
>>>
>>> public NullInstance(Class declaredClass, Configuration conf) {
>>> super(conf);
>>>  this.declaredClass = declaredClass;
>>> }
>>>
>>> public void readFields(DataInput in) throws IOException {
>>>  String className = UTF8.readString(in);
>>> declaredClass = PRIMITIVE_NAMES.get(className);
>>> if (declaredClass == null) {
>>>  try {
>>> declaredClass = getConf().getClassByName(className);
>>> } catch (ClassNotFoundException e) {
>>>  throw new RuntimeException(e.toString());
>>> }
>>> }
>>>  }
>>>
>>> public void write(DataOutput out) throws IOException {
>>> UTF8.writeString(out, declaredClass.getName());
>>>  }
>>> }
>>>
>>> /**
>>>  * Write a {@link Writable}, {@link String}, primitive type, or an array
>>> of
>>>  * the preceding.
>>>  */
>>> public static void writeObject(DataOutput out, Object instance,
>>>  Class declaredClass, Configuration conf) throws IOException {
>>>
>>> if (instance == null) { // null
>>>  instance = new NullInstance(declaredClass, conf);
>>> declaredClass = Writable.class;
>>> }
>>>
>>> UTF8.writeString(out, declaredClass.getName()); // always write declared
>>>
>>> if (declaredClass.isArray()) { // array
>>>  int length = Array.getLength(instance);
>>> out.writeInt(length);
>>> for (int i = 0; i < length; i++) {
>>>  writeObject(out, Array.get(instance, i),
>>> declaredClass.getComponentType(), conf);
>>> }
>>>
>>> } else if (declaredClass == String.class) { // String
>>> UTF8.writeString(out, (String) instance);
>>>
>>> } else if (declaredClass.isPrimitive()) { // primitive type
>>>
>>> if (declaredClass == Boolean.TYPE) { // boolean
>>>  out.writeBoolean(((Boolean) instance).booleanValue());
>>> } else if (declaredClass == Character.TYPE) { // char
>>>  out.writeChar(((Character) instance).charValue());
>>> } else if (declaredClass == Byte.TYPE) { // byte
>>>  out.writeByte(((Byte) instance).byteValue());
>>> } else if (declaredClass == Short.TYPE) { // short
>>> out.writeShort(((Short) instance).shortValue());
>>>  } else if (declaredClass == Integer.TYPE) { // int
>>> out.writeInt(((Integer) instance).intValue());
>>>  } else if (declaredClass == Long.TYPE) { // long
>>> out.writeLong(((Long) instance).longValue());
>>> } else if (declaredClass == Float.TYPE) { // float
>>>  out.writeFloat(((Float) instance).floatValue());
>>> } else if (declaredClass == Double.TYPE) { // double
>>>  out.writeDouble(((Double) instance).doubleValue());
>>> } else if (declaredClass == Void.TYPE) { // void
>>>  } else {
>>> throw new IllegalArgumentException("Not a primitive: "
>>> + declaredClass);
>>>  }
>>> } else if (declaredClass.isEnum()) { // enum
>>> UTF8.writeString(out, ((Enum) instance).name());
>>>  } else if (Writable.class.isAssignableFrom(declaredClass)) { //
>>> Writable
>>> UTF8.writeString(out, instance.getClass().getName());
>>>  ((Writable) instance).write(out);
>>>
>>> } else {
>>> throw new IOException("Can't write: " + instance + " as "
>>>  + declaredClass);
>>> }
>>> }
>>>
>>> /**
>>>  * Read a {@link Writable}, {@link String}, primitive type, or an array
>>> of
>>>  * the preceding.
>>>  */
>>>  public static Object readObject(DataInput in, Configuration conf)
>>> throws IOException {
>>> return readObject(in, null, conf);
>>>  }
>>>
>>> /**
>>>  * Read a {@link Writable}, {@link String}, primitive type, or an array
>>> of
>>>  * the preceding.
>>>  */
>>> @SuppressWarnings("unchecked")
>>>  public static Object readObject(DataInput in, MyWritable
>>> objectWritable,
>>> Configuration conf) throws IOException {
>>>  String className = UTF8.readString(in);
>>> Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
>>>  if (declaredClass == null) {
>>> try {
>>> declaredClass = conf.getClassByName(className);
>>>  } catch (ClassNotFoundException e) {
>>> throw new RuntimeException("readObject can't find class "
>>>  + className, e);
>>> }
>>> }
>>>
>>> Object instance;
>>>
>>> if (declaredClass.isPrimitive()) { // primitive types
>>>
>>> if (declaredClass == Boolean.TYPE) { // boolean
>>>  instance = Boolean.valueOf(in.readBoolean());
>>> } else if (declaredClass == Character.TYPE) { // char
>>>  instance = Character.valueOf(in.readChar());
>>> } else if (declaredClass == Byte.TYPE) { // byte
>>> instance = Byte.valueOf(in.readByte());
>>>  } else if (declaredClass == Short.TYPE) { // short
>>> instance = Short.valueOf(in.readShort());
>>> } else if (declaredClass == Integer.TYPE) { // int
>>>  instance = Integer.valueOf(in.readInt());
>>> } else if (declaredClass == Long.TYPE) { // long
>>> instance = Long.valueOf(in.readLong());
>>>  } else if (declaredClass == Float.TYPE) { // float
>>> instance = Float.valueOf(in.readFloat());
>>> } else if (declaredClass == Double.TYPE) { // double
>>>  instance = Double.valueOf(in.readDouble());
>>> } else if (declaredClass == Void.TYPE) { // void
>>> instance = null;
>>>  } else {
>>> throw new IllegalArgumentException("Not a primitive: "
>>> + declaredClass);
>>>  }
>>>
>>> } else if (declaredClass.isArray()) { // array
>>> int length = in.readInt();
>>>  instance = Array.newInstance(declaredClass.getComponentType(),
>>> length);
>>> for (int i = 0; i < length; i++) {
>>>  Array.set(instance, i, readObject(in, conf));
>>> }
>>>
>>> } else if (declaredClass == String.class) { // String
>>>  instance = UTF8.readString(in);
>>> } else if (declaredClass.isEnum()) { // enum
>>> instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
>>>  UTF8.readString(in));
>>> } else { // Writable
>>> Class instanceClass = null;
>>>  String str = "";
>>> try {
>>> str = UTF8.readString(in);
>>>  instanceClass = conf.getClassByName(str);
>>> } catch (ClassNotFoundException e) {
>>> throw new RuntimeException(
>>>  "readObject can't find class " + str, e);
>>> }
>>>
>>> Writable writable = WritableFactories.newInstance(instanceClass,
>>>  conf);
>>> writable.readFields(in);
>>> instance = writable;
>>>
>>> if (instanceClass == NullInstance.class) { // null
>>> declaredClass = ((NullInstance) instance).declaredClass;
>>>  instance = null;
>>> }
>>> }
>>>
>>> if (objectWritable != null) { // store values
>>>  objectWritable.declaredClass = declaredClass;
>>> objectWritable.instance = instance;
>>> }
>>>
>>> return instance;
>>>
>>> }
>>>
>>> public void setConf(Configuration conf) {
>>>  this.conf = conf;
>>> }
>>>
>>> public Configuration getConf() {
>>>  return this.conf;
>>> }
>>> }
>>>
>>>
>>> --
>>> *--
>>> -- Felipe Oliveira Gutierrez
>>> -- Felipe.o.Gutierrez@gmail.com
>>> -- https://sites.google.com/site/lipe82/Home/diaadia*
>>>
>>
>>
>
>
> --
> *--
> -- Felipe Oliveira Gutierrez
> -- Felipe.o.Gutierrez@gmail.com
> -- https://sites.google.com/site/lipe82/Home/diaadia*
>

Mime
View raw message