hadoop-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shahab Yunus <shahab.yu...@gmail.com>
Subject Re: Change the output of Reduce function
Date Thu, 25 Jul 2013 19:32:53 GMT
@Tariq. No need. You understood me correctly :)

Regards,
Shahab


On Thu, Jul 25, 2013 at 3:27 PM, Mohammad Tariq <dontariq@gmail.com> wrote:

> @Shahab : Please correct me if I misunderstood you.
>
> Warm Regards,
> Tariq
> cloudfront.blogspot.com
>
>
> On Fri, Jul 26, 2013 at 12:56 AM, Mohammad Tariq <dontariq@gmail.com>wrote:
>
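>> Shahab probably means that you can use NullWritable as the output key of
>> your Reducer. TextOutputFormat skips a NullWritable key (and the tab
>> separator), so your output will contain only the value. You also have to
>> declare NullWritable as the Reducer's output key type, call
>> conf.setOutputKeyClass(NullWritable.class), and call
>> conf.setMapOutputKeyClass(Text.class). The map output key is still a Text,
>> so GrepReducer can then no longer be used as the combiner. Something like
>> this: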
>>
>> output.collect(NullWritable.get(), new MyWritable(text));
>>
>> Warm Regards,
>> Tariq
>> cloudfront.blogspot.com
>>
>>
>> On Fri, Jul 26, 2013 at 12:48 AM, Felipe Gutierrez <
>> felipe.o.gutierrez@gmail.com> wrote:
>>
>>> Sorry, I don't think I understood.
>>> Should NullWritable replace MyWritable? But that is my value; my
>>> key is a Text.
>>> Regards,
>>> Felipe
>>>
>>>
>>>
>>> On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus <shahab.yunus@gmail.com>wrote:
>>>
>>>> I think you can use NullWritable as the key.
>>>>
>>>> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html
>>>>
>>>>
>>>> Regards,
>>>> Shahab
>>>>
>>>>
>>>> On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez <
>>>> felipe.o.gutierrez@gmail.com> wrote:
>>>>
>>>>> I wrote a MapReduce program that performs a grep. I know there is a
>>>>> Grep example among the Hadoop examples, but I want to write my own Grep
>>>>> MapReduce job so that I can explain it to others.
>>>>> My problem is that my output shows both the key and the value. I want to
>>>>> show only the value, since I stored the line number in the value. Example:
>>>>>
>>>>> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689]
>>>>> wlan0: associated ]
>>>>>
>>>>> Here is my code. Thanks,
>>>>> Felipe
>>>>>
>>>>> package grep;
>>>>>
>>>>> import java.io.File;
>>>>> import java.io.FileReader;
>>>>> import java.io.LineNumberReader;
>>>>>
>>>>> import org.apache.hadoop.fs.Path;
>>>>> import org.apache.hadoop.io.Text;
>>>>> import org.apache.hadoop.mapred.FileInputFormat;
>>>>> import org.apache.hadoop.mapred.FileOutputFormat;
>>>>> import org.apache.hadoop.mapred.JobClient;
>>>>> import org.apache.hadoop.mapred.JobConf;
>>>>> import org.apache.hadoop.mapred.TextInputFormat;
>>>>> import org.apache.hadoop.mapred.TextOutputFormat;
>>>>>
>>>>> public class Main {
>>>>>
>>>>>  public static void main(String[] args) throws Exception {
>>>>>
>>>>> if (args == null || args.length != 3) {
>>>>>  System.err.println("Usage: Main <in> <out> <regex>");
>>>>> System.exit(-1);
>>>>>  }
>>>>>
>>>>> JobConf conf = new JobConf(Main.class);
>>>>> conf.setJobName("grep");
>>>>>
>>>>> String input = args[0];
>>>>> String output = args[1];
>>>>> String regex = args[2];
>>>>>
>>>>> File arquivoLeitura = new File(input);
>>>>> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader(
>>>>>  arquivoLeitura));
>>>>> linhaLeitura.skip(arquivoLeitura.length());
>>>>> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1);
>>>>>  conf.set("grep.regex", regex);
>>>>> conf.set("grep.lines", lines);
>>>>>
>>>>> conf.setOutputKeyClass(Text.class);
>>>>>  conf.setOutputValueClass(MyWritable.class);
>>>>>
>>>>> conf.setMapperClass(GrepMapper.class);
>>>>> conf.setCombinerClass(GrepReducer.class);
>>>>>  conf.setReducerClass(GrepReducer.class);
>>>>>
>>>>> conf.setInputFormat(TextInputFormat.class);
>>>>>  conf.setOutputFormat(TextOutputFormat.class);
>>>>>
>>>>> FileInputFormat.setInputPaths(conf, new Path(input));
>>>>>  FileOutputFormat.setOutputPath(conf, new Path(output));
>>>>>
>>>>> JobClient.runJob(conf);
>>>>> }
>>>>> }
>>>>>
>>>>> package grep;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.text.DecimalFormat;
>>>>>
>>>>> import org.apache.hadoop.io.LongWritable;
>>>>> import org.apache.hadoop.io.Text;
>>>>> import org.apache.hadoop.mapred.JobConf;
>>>>> import org.apache.hadoop.mapred.MapReduceBase;
>>>>> import org.apache.hadoop.mapred.Mapper;
>>>>> import org.apache.hadoop.mapred.OutputCollector;
>>>>> import org.apache.hadoop.mapred.Reporter;
>>>>>
>>>>> public class GrepMapper extends MapReduceBase implements
>>>>> Mapper<LongWritable, Text, Text, MyWritable> {
>>>>>
>>>>> private static long line = 1;
>>>>> private static long n = 0;
>>>>> private static long divisor = 1;
>>>>>  private static long qtdLines = 0;
>>>>> private Text k = new Text();
>>>>>
>>>>> public void map(LongWritable key, Text value,
>>>>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>>>>> throws IOException {
>>>>> String str = value.toString();
>>>>>  MyWritable text = new MyWritable("line " + line + " : " + str);
>>>>> if ((line % divisor) == 0) {
>>>>>  n++;
>>>>> }
>>>>> k.set(customFormat("00000", n));
>>>>>  output.collect(k, text);
>>>>> line++;
>>>>> }
>>>>>
>>>>>  @Override
>>>>> public void configure(JobConf job) {
>>>>> qtdLines = Long.parseLong(job.get("grep.lines"));
>>>>>  if (qtdLines <= 500) {
>>>>> divisor = 10;
>>>>> } else if (qtdLines <= 1000) {
>>>>>  divisor = 20;
>>>>> } else if (qtdLines <= 1500) {
>>>>> divisor = 30;
>>>>>  } else if (qtdLines <= 2000) {
>>>>> divisor = 40;
>>>>> } else if (qtdLines <= 2500) {
>>>>>  divisor = 50;
>>>>> } else if (qtdLines <= 3000) {
>>>>> divisor = 60;
>>>>>  } else if (qtdLines <= 3500) {
>>>>> divisor = 70;
>>>>> } else if (qtdLines <= 4000) {
>>>>>  divisor = 80;
>>>>> } else if (qtdLines <= 4500) {
>>>>> divisor = 90;
>>>>>  } else if (qtdLines <= 5000) {
>>>>> divisor = 100;
>>>>> } else if (qtdLines <= 5500) {
>>>>>  divisor = 110;
>>>>> } else if (qtdLines <= 6000) {
>>>>> divisor = 120;
>>>>>  } else if (qtdLines <= 6500) {
>>>>> divisor = 130;
>>>>> } else if (qtdLines <= 7000) {
>>>>>  divisor = 140;
>>>>> }
>>>>> }
>>>>>
>>>>> static public String customFormat(String pattern, double value) {
>>>>>  DecimalFormat myFormatter = new DecimalFormat(pattern);
>>>>> return myFormatter.format(value);
>>>>> }
>>>>> }
>>>>>
>>>>> package grep;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.util.Iterator;
>>>>> import java.util.regex.Matcher;
>>>>> import java.util.regex.Pattern;
>>>>>
>>>>> import org.apache.hadoop.io.Text;
>>>>> import org.apache.hadoop.mapred.JobConf;
>>>>> import org.apache.hadoop.mapred.MapReduceBase;
>>>>> import org.apache.hadoop.mapred.OutputCollector;
>>>>> import org.apache.hadoop.mapred.Reducer;
>>>>> import org.apache.hadoop.mapred.Reporter;
>>>>>
>>>>> public class GrepReducer extends MapReduceBase implements
>>>>> Reducer<Text, MyWritable, Text, MyWritable> {
>>>>>
>>>>> private Pattern pattern;
>>>>>
>>>>> @Override
>>>>> public void configure(JobConf job) {
>>>>>  pattern = Pattern.compile(job.get("grep.regex"));
>>>>> }
>>>>>
>>>>> public void reduce(Text key, Iterator<MyWritable> values,
>>>>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>>>>> throws IOException {
>>>>>
>>>>>  while (values.hasNext()) {
>>>>> String text = (String) values.next().get();
>>>>> Matcher matcher = pattern.matcher(text);
>>>>>  while (matcher.find()) {
>>>>> output.collect(key, new MyWritable(text));
>>>>>  }
>>>>> }
>>>>> }
>>>>> }
>>>>>
>>>>> package grep;
>>>>>
>>>>> import java.io.DataInput;
>>>>> import java.io.DataOutput;
>>>>> import java.io.IOException;
>>>>> import java.lang.reflect.Array;
>>>>> import java.util.HashMap;
>>>>> import java.util.Map;
>>>>>
>>>>> import org.apache.hadoop.conf.Configurable;
>>>>> import org.apache.hadoop.conf.Configuration;
>>>>> import org.apache.hadoop.conf.Configured;
>>>>> import org.apache.hadoop.io.UTF8;
>>>>>  import org.apache.hadoop.io.Writable;
>>>>> import org.apache.hadoop.io.WritableFactories;
>>>>>
>>>>> public class MyWritable implements Writable, Configurable {
>>>>>
>>>>> private Class declaredClass;
>>>>>  private Object instance;
>>>>> private Configuration conf;
>>>>>
>>>>> public MyWritable() {
>>>>>  }
>>>>>
>>>>> public MyWritable(Object instance) {
>>>>> set(instance);
>>>>>  }
>>>>>
>>>>> public MyWritable(Class declaredClass, Object instance) {
>>>>> this.declaredClass = declaredClass;
>>>>>  this.instance = instance;
>>>>> }
>>>>>
>>>>> /** Return the instance, or null if none. */
>>>>>  public Object get() {
>>>>> return instance;
>>>>> }
>>>>>
>>>>>  /** Return the class this is meant to be. */
>>>>> public Class getDeclaredClass() {
>>>>> return declaredClass;
>>>>>  }
>>>>>
>>>>> /** Reset the instance. */
>>>>> public void set(Object instance) {
>>>>>  this.declaredClass = instance.getClass();
>>>>> this.instance = instance;
>>>>> }
>>>>>
>>>>> public String toString() {
>>>>> return "[ " + instance + " ]";
>>>>> }
>>>>>
>>>>> public void readFields(DataInput in) throws IOException {
>>>>> readObject(in, this, this.conf);
>>>>>  }
>>>>>
>>>>> public void write(DataOutput out) throws IOException {
>>>>> writeObject(out, instance, declaredClass, conf);
>>>>>  }
>>>>>
>>>>> private static final Map<String, Class<?>> PRIMITIVE_NAMES
= new
>>>>> HashMap<String, Class<?>>();
>>>>>  static {
>>>>> PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
>>>>> PRIMITIVE_NAMES.put("byte", Byte.TYPE);
>>>>>  PRIMITIVE_NAMES.put("char", Character.TYPE);
>>>>> PRIMITIVE_NAMES.put("short", Short.TYPE);
>>>>>  PRIMITIVE_NAMES.put("int", Integer.TYPE);
>>>>> PRIMITIVE_NAMES.put("long", Long.TYPE);
>>>>>  PRIMITIVE_NAMES.put("float", Float.TYPE);
>>>>> PRIMITIVE_NAMES.put("double", Double.TYPE);
>>>>>  PRIMITIVE_NAMES.put("void", Void.TYPE);
>>>>> }
>>>>>
>>>>> private static class NullInstance extends Configured implements
>>>>> Writable {
>>>>>  private Class<?> declaredClass;
>>>>>
>>>>> public NullInstance() {
>>>>> super(null);
>>>>>  }
>>>>>
>>>>> public NullInstance(Class declaredClass, Configuration conf) {
>>>>> super(conf);
>>>>>  this.declaredClass = declaredClass;
>>>>> }
>>>>>
>>>>> public void readFields(DataInput in) throws IOException {
>>>>>  String className = UTF8.readString(in);
>>>>> declaredClass = PRIMITIVE_NAMES.get(className);
>>>>> if (declaredClass == null) {
>>>>>  try {
>>>>> declaredClass = getConf().getClassByName(className);
>>>>> } catch (ClassNotFoundException e) {
>>>>>  throw new RuntimeException(e.toString());
>>>>> }
>>>>> }
>>>>>  }
>>>>>
>>>>> public void write(DataOutput out) throws IOException {
>>>>> UTF8.writeString(out, declaredClass.getName());
>>>>>  }
>>>>> }
>>>>>
>>>>> /**
>>>>>  * Write a {@link Writable}, {@link String}, primitive type, or an
>>>>> array of
>>>>>  * the preceding.
>>>>>  */
>>>>> public static void writeObject(DataOutput out, Object instance,
>>>>>  Class declaredClass, Configuration conf) throws IOException {
>>>>>
>>>>> if (instance == null) { // null
>>>>>  instance = new NullInstance(declaredClass, conf);
>>>>> declaredClass = Writable.class;
>>>>> }
>>>>>
>>>>> UTF8.writeString(out, declaredClass.getName()); // always write
>>>>> declared
>>>>>
>>>>> if (declaredClass.isArray()) { // array
>>>>>  int length = Array.getLength(instance);
>>>>> out.writeInt(length);
>>>>> for (int i = 0; i < length; i++) {
>>>>>  writeObject(out, Array.get(instance, i),
>>>>> declaredClass.getComponentType(), conf);
>>>>> }
>>>>>
>>>>> } else if (declaredClass == String.class) { // String
>>>>> UTF8.writeString(out, (String) instance);
>>>>>
>>>>> } else if (declaredClass.isPrimitive()) { // primitive type
>>>>>
>>>>> if (declaredClass == Boolean.TYPE) { // boolean
>>>>>  out.writeBoolean(((Boolean) instance).booleanValue());
>>>>> } else if (declaredClass == Character.TYPE) { // char
>>>>>  out.writeChar(((Character) instance).charValue());
>>>>> } else if (declaredClass == Byte.TYPE) { // byte
>>>>>  out.writeByte(((Byte) instance).byteValue());
>>>>> } else if (declaredClass == Short.TYPE) { // short
>>>>> out.writeShort(((Short) instance).shortValue());
>>>>>  } else if (declaredClass == Integer.TYPE) { // int
>>>>> out.writeInt(((Integer) instance).intValue());
>>>>>  } else if (declaredClass == Long.TYPE) { // long
>>>>> out.writeLong(((Long) instance).longValue());
>>>>> } else if (declaredClass == Float.TYPE) { // float
>>>>>  out.writeFloat(((Float) instance).floatValue());
>>>>> } else if (declaredClass == Double.TYPE) { // double
>>>>>  out.writeDouble(((Double) instance).doubleValue());
>>>>> } else if (declaredClass == Void.TYPE) { // void
>>>>>  } else {
>>>>> throw new IllegalArgumentException("Not a primitive: "
>>>>> + declaredClass);
>>>>>  }
>>>>> } else if (declaredClass.isEnum()) { // enum
>>>>> UTF8.writeString(out, ((Enum) instance).name());
>>>>>  } else if (Writable.class.isAssignableFrom(declaredClass)) { //
>>>>> Writable
>>>>> UTF8.writeString(out, instance.getClass().getName());
>>>>>  ((Writable) instance).write(out);
>>>>>
>>>>> } else {
>>>>> throw new IOException("Can't write: " + instance + " as "
>>>>>  + declaredClass);
>>>>> }
>>>>> }
>>>>>
>>>>> /**
>>>>>  * Read a {@link Writable}, {@link String}, primitive type, or an
>>>>> array of
>>>>>  * the preceding.
>>>>>  */
>>>>>  public static Object readObject(DataInput in, Configuration conf)
>>>>> throws IOException {
>>>>> return readObject(in, null, conf);
>>>>>  }
>>>>>
>>>>> /**
>>>>>  * Read a {@link Writable}, {@link String}, primitive type, or an
>>>>> array of
>>>>>  * the preceding.
>>>>>  */
>>>>> @SuppressWarnings("unchecked")
>>>>>  public static Object readObject(DataInput in, MyWritable
>>>>> objectWritable,
>>>>> Configuration conf) throws IOException {
>>>>>  String className = UTF8.readString(in);
>>>>> Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
>>>>>  if (declaredClass == null) {
>>>>> try {
>>>>> declaredClass = conf.getClassByName(className);
>>>>>  } catch (ClassNotFoundException e) {
>>>>> throw new RuntimeException("readObject can't find class "
>>>>>  + className, e);
>>>>> }
>>>>> }
>>>>>
>>>>> Object instance;
>>>>>
>>>>> if (declaredClass.isPrimitive()) { // primitive types
>>>>>
>>>>> if (declaredClass == Boolean.TYPE) { // boolean
>>>>>  instance = Boolean.valueOf(in.readBoolean());
>>>>> } else if (declaredClass == Character.TYPE) { // char
>>>>>  instance = Character.valueOf(in.readChar());
>>>>> } else if (declaredClass == Byte.TYPE) { // byte
>>>>> instance = Byte.valueOf(in.readByte());
>>>>>  } else if (declaredClass == Short.TYPE) { // short
>>>>> instance = Short.valueOf(in.readShort());
>>>>> } else if (declaredClass == Integer.TYPE) { // int
>>>>>  instance = Integer.valueOf(in.readInt());
>>>>> } else if (declaredClass == Long.TYPE) { // long
>>>>> instance = Long.valueOf(in.readLong());
>>>>>  } else if (declaredClass == Float.TYPE) { // float
>>>>> instance = Float.valueOf(in.readFloat());
>>>>> } else if (declaredClass == Double.TYPE) { // double
>>>>>  instance = Double.valueOf(in.readDouble());
>>>>> } else if (declaredClass == Void.TYPE) { // void
>>>>> instance = null;
>>>>>  } else {
>>>>> throw new IllegalArgumentException("Not a primitive: "
>>>>> + declaredClass);
>>>>>  }
>>>>>
>>>>> } else if (declaredClass.isArray()) { // array
>>>>> int length = in.readInt();
>>>>>  instance = Array.newInstance(declaredClass.getComponentType(),
>>>>> length);
>>>>> for (int i = 0; i < length; i++) {
>>>>>  Array.set(instance, i, readObject(in, conf));
>>>>> }
>>>>>
>>>>> } else if (declaredClass == String.class) { // String
>>>>>  instance = UTF8.readString(in);
>>>>> } else if (declaredClass.isEnum()) { // enum
>>>>> instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
>>>>>  UTF8.readString(in));
>>>>> } else { // Writable
>>>>> Class instanceClass = null;
>>>>>  String str = "";
>>>>> try {
>>>>> str = UTF8.readString(in);
>>>>>  instanceClass = conf.getClassByName(str);
>>>>> } catch (ClassNotFoundException e) {
>>>>> throw new RuntimeException(
>>>>>  "readObject can't find class " + str, e);
>>>>> }
>>>>>
>>>>> Writable writable = WritableFactories.newInstance(instanceClass,
>>>>>  conf);
>>>>> writable.readFields(in);
>>>>> instance = writable;
>>>>>
>>>>> if (instanceClass == NullInstance.class) { // null
>>>>> declaredClass = ((NullInstance) instance).declaredClass;
>>>>>  instance = null;
>>>>> }
>>>>> }
>>>>>
>>>>> if (objectWritable != null) { // store values
>>>>>  objectWritable.declaredClass = declaredClass;
>>>>> objectWritable.instance = instance;
>>>>> }
>>>>>
>>>>> return instance;
>>>>>
>>>>> }
>>>>>
>>>>> public void setConf(Configuration conf) {
>>>>>  this.conf = conf;
>>>>> }
>>>>>
>>>>> public Configuration getConf() {
>>>>>  return this.conf;
>>>>> }
>>>>> }
>>>>>
>>>>>
>>>>> --
>>>>> *--
>>>>> -- Felipe Oliveira Gutierrez
>>>>> -- Felipe.o.Gutierrez@gmail.com
>>>>> -- https://sites.google.com/site/lipe82/Home/diaadia*
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> *--
>>> -- Felipe Oliveira Gutierrez
>>> -- Felipe.o.Gutierrez@gmail.com
>>> -- https://sites.google.com/site/lipe82/Home/diaadia*
>>>
>>
>>
>

Mime
View raw message