hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Felipe Gutierrez <felipe.o.gutier...@gmail.com>
Subject Change the output of Reduce function
Date Thu, 25 Jul 2013 18:58:45 GMT
I wrote a MapReduce program that implements a grep function. I know there is a
Grep example among the Hadoop examples, but I want to write my own Grep MapReduce
job so that I can explain it to others.
My problem is that my output shows both the key and the value. I want to show only
the value, since I stored the line number in the value. Example:

00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0:
associated ]

Here is my code. Thanks,
Felipe

package grep;

import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Driver for the grep job.
 *
 * <p>Usage: {@code Main <in> <out> <regex>}. Before submitting the job it
 * counts the lines of {@code <in>} and passes the count to the tasks as
 * {@code grep.lines}; the regex is passed as {@code grep.regex}.
 */
public class Main {

  /**
   * Parses the arguments, configures the job and runs it to completion.
   *
   * @param args exactly three arguments: input path, output path, regex
   * @throws Exception if the input cannot be read or the job fails
   */
  public static void main(String[] args) throws Exception {

    if (args == null || args.length != 3) {
      System.err.println("Usage: Main <in> <out> <regex>");
      System.exit(-1);
    }

    JobConf conf = new JobConf(Main.class);
    conf.setJobName("grep");

    String input = args[0];
    String output = args[1];
    String regex = args[2];

    // NOTE(review): the line count uses java.io, so it only works when <in> is
    // a single file on the local filesystem, even though the job itself
    // reads <in> through Hadoop's FileSystem. Confirm this is intended.
    File inputFile = new File(input);
    LineNumberReader lineReader = new LineNumberReader(new FileReader(inputFile));
    String lines;
    try {
      // Skipping File.length() characters runs the reader to EOF for
      // single-byte or UTF-8 platform encodings.
      lineReader.skip(inputFile.length());
      // getLineNumber() counts line terminators; +1 accounts for a final line
      // without a trailing newline.
      lines = String.valueOf(lineReader.getLineNumber() + 1);
    } finally {
      // Always release the file handle, even if skip() fails.
      lineReader.close();
    }
    conf.set("grep.regex", regex);
    conf.set("grep.lines", lines);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(MyWritable.class);

    conf.setMapperClass(GrepMapper.class);
    conf.setCombinerClass(GrepReducer.class);
    conf.setReducerClass(GrepReducer.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(input));
    FileOutputFormat.setOutputPath(conf, new Path(output));

    JobClient.runJob(conf);
  }
}

package grep;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper for the grep job.
 *
 * <p>Prefixes every input line with {@code "line N : "} and emits it under a
 * zero-padded group key ({@code "00000"}, {@code "00001"}, ...). A new group
 * starts every {@code divisor} lines, where {@code divisor} is derived from the
 * total line count passed in {@code grep.lines}.
 *
 * <p>NOTE(review): line numbers are counted per map task. They match global
 * file line numbers only when the whole input is processed by a single split.
 */
public class GrepMapper extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, MyWritable> {

  // Instance (not static) state: each mapper task starts from a clean slate.
  // Static fields would leak between tasks sharing a JVM and keep a stale
  // divisor from an earlier configure() call.

  /** 1-based number of the next line this task will see. */
  private long line = 1;
  /** Current group index; becomes the formatted map output key. */
  private long n = 0;
  /** Number of lines per group; set in configure(). */
  private long divisor = 1;
  /** Total line count reported by the driver via grep.lines. */
  private long qtdLines = 0;
  /** Reused output key. */
  private final Text k = new Text();

  /**
   * Emits {@code ("%05d" group, "line N : " + value)} for one input line.
   *
   * @param key byte offset of the line (unused)
   * @param value the line text
   */
  public void map(LongWritable key, Text value,
      OutputCollector<Text, MyWritable> output, Reporter reporter)
      throws IOException {
    String str = value.toString();
    MyWritable text = new MyWritable("line " + line + " : " + str);
    if ((line % divisor) == 0) {
      n++;
    }
    k.set(customFormat("00000", n));
    output.collect(k, text);
    line++;
  }

  /**
   * Reads {@code grep.lines} and derives the group size from it.
   *
   * @throws NumberFormatException if grep.lines is missing or not a number
   */
  @Override
  public void configure(JobConf job) {
    qtdLines = Long.parseLong(job.get("grep.lines"));
    divisor = divisorFor(qtdLines);
  }

  /**
   * Group size for a file of {@code totalLines} lines: 10 lines per group for
   * every started block of 500 lines (at most 500 lines -> 10, 501..1000 -> 20,
   * ..., 6501..7000 -> 140, and so on), with a minimum of 10.
   * The former if-chain stopped at 7000 and left the divisor at 1 beyond it.
   */
  private static long divisorFor(long totalLines) {
    if (totalLines <= 0) {
      return 10;
    }
    // Overflow-free ceil(totalLines / 500).
    long blocks = (totalLines - 1) / 500 + 1;
    return 10 * blocks;
  }

