hadoop-hdfs-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mohammad Tariq <donta...@gmail.com>
Subject Re: Change the output of Reduce function
Date Thu, 25 Jul 2013 19:27:37 GMT
@Shahab : Please correct me if I misunderstood you.

Warm Regards,
Tariq
cloudfront.blogspot.com


On Fri, Jul 26, 2013 at 12:56 AM, Mohammad Tariq <dontariq@gmail.com> wrote:

> By that, Shahab probably means that you can emit NullWritable as the key
> from your Reducer. If you do so, your Reducer will just emit the value
> without the key. Something like this:
>
> output.collect(NullWritable.get(), new MyWritable(text));
>
> Warm Regards,
> Tariq
> cloudfront.blogspot.com
>
>
> On Fri, Jul 26, 2013 at 12:48 AM, Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> Sorry, I think I didn't understand.
>> Does NullWritable replace MyWritable? But that is my value. My key
>> is a Text.
>> Regards,
>> Felipe
>>
>>
>>
>> On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus <shahab.yunus@gmail.com> wrote:
>>
>>> I think you can use NullWritable as the key.
>>>
>>> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html
>>>
>>>
>>> Regards,
>>> Shahab
>>>
>>>
>>> On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez <
>>> felipe.o.gutierrez@gmail.com> wrote:
>>>
>>>> I wrote a MapReduce program to execute a Grep function. I know there is a
>>>> Grep function in the Hadoop examples, but I want to write my own Grep
>>>> MapReduce to explain it to others.
>>>> My problem is that my output shows the key/value. I want to show only
>>>> the value, since I saved the line number in this value. Example:
>>>>
>>>> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689]
>>>> wlan0: associated ]
>>>>
>>>> Here is my code. Thanks,
>>>> Felipe
>>>>
>>>> package grep;
>>>>
>>>> import java.io.File;
>>>> import java.io.FileReader;
>>>> import java.io.LineNumberReader;
>>>>
>>>> import org.apache.hadoop.fs.Path;
>>>> import org.apache.hadoop.io.Text;
>>>> import org.apache.hadoop.mapred.FileInputFormat;
>>>> import org.apache.hadoop.mapred.FileOutputFormat;
>>>> import org.apache.hadoop.mapred.JobClient;
>>>> import org.apache.hadoop.mapred.JobConf;
>>>> import org.apache.hadoop.mapred.TextInputFormat;
>>>> import org.apache.hadoop.mapred.TextOutputFormat;
>>>>
>>>> public class Main {
>>>>
>>>>  public static void main(String[] args) throws Exception {
>>>>
>>>> if (args == null || args.length != 3) {
>>>>  System.err.println("Usage: Main <in> <out> <regex>");
>>>> System.exit(-1);
>>>>  }
>>>>
>>>> JobConf conf = new JobConf(Main.class);
>>>> conf.setJobName("grep");
>>>>
>>>> String input = args[0];
>>>> String output = args[1];
>>>> String regex = args[2];
>>>>
>>>> File arquivoLeitura = new File(input);
>>>> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader(
>>>>  arquivoLeitura));
>>>> linhaLeitura.skip(arquivoLeitura.length());
>>>> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1);
>>>>  conf.set("grep.regex", regex);
>>>> conf.set("grep.lines", lines);
>>>>
>>>> conf.setOutputKeyClass(Text.class);
>>>>  conf.setOutputValueClass(MyWritable.class);
>>>>
>>>> conf.setMapperClass(GrepMapper.class);
>>>> conf.setCombinerClass(GrepReducer.class);
>>>>  conf.setReducerClass(GrepReducer.class);
>>>>
>>>> conf.setInputFormat(TextInputFormat.class);
>>>>  conf.setOutputFormat(TextOutputFormat.class);
>>>>
>>>> FileInputFormat.setInputPaths(conf, new Path(input));
>>>>  FileOutputFormat.setOutputPath(conf, new Path(output));
>>>>
>>>> JobClient.runJob(conf);
>>>> }
>>>> }
>>>>
>>>> package grep;
>>>>
>>>> import java.io.IOException;
>>>> import java.text.DecimalFormat;
>>>>
>>>> import org.apache.hadoop.io.LongWritable;
>>>> import org.apache.hadoop.io.Text;
>>>> import org.apache.hadoop.mapred.JobConf;
>>>> import org.apache.hadoop.mapred.MapReduceBase;
>>>> import org.apache.hadoop.mapred.Mapper;
>>>> import org.apache.hadoop.mapred.OutputCollector;
>>>> import org.apache.hadoop.mapred.Reporter;
>>>>
>>>> public class GrepMapper extends MapReduceBase implements
>>>> Mapper<LongWritable, Text, Text, MyWritable> {
>>>>
>>>> private static long line = 1;
>>>> private static long n = 0;
>>>> private static long divisor = 1;
>>>>  private static long qtdLines = 0;
>>>> private Text k = new Text();
>>>>
>>>> public void map(LongWritable key, Text value,
>>>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>>>> throws IOException {
>>>> String str = value.toString();
>>>>  MyWritable text = new MyWritable("line " + line + " : " + str);
>>>> if ((line % divisor) == 0) {
>>>>  n++;
>>>> }
>>>> k.set(customFormat("00000", n));
>>>>  output.collect(k, text);
>>>> line++;
>>>> }
>>>>
>>>>  @Override
>>>> public void configure(JobConf job) {
>>>> qtdLines = Long.parseLong(job.get("grep.lines"));
>>>>  if (qtdLines <= 500) {
>>>> divisor = 10;
>>>> } else if (qtdLines <= 1000) {
>>>>  divisor = 20;
>>>> } else if (qtdLines <= 1500) {
>>>> divisor = 30;
>>>>  } else if (qtdLines <= 2000) {
>>>> divisor = 40;
>>>> } else if (qtdLines <= 2500) {
>>>>  divisor = 50;
>>>> } else if (qtdLines <= 3000) {
>>>> divisor = 60;
>>>>  } else if (qtdLines <= 3500) {
>>>> divisor = 70;
>>>> } else if (qtdLines <= 4000) {
>>>>  divisor = 80;
>>>> } else if (qtdLines <= 4500) {
>>>> divisor = 90;
>>>>  } else if (qtdLines <= 5000) {
>>>> divisor = 100;
>>>> } else if (qtdLines <= 5500) {
>>>>  divisor = 110;
>>>> } else if (qtdLines <= 6000) {
>>>> divisor = 120;
>>>>  } else if (qtdLines <= 6500) {
>>>> divisor = 130;
>>>> } else if (qtdLines <= 7000) {
>>>>  divisor = 140;
>>>> }
>>>> }
>>>>
>>>> static public String customFormat(String pattern, double value) {
>>>>  DecimalFormat myFormatter = new DecimalFormat(pattern);
>>>> return myFormatter.format(value);
>>>> }
>>>> }
>>>>
>>>> package grep;
>>>>
>>>> import java.io.IOException;
>>>> import java.util.Iterator;
>>>> import java.util.regex.Matcher;
>>>> import java.util.regex.Pattern;
>>>>
>>>> import org.apache.hadoop.io.Text;
>>>> import org.apache.hadoop.mapred.JobConf;
>>>> import org.apache.hadoop.mapred.MapReduceBase;
>>>> import org.apache.hadoop.mapred.OutputCollector;
>>>> import org.apache.hadoop.mapred.Reducer;
>>>> import org.apache.hadoop.mapred.Reporter;
>>>>
>>>> public class GrepReducer extends MapReduceBase implements
>>>> Reducer<Text, MyWritable, Text, MyWritable> {
>>>>
>>>> private Pattern pattern;
>>>>
>>>> @Override
>>>> public void configure(JobConf job) {
>>>>  pattern = Pattern.compile(job.get("grep.regex"));
>>>> }
>>>>
>>>> public void reduce(Text key, Iterator<MyWritable> values,
>>>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>>>> throws IOException {
>>>>
>>>>  while (values.hasNext()) {
>>>> String text = (String) values.next().get();
>>>> Matcher matcher = pattern.matcher(text);
>>>>  while (matcher.find()) {
>>>> output.collect(key, new MyWritable(text));
>>>>  }
>>>> }
>>>> }
>>>> }
>>>>
>>>> package grep;
>>>>
>>>> import java.io.DataInput;
>>>> import java.io.DataOutput;
>>>> import java.io.IOException;
>>>> import java.lang.reflect.Array;
>>>> import java.util.HashMap;
>>>> import java.util.Map;
>>>>
>>>> import org.apache.hadoop.conf.Configurable;
>>>> import org.apache.hadoop.conf.Configuration;
>>>> import org.apache.hadoop.conf.Configured;
>>>> import org.apache.hadoop.io.UTF8;
>>>>  import org.apache.hadoop.io.Writable;
>>>> import org.apache.hadoop.io.WritableFactories;
>>>>
>>>> public class MyWritable implements Writable, Configurable {
>>>>
>>>> private Class declaredClass;
>>>>  private Object instance;
>>>> private Configuration conf;
>>>>
>>>> public MyWritable() {
>>>>  }
>>>>
>>>> public MyWritable(Object instance) {
>>>> set(instance);
>>>>  }
>>>>
>>>> public MyWritable(Class declaredClass, Object instance) {
>>>> this.declaredClass = declaredClass;
>>>>  this.instance = instance;
>>>> }
>>>>
>>>> /** Return the instance, or null if none. */
>>>>  public Object get() {
>>>> return instance;
>>>> }
>>>>
>>>>  /** Return the class this is meant to be. */
>>>> public Class getDeclaredClass() {
>>>> return declaredClass;
>>>>  }
>>>>
>>>> /** Reset the instance. */
>>>> public void set(Object instance) {
>>>>  this.declaredClass = instance.getClass();
>>>> this.instance = instance;
>>>> }
>>>>
>>>> public String toString() {
>>>> return "[ " + instance + " ]";
>>>> }
>>>>
>>>> public void readFields(DataInput in) throws IOException {
>>>> readObject(in, this, this.conf);
>>>>  }
>>>>
>>>> public void write(DataOutput out) throws IOException {
>>>> writeObject(out, instance, declaredClass, conf);
>>>>  }
>>>>
>>>> private static final Map<String, Class<?>> PRIMITIVE_NAMES =
>>>> new
>>>> HashMap<String, Class<?>>();
>>>>  static {
>>>> PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
>>>> PRIMITIVE_NAMES.put("byte", Byte.TYPE);
>>>>  PRIMITIVE_NAMES.put("char", Character.TYPE);
>>>> PRIMITIVE_NAMES.put("short", Short.TYPE);
>>>>  PRIMITIVE_NAMES.put("int", Integer.TYPE);
>>>> PRIMITIVE_NAMES.put("long", Long.TYPE);
>>>>  PRIMITIVE_NAMES.put("float", Float.TYPE);
>>>> PRIMITIVE_NAMES.put("double", Double.TYPE);
>>>>  PRIMITIVE_NAMES.put("void", Void.TYPE);
>>>> }
>>>>
>>>> private static class NullInstance extends Configured implements
>>>> Writable {
>>>>  private Class<?> declaredClass;
>>>>
>>>> public NullInstance() {
>>>> super(null);
>>>>  }
>>>>
>>>> public NullInstance(Class declaredClass, Configuration conf) {
>>>> super(conf);
>>>>  this.declaredClass = declaredClass;
>>>> }
>>>>
>>>> public void readFields(DataInput in) throws IOException {
>>>>  String className = UTF8.readString(in);
>>>> declaredClass = PRIMITIVE_NAMES.get(className);
>>>> if (declaredClass == null) {
>>>>  try {
>>>> declaredClass = getConf().getClassByName(className);
>>>> } catch (ClassNotFoundException e) {
>>>>  throw new RuntimeException(e.toString());
>>>> }
>>>> }
>>>>  }
>>>>
>>>> public void write(DataOutput out) throws IOException {
>>>> UTF8.writeString(out, declaredClass.getName());
>>>>  }
>>>> }
>>>>
>>>> /**
>>>>  * Write a {@link Writable}, {@link String}, primitive type, or an
>>>> array of
>>>>  * the preceding.
>>>>  */
>>>> public static void writeObject(DataOutput out, Object instance,
>>>>  Class declaredClass, Configuration conf) throws IOException {
>>>>
>>>> if (instance == null) { // null
>>>>  instance = new NullInstance(declaredClass, conf);
>>>> declaredClass = Writable.class;
>>>> }
>>>>
>>>> UTF8.writeString(out, declaredClass.getName()); // always write declared
>>>>
>>>> if (declaredClass.isArray()) { // array
>>>>  int length = Array.getLength(instance);
>>>> out.writeInt(length);
>>>> for (int i = 0; i < length; i++) {
>>>>  writeObject(out, Array.get(instance, i),
>>>> declaredClass.getComponentType(), conf);
>>>> }
>>>>
>>>> } else if (declaredClass == String.class) { // String
>>>> UTF8.writeString(out, (String) instance);
>>>>
>>>> } else if (declaredClass.isPrimitive()) { // primitive type
>>>>
>>>> if (declaredClass == Boolean.TYPE) { // boolean
>>>>  out.writeBoolean(((Boolean) instance).booleanValue());
>>>> } else if (declaredClass == Character.TYPE) { // char
>>>>  out.writeChar(((Character) instance).charValue());
>>>> } else if (declaredClass == Byte.TYPE) { // byte
>>>>  out.writeByte(((Byte) instance).byteValue());
>>>> } else if (declaredClass == Short.TYPE) { // short
>>>> out.writeShort(((Short) instance).shortValue());
>>>>  } else if (declaredClass == Integer.TYPE) { // int
>>>> out.writeInt(((Integer) instance).intValue());
>>>>  } else if (declaredClass == Long.TYPE) { // long
>>>> out.writeLong(((Long) instance).longValue());
>>>> } else if (declaredClass == Float.TYPE) { // float
>>>>  out.writeFloat(((Float) instance).floatValue());
>>>> } else if (declaredClass == Double.TYPE) { // double
>>>>  out.writeDouble(((Double) instance).doubleValue());
>>>> } else if (declaredClass == Void.TYPE) { // void
>>>>  } else {
>>>> throw new IllegalArgumentException("Not a primitive: "
>>>> + declaredClass);
>>>>  }
>>>> } else if (declaredClass.isEnum()) { // enum
>>>> UTF8.writeString(out, ((Enum) instance).name());
>>>>  } else if (Writable.class.isAssignableFrom(declaredClass)) { //
>>>> Writable
>>>> UTF8.writeString(out, instance.getClass().getName());
>>>>  ((Writable) instance).write(out);
>>>>
>>>> } else {
>>>> throw new IOException("Can't write: " + instance + " as "
>>>>  + declaredClass);
>>>> }
>>>> }
>>>>
>>>> /**
>>>>  * Read a {@link Writable}, {@link String}, primitive type, or an
>>>> array of
>>>>  * the preceding.
>>>>  */
>>>>  public static Object readObject(DataInput in, Configuration conf)
>>>> throws IOException {
>>>> return readObject(in, null, conf);
>>>>  }
>>>>
>>>> /**
>>>>  * Read a {@link Writable}, {@link String}, primitive type, or an array
>>>> of
>>>>  * the preceding.
>>>>  */
>>>> @SuppressWarnings("unchecked")
>>>>  public static Object readObject(DataInput in, MyWritable
>>>> objectWritable,
>>>> Configuration conf) throws IOException {
>>>>  String className = UTF8.readString(in);
>>>> Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
>>>>  if (declaredClass == null) {
>>>> try {
>>>> declaredClass = conf.getClassByName(className);
>>>>  } catch (ClassNotFoundException e) {
>>>> throw new RuntimeException("readObject can't find class "
>>>>  + className, e);
>>>> }
>>>> }
>>>>
>>>> Object instance;
>>>>
>>>> if (declaredClass.isPrimitive()) { // primitive types
>>>>
>>>> if (declaredClass == Boolean.TYPE) { // boolean
>>>>  instance = Boolean.valueOf(in.readBoolean());
>>>> } else if (declaredClass == Character.TYPE) { // char
>>>>  instance = Character.valueOf(in.readChar());
>>>> } else if (declaredClass == Byte.TYPE) { // byte
>>>> instance = Byte.valueOf(in.readByte());
>>>>  } else if (declaredClass == Short.TYPE) { // short
>>>> instance = Short.valueOf(in.readShort());
>>>> } else if (declaredClass == Integer.TYPE) { // int
>>>>  instance = Integer.valueOf(in.readInt());
>>>> } else if (declaredClass == Long.TYPE) { // long
>>>> instance = Long.valueOf(in.readLong());
>>>>  } else if (declaredClass == Float.TYPE) { // float
>>>> instance = Float.valueOf(in.readFloat());
>>>> } else if (declaredClass == Double.TYPE) { // double
>>>>  instance = Double.valueOf(in.readDouble());
>>>> } else if (declaredClass == Void.TYPE) { // void
>>>> instance = null;
>>>>  } else {
>>>> throw new IllegalArgumentException("Not a primitive: "
>>>> + declaredClass);
>>>>  }
>>>>
>>>> } else if (declaredClass.isArray()) { // array
>>>> int length = in.readInt();
>>>>  instance = Array.newInstance(declaredClass.getComponentType(),
>>>> length);
>>>> for (int i = 0; i < length; i++) {
>>>>  Array.set(instance, i, readObject(in, conf));
>>>> }
>>>>
>>>> } else if (declaredClass == String.class) { // String
>>>>  instance = UTF8.readString(in);
>>>> } else if (declaredClass.isEnum()) { // enum
>>>> instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
>>>>  UTF8.readString(in));
>>>> } else { // Writable
>>>> Class instanceClass = null;
>>>>  String str = "";
>>>> try {
>>>> str = UTF8.readString(in);
>>>>  instanceClass = conf.getClassByName(str);
>>>> } catch (ClassNotFoundException e) {
>>>> throw new RuntimeException(
>>>>  "readObject can't find class " + str, e);
>>>> }
>>>>
>>>> Writable writable = WritableFactories.newInstance(instanceClass,
>>>>  conf);
>>>> writable.readFields(in);
>>>> instance = writable;
>>>>
>>>> if (instanceClass == NullInstance.class) { // null
>>>> declaredClass = ((NullInstance) instance).declaredClass;
>>>>  instance = null;
>>>> }
>>>> }
>>>>
>>>> if (objectWritable != null) { // store values
>>>>  objectWritable.declaredClass = declaredClass;
>>>> objectWritable.instance = instance;
>>>> }
>>>>
>>>> return instance;
>>>>
>>>> }
>>>>
>>>> public void setConf(Configuration conf) {
>>>>  this.conf = conf;
>>>> }
>>>>
>>>> public Configuration getConf() {
>>>>  return this.conf;
>>>> }
>>>> }
>>>>
>>>>
>>>> --
>>>> *--
>>>> -- Felipe Oliveira Gutierrez
>>>> -- Felipe.o.Gutierrez@gmail.com
>>>> -- https://sites.google.com/site/lipe82/Home/diaadia*
>>>>
>>>
>>>
>>
>>
>> --
>> *--
>> -- Felipe Oliveira Gutierrez
>> -- Felipe.o.Gutierrez@gmail.com
>> -- https://sites.google.com/site/lipe82/Home/diaadia*
>>
>
>

Mime
View raw message