hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shahab Yunus <shahab.yu...@gmail.com>
Subject Re: Change the output of Reduce function
Date Thu, 25 Jul 2013 19:07:15 GMT
I think you can use NullWritable as the key.
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html


Regards,
Shahab


On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez <
felipe.o.gutierrez@gmail.com> wrote:

> I wrote a MapReduce program that implements a Grep function. I know there
> is a Grep function in the Hadoop examples, but I want to write my own Grep
> MapReduce job so I can explain it to others.
> My problem is that my output shows both the key and the value. I want to
> show only the value, since I saved the line number in the value. Example:
>
> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0:
> associated ]
>
> Here is my code. Thanks,
> Felipe
>
> package grep;
>
> import java.io.File;
> import java.io.FileReader;
> import java.io.LineNumberReader;
>
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.TextInputFormat;
> import org.apache.hadoop.mapred.TextOutputFormat;
>
> public class Main {
>
>  public static void main(String[] args) throws Exception {
>
> if (args == null || args.length != 3) {
>  System.err.println("Usage: Main <in> <out> <regex>");
> System.exit(-1);
>  }
>
> JobConf conf = new JobConf(Main.class);
> conf.setJobName("grep");
>
> String input = args[0];
> String output = args[1];
> String regex = args[2];
>
> File arquivoLeitura = new File(input);
> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader(
>  arquivoLeitura));
> linhaLeitura.skip(arquivoLeitura.length());
> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1);
>  conf.set("grep.regex", regex);
> conf.set("grep.lines", lines);
>
> conf.setOutputKeyClass(Text.class);
>  conf.setOutputValueClass(MyWritable.class);
>
> conf.setMapperClass(GrepMapper.class);
> conf.setCombinerClass(GrepReducer.class);
>  conf.setReducerClass(GrepReducer.class);
>
> conf.setInputFormat(TextInputFormat.class);
>  conf.setOutputFormat(TextOutputFormat.class);
>
> FileInputFormat.setInputPaths(conf, new Path(input));
>  FileOutputFormat.setOutputPath(conf, new Path(output));
>
> JobClient.runJob(conf);
> }
> }
>
> package grep;
>
> import java.io.IOException;
> import java.text.DecimalFormat;
>
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.MapReduceBase;
> import org.apache.hadoop.mapred.Mapper;
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.hadoop.mapred.Reporter;
>
> /**
>  * Mapper that tags every input line with a running line number and groups
>  * consecutive lines under a zero-padded numeric key.
>  */
> public class GrepMapper extends MapReduceBase implements
>         Mapper<LongWritable, Text, Text, MyWritable> {
>
>     // Counters are static, so they are shared by every mapper instance that
>     // lives in the same JVM.
>     private static long line = 1;
>     private static long n = 0;
>     private static long divisor = 1;
>     private static long qtdLines = 0;
>     private Text k = new Text();
>
>     /**
>      * Emits the input line, prefixed with its running line number, under the
>      * current group key. The group number advances whenever the running line
>      * number is a multiple of the divisor.
>      */
>     public void map(LongWritable key, Text value,
>             OutputCollector<Text, MyWritable> output, Reporter reporter)
>             throws IOException {
>         final String content = value.toString();
>         final MyWritable payload = new MyWritable("line " + line + " : "
>                 + content);
>
>         final boolean startsNewGroup = (line % divisor) == 0;
>         if (startsNewGroup) {
>             n += 1;
>         }
>
>         k.set(customFormat("00000", n));
>         output.collect(k, payload);
>         line += 1;
>     }
>
>     /**
>      * Reads the total line count from "grep.lines" and derives the group
>      * size: 10 for up to 500 lines, growing by 10 for each further 500 lines
>      * up to 140 for 7000 lines. Above 7000 lines the divisor is left as is.
>      */
>     @Override
>     public void configure(JobConf job) {
>         qtdLines = Long.parseLong(job.get("grep.lines"));
>         if (qtdLines > 7000) {
>             return;
>         }
>         // Number of started 500-line steps, never less than one.
>         final long steps = Math.max(1L, (qtdLines + 499) / 500);
>         divisor = steps * 10;
>     }
>
>     /**
>      * Formats {@code value} with the given {@link DecimalFormat} pattern.
>      */
>     static public String customFormat(String pattern, double value) {
>         return new DecimalFormat(pattern).format(value);
>     }
> }
>
> package grep;
>
> import java.io.IOException;
> import java.util.Iterator;
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
>
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.MapReduceBase;
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.hadoop.mapred.Reducer;
> import org.apache.hadoop.mapred.Reporter;
>
> /**
>  * Reducer that keeps only the values whose text matches the regular
>  * expression stored in the job configuration under "grep.regex".
>  *
>  * Every matching value is emitted exactly once, no matter how many times the
>  * expression occurs in it, so applying the class repeatedly (for example as
>  * a combiner and then as the reducer) does not multiply the output.
>  */
> public class GrepReducer extends MapReduceBase implements
>         Reducer<Text, MyWritable, Text, MyWritable> {
>
>     private Pattern pattern;
>
>     /** Compiles the expression found under "grep.regex". */
>     @Override
>     public void configure(JobConf job) {
>         pattern = Pattern.compile(job.get("grep.regex"));
>     }
>
>     /**
>      * Forwards each value of {@code key} that contains at least one match.
>      *
>      * @throws IOException if the collector fails to accept a record
>      */
>     public void reduce(Text key, Iterator<MyWritable> values,
>             OutputCollector<Text, MyWritable> output, Reporter reporter)
>             throws IOException {
>
>         while (values.hasNext()) {
>             String text = (String) values.next().get();
>             Matcher matcher = pattern.matcher(text);
>             // "if", not "while": one output record per matching line.
>             if (matcher.find()) {
>                 output.collect(key, new MyWritable(text));
>             }
>         }
>     }
> }
>
> package grep;
>
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
> import java.lang.reflect.Array;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.hadoop.conf.Configurable;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.io.UTF8;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.io.WritableFactories;
>
> /**
>  * A {@link Writable} wrapper around a {@link Writable}, {@link String},
>  * primitive value, enum, or an array of those. The declared class name is
>  * always written in front of the value so the reader can rebuild it.
>  *
>  * Class names are resolved through the {@link Configuration} when one has
>  * been set and through {@link Class#forName(String)} otherwise, so an
>  * instance created with the no-argument constructor can still be read.
>  */
> public class MyWritable implements Writable, Configurable {
>
>     /** Maps the names of the primitive types to their class objects. */
>     private static final Map<String, Class<?>> PRIMITIVE_NAMES = new
>             HashMap<String, Class<?>>();
>     static {
>         PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
>         PRIMITIVE_NAMES.put("byte", Byte.TYPE);
>         PRIMITIVE_NAMES.put("char", Character.TYPE);
>         PRIMITIVE_NAMES.put("short", Short.TYPE);
>         PRIMITIVE_NAMES.put("int", Integer.TYPE);
>         PRIMITIVE_NAMES.put("long", Long.TYPE);
>         PRIMITIVE_NAMES.put("float", Float.TYPE);
>         PRIMITIVE_NAMES.put("double", Double.TYPE);
>         PRIMITIVE_NAMES.put("void", Void.TYPE);
>     }
>
>     private Class declaredClass;
>     private Object instance;
>     private Configuration conf;
>
>     public MyWritable() {
>     }
>
>     public MyWritable(Object instance) {
>         set(instance);
>     }
>
>     public MyWritable(Class declaredClass, Object instance) {
>         this.declaredClass = declaredClass;
>         this.instance = instance;
>     }
>
>     /** Return the instance, or null if none. */
>     public Object get() {
>         return instance;
>     }
>
>     /** Return the class this is meant to be. */
>     public Class getDeclaredClass() {
>         return declaredClass;
>     }
>
>     /** Reset the instance; the declared class becomes its runtime class. */
>     public void set(Object instance) {
>         this.declaredClass = instance.getClass();
>         this.instance = instance;
>     }
>
>     public String toString() {
>         return "[ " + instance + " ]";
>     }
>
>     public void readFields(DataInput in) throws IOException {
>         readObject(in, this, this.conf);
>     }
>
>     public void write(DataOutput out) throws IOException {
>         writeObject(out, instance, declaredClass, conf);
>     }
>
>     /**
>      * Resolve a class name, using {@code conf} when it is available and the
>      * default class loading of {@link Class#forName(String)} when it is null.
>      *
>      * @throws RuntimeException wrapping the {@link ClassNotFoundException}
>      *                          when the class cannot be found
>      */
>     private static Class<?> loadClass(Configuration conf, String className) {
>         try {
>             if (conf != null) {
>                 return conf.getClassByName(className);
>             }
>             return Class.forName(className);
>         } catch (ClassNotFoundException e) {
>             throw new RuntimeException("readObject can't find class "
>                     + className, e);
>         }
>     }
>
>     /** Stand-in written in place of a null value; carries its declared class. */
>     private static class NullInstance extends Configured implements Writable {
>         private Class<?> declaredClass;
>
>         public NullInstance() {
>             super(null);
>         }
>
>         public NullInstance(Class declaredClass, Configuration conf) {
>             super(conf);
>             this.declaredClass = declaredClass;
>         }
>
>         public void readFields(DataInput in) throws IOException {
>             String className = UTF8.readString(in);
>             declaredClass = PRIMITIVE_NAMES.get(className);
>             if (declaredClass == null) {
>                 // getConf() may be null; loadClass copes with that.
>                 declaredClass = loadClass(getConf(), className);
>             }
>         }
>
>         public void write(DataOutput out) throws IOException {
>             UTF8.writeString(out, declaredClass.getName());
>         }
>     }
>
>     /**
>      * Write a {@link Writable}, {@link String}, primitive type, or an array of
>      * the preceding.
>      */
>     public static void writeObject(DataOutput out, Object instance,
>             Class declaredClass, Configuration conf) throws IOException {
>
>         if (instance == null) { // null
>             instance = new NullInstance(declaredClass, conf);
>             declaredClass = Writable.class;
>         }
>
>         UTF8.writeString(out, declaredClass.getName()); // always write declared
>
>         if (declaredClass.isArray()) { // array
>             int length = Array.getLength(instance);
>             out.writeInt(length);
>             for (int i = 0; i < length; i++) {
>                 writeObject(out, Array.get(instance, i),
>                         declaredClass.getComponentType(), conf);
>             }
>
>         } else if (declaredClass == String.class) { // String
>             UTF8.writeString(out, (String) instance);
>
>         } else if (declaredClass.isPrimitive()) { // primitive type
>
>             if (declaredClass == Boolean.TYPE) { // boolean
>                 out.writeBoolean(((Boolean) instance).booleanValue());
>             } else if (declaredClass == Character.TYPE) { // char
>                 out.writeChar(((Character) instance).charValue());
>             } else if (declaredClass == Byte.TYPE) { // byte
>                 out.writeByte(((Byte) instance).byteValue());
>             } else if (declaredClass == Short.TYPE) { // short
>                 out.writeShort(((Short) instance).shortValue());
>             } else if (declaredClass == Integer.TYPE) { // int
>                 out.writeInt(((Integer) instance).intValue());
>             } else if (declaredClass == Long.TYPE) { // long
>                 out.writeLong(((Long) instance).longValue());
>             } else if (declaredClass == Float.TYPE) { // float
>                 out.writeFloat(((Float) instance).floatValue());
>             } else if (declaredClass == Double.TYPE) { // double
>                 out.writeDouble(((Double) instance).doubleValue());
>             } else if (declaredClass == Void.TYPE) { // void
>             } else {
>                 throw new IllegalArgumentException("Not a primitive: "
>                         + declaredClass);
>             }
>         } else if (declaredClass.isEnum()) { // enum
>             UTF8.writeString(out, ((Enum) instance).name());
>         } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
>             UTF8.writeString(out, instance.getClass().getName());
>             ((Writable) instance).write(out);
>
>         } else {
>             throw new IOException("Can't write: " + instance + " as "
>                     + declaredClass);
>         }
>     }
>
>     /**
>      * Read a {@link Writable}, {@link String}, primitive type, or an array of
>      * the preceding.
>      */
>     public static Object readObject(DataInput in, Configuration conf)
>             throws IOException {
>         return readObject(in, null, conf);
>     }
>
>     /**
>      * Read a {@link Writable}, {@link String}, primitive type, or an array of
>      * the preceding. When {@code objectWritable} is not null, the declared
>      * class and the value that were read are also stored in it.
>      *
>      * @param conf used to resolve class names; may be null
>      */
>     @SuppressWarnings("unchecked")
>     public static Object readObject(DataInput in, MyWritable objectWritable,
>             Configuration conf) throws IOException {
>         String className = UTF8.readString(in);
>         Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
>         if (declaredClass == null) {
>             declaredClass = loadClass(conf, className);
>         }
>
>         Object instance;
>
>         if (declaredClass.isPrimitive()) { // primitive types
>
>             if (declaredClass == Boolean.TYPE) { // boolean
>                 instance = Boolean.valueOf(in.readBoolean());
>             } else if (declaredClass == Character.TYPE) { // char
>                 instance = Character.valueOf(in.readChar());
>             } else if (declaredClass == Byte.TYPE) { // byte
>                 instance = Byte.valueOf(in.readByte());
>             } else if (declaredClass == Short.TYPE) { // short
>                 instance = Short.valueOf(in.readShort());
>             } else if (declaredClass == Integer.TYPE) { // int
>                 instance = Integer.valueOf(in.readInt());
>             } else if (declaredClass == Long.TYPE) { // long
>                 instance = Long.valueOf(in.readLong());
>             } else if (declaredClass == Float.TYPE) { // float
>                 instance = Float.valueOf(in.readFloat());
>             } else if (declaredClass == Double.TYPE) { // double
>                 instance = Double.valueOf(in.readDouble());
>             } else if (declaredClass == Void.TYPE) { // void
>                 instance = null;
>             } else {
>                 throw new IllegalArgumentException("Not a primitive: "
>                         + declaredClass);
>             }
>
>         } else if (declaredClass.isArray()) { // array
>             int length = in.readInt();
>             instance = Array.newInstance(declaredClass.getComponentType(),
>                     length);
>             for (int i = 0; i < length; i++) {
>                 Array.set(instance, i, readObject(in, conf));
>             }
>
>         } else if (declaredClass == String.class) { // String
>             instance = UTF8.readString(in);
>         } else if (declaredClass.isEnum()) { // enum
>             instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
>                     UTF8.readString(in));
>         } else { // Writable
>             // The runtime class name follows the declared class name.
>             String str = UTF8.readString(in);
>             Class instanceClass = loadClass(conf, str);
>
>             Writable writable = WritableFactories.newInstance(instanceClass,
>                     conf);
>             writable.readFields(in);
>             instance = writable;
>
>             if (instanceClass == NullInstance.class) { // null
>                 declaredClass = ((NullInstance) instance).declaredClass;
>                 instance = null;
>             }
>         }
>
>         if (objectWritable != null) { // store values
>             objectWritable.declaredClass = declaredClass;
>             objectWritable.instance = instance;
>         }
>
>         return instance;
>
>     }
>
>     public void setConf(Configuration conf) {
>         this.conf = conf;
>     }
>
>     public Configuration getConf() {
>         return this.conf;
>     }
> }
>
>
> --
> *--
> -- Felipe Oliveira Gutierrez
> -- Felipe.o.Gutierrez@gmail.com
> -- https://sites.google.com/site/lipe82/Home/diaadia*
>

Mime
View raw message