  /**
   * Formats {@code value} with a {@link DecimalFormat} pattern.
   *
   * @param pattern DecimalFormat pattern, e.g. {@code "00000"} for zero padding
   * @param value number to format
   * @return the formatted number
   */
  static public String customFormat(String pattern, double value) {
    DecimalFormat myFormatter = new DecimalFormat(pattern);
    return myFormatter.format(value);
  }
}

package grep;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Reducer (and combiner) for the grep job.
 *
 * <p>Passes through, unchanged and under the same key, every value whose text
 * contains a match for the regex in {@code grep.regex}.
 *
 * <p>NOTE(review): to print only the value, TextOutputFormat must receive a
 * null or NullWritable key (see its documentation). Doing that would mean
 * changing this reducer's output key type and no longer using it as the
 * combiner.
 */
public class GrepReducer extends MapReduceBase implements
    Reducer<Text, MyWritable, Text, MyWritable> {

  /** Compiled grep.regex. */
  private Pattern pattern;

  /**
   * Compiles {@code grep.regex} from the job configuration.
   *
   * @throws NullPointerException if grep.regex is not set
   * @throws java.util.regex.PatternSyntaxException if the regex is invalid
   */
  @Override
  public void configure(JobConf job) {
    pattern = Pattern.compile(job.get("grep.regex"));
  }

  /**
   * Emits each value whose wrapped string contains at least one match.
   * Values are expected to wrap a {@link String}.
   */
  public void reduce(Text key, Iterator<MyWritable> values,
      OutputCollector<Text, MyWritable> output, Reporter reporter)
      throws IOException {

    while (values.hasNext()) {
      String text = (String) values.next().get();
      Matcher matcher = pattern.matcher(text);
      // Emit a matching line exactly once. Looping on find() emitted it once per
      // match, and since this class is also the combiner those duplicates were
      // multiplied again during the reduce phase.
      if (matcher.find()) {
        output.collect(key, new MyWritable(text));
      }
    }
  }
}

package grep;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;

public class MyWritable implements Writable, Configurable {

private Class declaredClass;
private Object instance;
private Configuration conf;

public MyWritable() {
}

public MyWritable(Object instance) {
set(instance);
}

public MyWritable(Class declaredClass, Object instance) {
this.declaredClass = declaredClass;
this.instance = instance;
}

/** Return the instance, or null if none. */
public Object get() {
return instance;
}

/** Return the class this is meant to be. */
public Class getDeclaredClass() {
return declaredClass;
}

/** Reset the instance. */
public void set(Object instance) {
this.declaredClass = instance.getClass();
this.instance = instance;
}

public String toString() {
return "[ " + instance + " ]";
}

public void readFields(DataInput in) throws IOException {
readObject(in, this, this.conf);
}

public void write(DataOutput out) throws IOException {
writeObject(out, instance, declaredClass, conf);
}

private static final Map<String, Class<?>> PRIMITIVE_NAMES = new
HashMap<String, Class<?>>();
static {
PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
PRIMITIVE_NAMES.put("byte", Byte.TYPE);
PRIMITIVE_NAMES.put("char", Character.TYPE);
PRIMITIVE_NAMES.put("short", Short.TYPE);
PRIMITIVE_NAMES.put("int", Integer.TYPE);
PRIMITIVE_NAMES.put("long", Long.TYPE);
PRIMITIVE_NAMES.put("float", Float.TYPE);
PRIMITIVE_NAMES.put("double", Double.TYPE);
PRIMITIVE_NAMES.put("void", Void.TYPE);
}

private static class NullInstance extends Configured implements Writable {
private Class<?> declaredClass;

public NullInstance() {
super(null);
}

public NullInstance(Class declaredClass, Configuration conf) {
super(conf);
this.declaredClass = declaredClass;
}

public void readFields(DataInput in) throws IOException {
String className = UTF8.readString(in);
declaredClass = PRIMITIVE_NAMES.get(className);
if (declaredClass == null) {
try {
declaredClass = getConf().getClassByName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.toString());
}
}
}

public void write(DataOutput out) throws IOException {
UTF8.writeString(out, declaredClass.getName());
}
}

/**
 * Write a {@link Writable}, {@link String}, primitive type, or an array of
 * the preceding.
 */
