hadoop-hdfs-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Subject Re: Change the output of Reduce function
Date Thu, 25 Jul 2013 19:18:28 GMT
Sorry, I think I didnt understand,
Does NullWritable go to replate MyWritable? But this is may value. My key
is a Text.
Regards,
Felipe



On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus <shahab.yunus@gmail.com>wrote:

> I think uou can use NullWritable as key.
>
> http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html
>
>
> Regards,
> Shahab
>
>
> On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez <
> felipe.o.gutierrez@gmail.com> wrote:
>
>> I did a MapReduce program to execute a Grep function. I know there is a
>> Grep function at hadoop examples, but I want to make my Grep MapReduce to
>> explain to other.
>> My problem is that my out put shows the key/value. I want to show only
>> the value, since I saved the line number at this value. Example:
>>
>> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0:
>> associated ]
>>
>> Here is my code. Thanks,
>> Felipe
>>
>> package grep;
>>
>> import java.io.File;
>> import java.io.FileReader;
>> import java.io.LineNumberReader;
>>
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapred.FileInputFormat;
>> import org.apache.hadoop.mapred.FileOutputFormat;
>> import org.apache.hadoop.mapred.JobClient;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapred.TextInputFormat;
>> import org.apache.hadoop.mapred.TextOutputFormat;
>>
>> public class Main {
>>
>>  public static void main(String[] args) throws Exception {
>>
>> if (args == null || args.length != 3) {
>>  System.err.println("Usage: Main <in> <out> <regex>");
>> System.exit(-1);
>>  }
>>
>> JobConf conf = new JobConf(Main.class);
>> conf.setJobName("grep");
>>
>> String input = args[0];
>> String output = args[1];
>> String regex = args[2];
>>
>> File arquivoLeitura = new File(input);
>> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader(
>>  arquivoLeitura));
>> linhaLeitura.skip(arquivoLeitura.length());
>> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1);
>>  conf.set("grep.regex", regex);
>> conf.set("grep.lines", lines);
>>
>> conf.setOutputKeyClass(Text.class);
>>  conf.setOutputValueClass(MyWritable.class);
>>
>> conf.setMapperClass(GrepMapper.class);
>> conf.setCombinerClass(GrepReducer.class);
>>  conf.setReducerClass(GrepReducer.class);
>>
>> conf.setInputFormat(TextInputFormat.class);
>>  conf.setOutputFormat(TextOutputFormat.class);
>>
>> FileInputFormat.setInputPaths(conf, new Path(input));
>>  FileOutputFormat.setOutputPath(conf, new Path(output));
>>
>> JobClient.runJob(conf);
>> }
>> }
>>
>> package grep;
>>
>> import java.io.IOException;
>> import java.text.DecimalFormat;
>>
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapred.MapReduceBase;
>> import org.apache.hadoop.mapred.Mapper;
>> import org.apache.hadoop.mapred.OutputCollector;
>> import org.apache.hadoop.mapred.Reporter;
>>
>> public class GrepMapper extends MapReduceBase implements
>> Mapper<LongWritable, Text, Text, MyWritable> {
>>
>> private static long line = 1;
>> private static long n = 0;
>> private static long divisor = 1;
>>  private static long qtdLines = 0;
>> private Text k = new Text();
>>
>> public void map(LongWritable key, Text value,
>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>> throws IOException {
>> String str = value.toString();
>>  MyWritable text = new MyWritable("line " + line + " : " + str);
>> if ((line % divisor) == 0) {
>>  n++;
>> }
>> k.set(customFormat("00000", n));
>>  output.collect(k, text);
>> line++;
>> }
>>
>>  @Override
>> public void configure(JobConf job) {
>> qtdLines = Long.parseLong(job.get("grep.lines"));
>>  if (qtdLines <= 500) {
>> divisor = 10;
>> } else if (qtdLines <= 1000) {
>>  divisor = 20;
>> } else if (qtdLines <= 1500) {
>> divisor = 30;
>>  } else if (qtdLines <= 2000) {
>> divisor = 40;
>> } else if (qtdLines <= 2500) {
>>  divisor = 50;
>> } else if (qtdLines <= 3000) {
>> divisor = 60;
>>  } else if (qtdLines <= 3500) {
>> divisor = 70;
>> } else if (qtdLines <= 4000) {
>>  divisor = 80;
>> } else if (qtdLines <= 4500) {
>> divisor = 90;
>>  } else if (qtdLines <= 5000) {
>> divisor = 100;
>> } else if (qtdLines <= 5500) {
>>  divisor = 110;
>> } else if (qtdLines <= 6000) {
>> divisor = 120;
>>  } else if (qtdLines <= 6500) {
>> divisor = 130;
>> } else if (qtdLines <= 7000) {
>>  divisor = 140;
>> }
>> }
>>
>> static public String customFormat(String pattern, double value) {
>>  DecimalFormat myFormatter = new DecimalFormat(pattern);
>> return myFormatter.format(value);
>> }
>> }
>>
>> package grep;
>>
>> import java.io.IOException;
>> import java.util.Iterator;
>> import java.util.regex.Matcher;
>> import java.util.regex.Pattern;
>>
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapred.MapReduceBase;
>> import org.apache.hadoop.mapred.OutputCollector;
>> import org.apache.hadoop.mapred.Reducer;
>> import org.apache.hadoop.mapred.Reporter;
>>
>> public class GrepReducer extends MapReduceBase implements
>> Reducer<Text, MyWritable, Text, MyWritable> {
>>
>> private Pattern pattern;
>>
>> @Override
>> public void configure(JobConf job) {
>>  pattern = Pattern.compile(job.get("grep.regex"));
>> }
>>
>> public void reduce(Text key, Iterator<MyWritable> values,
>>  OutputCollector<Text, MyWritable> output, Reporter reporter)
>> throws IOException {
>>
>>  while (values.hasNext()) {
>> String text = (String) values.next().get();
>> Matcher matcher = pattern.matcher(text);
>>  while (matcher.find()) {
>> output.collect(key, new MyWritable(text));
>>  }
>> }
>> }
>> }
>>
>> package grep;
>>
>> import java.io.DataInput;
>> import java.io.DataOutput;
>> import java.io.IOException;
>> import java.lang.reflect.Array;
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import org.apache.hadoop.conf.Configurable;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.conf.Configured;
>> import org.apache.hadoop.io.UTF8;
>>  import org.apache.hadoop.io.Writable;
>> import org.apache.hadoop.io.WritableFactories;
>>
>> public class MyWritable implements Writable, Configurable {
>>
>> private Class declaredClass;
>>  private Object instance;
>> private Configuration conf;
>>
>> public MyWritable() {
>>  }
>>
>> public MyWritable(Object instance) {
>> set(instance);
>>  }
>>
>> public MyWritable(Class declaredClass, Object instance) {
>> this.declaredClass = declaredClass;
>>  this.instance = instance;
>> }
>>
>> /** Return the instance, or null if none. */
>>  public Object get() {
>> return instance;
>> }
>>
>>  /** Return the class this is meant to be. */
>> public Class getDeclaredClass() {
>> return declaredClass;
>>  }
>>
>> /** Reset the instance. */
>> public void set(Object instance) {
>>  this.declaredClass = instance.getClass();
>> this.instance = instance;
>> }
>>
>> public String toString() {
>> return "[ " + instance + " ]";
>> }
>>
>> public void readFields(DataInput in) throws IOException {
>> readObject(in, this, this.conf);
>>  }
>>
>> public void write(DataOutput out) throws IOException {
>> writeObject(out, instance, declaredClass, conf);
>>  }
>>
>> private static final Map<String, Class<?>> PRIMITIVE_NAMES = new
>> HashMap<String, Class<?>>();
>>  static {
>> PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
>> PRIMITIVE_NAMES.put("byte", Byte.TYPE);
>>  PRIMITIVE_NAMES.put("char", Character.TYPE);
>> PRIMITIVE_NAMES.put("short", Short.TYPE);
>>  PRIMITIVE_NAMES.put("int", Integer.TYPE);
>> PRIMITIVE_NAMES.put("long", Long.TYPE);
>>  PRIMITIVE_NAMES.put("float", Float.TYPE);
>> PRIMITIVE_NAMES.put("double", Double.TYPE);
>>  PRIMITIVE_NAMES.put("void", Void.TYPE);
>> }
>>
>> private static class NullInstance extends Configured implements Writable {
>>  private Class<?> declaredClass;
>>
>> public NullInstance() {
>> super(null);
>>  }
>>
>> public NullInstance(Class declaredClass, Configuration conf) {
>> super(conf);
>>  this.declaredClass = declaredClass;
>> }
>>
>> public void readFields(DataInput in) throws IOException {
>>  String className = UTF8.readString(in);
>> declaredClass = PRIMITIVE_NAMES.get(className);
>> if (declaredClass == null) {
>>  try {
>> declaredClass = getConf().getClassByName(className);
>> } catch (ClassNotFoundException e) {
>>  throw new RuntimeException(e.toString());
>> }
>> }
>>  }
>>
>> public void write(DataOutput out) throws IOException {
>> UTF8.writeString(out, declaredClass.getName());
>>  }
>> }
>>
>> /**
>>  * Write a {@link Writable}, {@link String}, primitive type, or an array
>> of
>>  * the preceding.
>>  */
>> public static void writeObject(DataOutput out, Object instance,
>>  Class declaredClass, Configuration conf) throws IOException {
>>
>> if (instance == null) { // null
>>  instance = new NullInstance(declaredClass, conf);
>> declaredClass = Writable.class;
>> }
>>
>> UTF8.writeString(out, declaredClass.getName()); // always write declared
>>
>> if (declaredClass.isArray()) { // array
>>  int length = Array.getLength(instance);
>> out.writeInt(length);
>> for (int i = 0; i < length; i++) {
>>  writeObject(out, Array.get(instance, i),
>> declaredClass.getComponentType(), conf);
>> }
>>
>> } else if (declaredClass == String.class) { // String
>> UTF8.writeString(out, (String) instance);
>>
>> } else if (declaredClass.isPrimitive()) { // primitive type
>>
>> if (declaredClass == Boolean.TYPE) { // boolean
>>  out.writeBoolean(((Boolean) instance).booleanValue());
>> } else if (declaredClass == Character.TYPE) { // char
>>  out.writeChar(((Character) instance).charValue());
>> } else if (declaredClass == Byte.TYPE) { // byte
>>  out.writeByte(((Byte) instance).byteValue());
>> } else if (declaredClass == Short.TYPE) { // short
>> out.writeShort(((Short) instance).shortValue());
>>  } else if (declaredClass == Integer.TYPE) { // int
>> out.writeInt(((Integer) instance).intValue());
>>  } else if (declaredClass == Long.TYPE) { // long
>> out.writeLong(((Long) instance).longValue());
>> } else if (declaredClass == Float.TYPE) { // float
>>  out.writeFloat(((Float) instance).floatValue());
>> } else if (declaredClass == Double.TYPE) { // double
>>  out.writeDouble(((Double) instance).doubleValue());
>> } else if (declaredClass == Void.TYPE) { // void
>>  } else {
>> throw new IllegalArgumentException("Not a primitive: "
>> + declaredClass);
>>  }
>> } else if (declaredClass.isEnum()) { // enum
>> UTF8.writeString(out, ((Enum) instance).name());
>>  } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
>> UTF8.writeString(out, instance.getClass().getName());
>>  ((Writable) instance).write(out);
>>
>> } else {
>> throw new IOException("Can't write: " + instance + " as "
>>  + declaredClass);
>> }
>> }
>>
>> /**
>>  * Read a {@link Writable}, {@link String}, primitive type, or an array
>> of
>>  * the preceding.
>>  */
>>  public static Object readObject(DataInput in, Configuration conf)
>> throws IOException {
>> return readObject(in, null, conf);
>>  }
>>
>> /**
>>  * Read a {@link Writable}, {@link String}, primitive type, or an array of
>>  * the preceding.
>>  */
>> @SuppressWarnings("unchecked")
>>  public static Object readObject(DataInput in, MyWritable objectWritable,
>> Configuration conf) throws IOException {
>>  String className = UTF8.readString(in);
>> Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
>>  if (declaredClass == null) {
>> try {
>> declaredClass = conf.getClassByName(className);
>>  } catch (ClassNotFoundException e) {
>> throw new RuntimeException("readObject can't find class "
>>  + className, e);
>> }
>> }
>>
>> Object instance;
>>
>> if (declaredClass.isPrimitive()) { // primitive types
>>
>> if (declaredClass == Boolean.TYPE) { // boolean
>>  instance = Boolean.valueOf(in.readBoolean());
>> } else if (declaredClass == Character.TYPE) { // char
>>  instance = Character.valueOf(in.readChar());
>> } else if (declaredClass == Byte.TYPE) { // byte
>> instance = Byte.valueOf(in.readByte());
>>  } else if (declaredClass == Short.TYPE) { // short
>> instance = Short.valueOf(in.readShort());
>> } else if (declaredClass == Integer.TYPE) { // int
>>  instance = Integer.valueOf(in.readInt());
>> } else if (declaredClass == Long.TYPE) { // long
>> instance = Long.valueOf(in.readLong());
>>  } else if (declaredClass == Float.TYPE) { // float
>> instance = Float.valueOf(in.readFloat());
>> } else if (declaredClass == Double.TYPE) { // double
>>  instance = Double.valueOf(in.readDouble());
>> } else if (declaredClass == Void.TYPE) { // void
>> instance = null;
>>  } else {
>> throw new IllegalArgumentException("Not a primitive: "
>> + declaredClass);
>>  }
>>
>> } else if (declaredClass.isArray()) { // array
>> int length = in.readInt();
>>  instance = Array.newInstance(declaredClass.getComponentType(),
>> length);
>> for (int i = 0; i < length; i++) {
>>  Array.set(instance, i, readObject(in, conf));
>> }
>>
>> } else if (declaredClass == String.class) { // String
>>  instance = UTF8.readString(in);
>> } else if (declaredClass.isEnum()) { // enum
>> instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
>>  UTF8.readString(in));
>> } else { // Writable
>> Class instanceClass = null;
>>  String str = "";
>> try {
>> str = UTF8.readString(in);
>>  instanceClass = conf.getClassByName(str);
>> } catch (ClassNotFoundException e) {
>> throw new RuntimeException(
>>  "readObject can't find class " + str, e);
>> }
>>
>> Writable writable = WritableFactories.newInstance(instanceClass,
>>  conf);
>> writable.readFields(in);
>> instance = writable;
>>
>> if (instanceClass == NullInstance.class) { // null
>> declaredClass = ((NullInstance) instance).declaredClass;
>>  instance = null;
>> }
>> }
>>
>> if (objectWritable != null) { // store values
>>  objectWritable.declaredClass = declaredClass;
>> objectWritable.instance = instance;
>> }
>>
>> return instance;
>>
>> }
>>
>> public void setConf(Configuration conf) {
>>  this.conf = conf;
>> }
>>
>> public Configuration getConf() {
>>  return this.conf;
>> }
>> }
>>
>>
>> --
>> *--
>> -- Felipe Oliveira Gutierrez
>> -- Felipe.o.Gutierrez@gmail.com
>> -- https://sites.google.com/site/lipe82/Home/diaadia*
>>
>
>


-- 
*--
-- Felipe Oliveira Gutierrez
-- Felipe.o.Gutierrez@gmail.com
-- https://sites.google.com/site/lipe82/Home/diaadia*

Mime
View raw message