public static void writeObject(DataOutput out, Object instance,
Class declaredClass, Configuration conf) throws IOException {

if (instance == null) { // null
instance = new NullInstance(declaredClass, conf);
declaredClass = Writable.class;
}

UTF8.writeString(out, declaredClass.getName()); // always write declared

if (declaredClass.isArray()) { // array
int length = Array.getLength(instance);
out.writeInt(length);
for (int i = 0; i < length; i++) {
writeObject(out, Array.get(instance, i),
declaredClass.getComponentType(), conf);
}

} else if (declaredClass == String.class) { // String
UTF8.writeString(out, (String) instance);

} else if (declaredClass.isPrimitive()) { // primitive type

if (declaredClass == Boolean.TYPE) { // boolean
out.writeBoolean(((Boolean) instance).booleanValue());
} else if (declaredClass == Character.TYPE) { // char
out.writeChar(((Character) instance).charValue());
} else if (declaredClass == Byte.TYPE) { // byte
out.writeByte(((Byte) instance).byteValue());
} else if (declaredClass == Short.TYPE) { // short
out.writeShort(((Short) instance).shortValue());
} else if (declaredClass == Integer.TYPE) { // int
out.writeInt(((Integer) instance).intValue());
} else if (declaredClass == Long.TYPE) { // long
out.writeLong(((Long) instance).longValue());
} else if (declaredClass == Float.TYPE) { // float
out.writeFloat(((Float) instance).floatValue());
} else if (declaredClass == Double.TYPE) { // double
out.writeDouble(((Double) instance).doubleValue());
} else if (declaredClass == Void.TYPE) { // void
} else {
throw new IllegalArgumentException("Not a primitive: "
+ declaredClass);
}
} else if (declaredClass.isEnum()) { // enum
UTF8.writeString(out, ((Enum) instance).name());
} else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
UTF8.writeString(out, instance.getClass().getName());
((Writable) instance).write(out);

} else {
throw new IOException("Can't write: " + instance + " as "
+ declaredClass);
}
}

/**
 * Read a {@link Writable}, {@link String}, primitive type, or an array of
 * the preceding.
 */
public static Object readObject(DataInput in, Configuration conf)
throws IOException {
return readObject(in, null, conf);
}

/**
 * Read a {@link Writable}, {@link String}, primitive type, or an array of
 * the preceding.
 */
@SuppressWarnings("unchecked")
public static Object readObject(DataInput in, MyWritable objectWritable,
Configuration conf) throws IOException {
String className = UTF8.readString(in);
Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
if (declaredClass == null) {
try {
declaredClass = conf.getClassByName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("readObject can't find class "
+ className, e);
}
}

Object instance;

if (declaredClass.isPrimitive()) { // primitive types

if (declaredClass == Boolean.TYPE) { // boolean
instance = Boolean.valueOf(in.readBoolean());
} else if (declaredClass == Character.TYPE) { // char
instance = Character.valueOf(in.readChar());
} else if (declaredClass == Byte.TYPE) { // byte
instance = Byte.valueOf(in.readByte());
} else if (declaredClass == Short.TYPE) { // short
instance = Short.valueOf(in.readShort());
} else if (declaredClass == Integer.TYPE) { // int
instance = Integer.valueOf(in.readInt());
} else if (declaredClass == Long.TYPE) { // long
instance = Long.valueOf(in.readLong());
} else if (declaredClass == Float.TYPE) { // float
instance = Float.valueOf(in.readFloat());
} else if (declaredClass == Double.TYPE) { // double
instance = Double.valueOf(in.readDouble());
} else if (declaredClass == Void.TYPE) { // void
instance = null;
} else {
throw new IllegalArgumentException("Not a primitive: "
+ declaredClass);
}

} else if (declaredClass.isArray()) { // array
int length = in.readInt();
instance = Array.newInstance(declaredClass.getComponentType(),
length);
for (int i = 0; i < length; i++) {
Array.set(instance, i, readObject(in, conf));
}

} else if (declaredClass == String.class) { // String
instance = UTF8.readString(in);
} else if (declaredClass.isEnum()) { // enum
instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
UTF8.readString(in));
} else { // Writable
Class instanceClass = null;
String str = "";
try {
str = UTF8.readString(in);
instanceClass = conf.getClassByName(str);
} catch (ClassNotFoundException e) {
throw new RuntimeException(
"readObject can't find class " + str, e);
}

Writable writable = WritableFactories.newInstance(instanceClass,
conf);
writable.readFields(in);
instance = writable;

if (instanceClass == NullInstance.class) { // null
declaredClass = ((NullInstance) instance).declaredClass;
instance = null;
}
}

if (objectWritable != null) { // store values
objectWritable.declaredClass = declaredClass;
objectWritable.instance = instance;
}

return instance;

}

public void setConf(Configuration conf) {
this.conf = conf;
}

public Configuration getConf() {
return this.conf;
}
}


-- 
*--
-- Felipe Oliveira Gutierrez
-- Felipe.o.Gutierrez@gmail.com
-- https://sites.google.com/site/lipe82/Home/diaadia*

Mime
View raw message