Return-Path: X-Original-To: apmail-hadoop-hdfs-user-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8506D10355 for ; Thu, 25 Jul 2013 19:19:04 +0000 (UTC) Received: (qmail 74325 invoked by uid 500); 25 Jul 2013 19:18:58 -0000 Delivered-To: apmail-hadoop-hdfs-user-archive@hadoop.apache.org Received: (qmail 74149 invoked by uid 500); 25 Jul 2013 19:18:58 -0000 Mailing-List: contact user-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@hadoop.apache.org Delivered-To: mailing list user@hadoop.apache.org Received: (qmail 74090 invoked by uid 99); 25 Jul 2013 19:18:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Jul 2013 19:18:57 +0000 X-ASF-Spam-Status: No, hits=1.5 required=5.0 tests=HTML_MESSAGE,RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of felipe.o.gutierrez@gmail.com designates 209.85.214.181 as permitted sender) Received: from [209.85.214.181] (HELO mail-ob0-f181.google.com) (209.85.214.181) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Jul 2013 19:18:49 +0000 Received: by mail-ob0-f181.google.com with SMTP id dn14so2150289obc.40 for ; Thu, 25 Jul 2013 12:18:28 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type; bh=JiKhZ4NpQBjbuNid9ig6HRW9z1BfJt+wxoPCvYDbmDw=; b=YyVftyUDn9A7wxtIf6Rjimd0xGIikdHK1N2LxYTeUwaKU9YaRPZF5+x0SzmU9EH9BR Zva4fuWrNo2Xt1mg5OzIRD3ZptAVGr3u1nX/ZsCblDYQFh2G414gh/b2dQ4gteRtwlTR iq5O8gIzWhhHKWiSWhVx2ptRhHrmIXNsN+ooKmpmxnc5fInwDEADwGhvHjrZMm0idvs3 r9xLOOXZXI5ErRK+gcFP73OKhvxWmiLFBScVYMnz0wsXNwBUEsJajrN16tg1xvyLO3Ns 
hMVQK6zqnrOOL66XVeE5HRRFBPnO5LDP1qr4BaYPEGFYP7bzivWP4Oi37z2SVNKQ+Xcu VZvQ== MIME-Version: 1.0 X-Received: by 10.60.173.169 with SMTP id bl9mr44970515oec.51.1374779908628; Thu, 25 Jul 2013 12:18:28 -0700 (PDT) Received: by 10.76.173.10 with HTTP; Thu, 25 Jul 2013 12:18:28 -0700 (PDT) In-Reply-To: References: Date: Thu, 25 Jul 2013 16:18:28 -0300 Message-ID: Subject: Re: Change the output of Reduce function From: Felipe Gutierrez To: user@hadoop.apache.org Content-Type: multipart/alternative; boundary=089e011761c91926f604e25ae4d1 X-Virus-Checked: Checked by ClamAV on apache.org --089e011761c91926f604e25ae4d1 Content-Type: text/plain; charset=ISO-8859-1 Sorry, I think I didnt understand, Does NullWritable go to replate MyWritable? But this is may value. My key is a Text. Regards, Felipe On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus wrote: > I think uou can use NullWritable as key. > > http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html > > > Regards, > Shahab > > > On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez < > felipe.o.gutierrez@gmail.com> wrote: > >> I did a MapReduce program to execute a Grep function. I know there is a >> Grep function at hadoop examples, but I want to make my Grep MapReduce to >> explain to other. >> My problem is that my out put shows the key/value. I want to show only >> the value, since I saved the line number at this value. Example: >> >> 00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0: >> associated ] >> >> Here is my code. 
Thanks, >> Felipe >> >> package grep; >> >> import java.io.File; >> import java.io.FileReader; >> import java.io.LineNumberReader; >> >> import org.apache.hadoop.fs.Path; >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.mapred.FileInputFormat; >> import org.apache.hadoop.mapred.FileOutputFormat; >> import org.apache.hadoop.mapred.JobClient; >> import org.apache.hadoop.mapred.JobConf; >> import org.apache.hadoop.mapred.TextInputFormat; >> import org.apache.hadoop.mapred.TextOutputFormat; >> >> public class Main { >> >> public static void main(String[] args) throws Exception { >> >> if (args == null || args.length != 3) { >> System.err.println("Usage: Main "); >> System.exit(-1); >> } >> >> JobConf conf = new JobConf(Main.class); >> conf.setJobName("grep"); >> >> String input = args[0]; >> String output = args[1]; >> String regex = args[2]; >> >> File arquivoLeitura = new File(input); >> LineNumberReader linhaLeitura = new LineNumberReader(new FileReader( >> arquivoLeitura)); >> linhaLeitura.skip(arquivoLeitura.length()); >> String lines = String.valueOf(linhaLeitura.getLineNumber() + 1); >> conf.set("grep.regex", regex); >> conf.set("grep.lines", lines); >> >> conf.setOutputKeyClass(Text.class); >> conf.setOutputValueClass(MyWritable.class); >> >> conf.setMapperClass(GrepMapper.class); >> conf.setCombinerClass(GrepReducer.class); >> conf.setReducerClass(GrepReducer.class); >> >> conf.setInputFormat(TextInputFormat.class); >> conf.setOutputFormat(TextOutputFormat.class); >> >> FileInputFormat.setInputPaths(conf, new Path(input)); >> FileOutputFormat.setOutputPath(conf, new Path(output)); >> >> JobClient.runJob(conf); >> } >> } >> >> package grep; >> >> import java.io.IOException; >> import java.text.DecimalFormat; >> >> import org.apache.hadoop.io.LongWritable; >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.mapred.JobConf; >> import org.apache.hadoop.mapred.MapReduceBase; >> import org.apache.hadoop.mapred.Mapper; >> import 
org.apache.hadoop.mapred.OutputCollector; >> import org.apache.hadoop.mapred.Reporter; >> >> public class GrepMapper extends MapReduceBase implements >> Mapper { >> >> private static long line = 1; >> private static long n = 0; >> private static long divisor = 1; >> private static long qtdLines = 0; >> private Text k = new Text(); >> >> public void map(LongWritable key, Text value, >> OutputCollector output, Reporter reporter) >> throws IOException { >> String str = value.toString(); >> MyWritable text = new MyWritable("line " + line + " : " + str); >> if ((line % divisor) == 0) { >> n++; >> } >> k.set(customFormat("00000", n)); >> output.collect(k, text); >> line++; >> } >> >> @Override >> public void configure(JobConf job) { >> qtdLines = Long.parseLong(job.get("grep.lines")); >> if (qtdLines <= 500) { >> divisor = 10; >> } else if (qtdLines <= 1000) { >> divisor = 20; >> } else if (qtdLines <= 1500) { >> divisor = 30; >> } else if (qtdLines <= 2000) { >> divisor = 40; >> } else if (qtdLines <= 2500) { >> divisor = 50; >> } else if (qtdLines <= 3000) { >> divisor = 60; >> } else if (qtdLines <= 3500) { >> divisor = 70; >> } else if (qtdLines <= 4000) { >> divisor = 80; >> } else if (qtdLines <= 4500) { >> divisor = 90; >> } else if (qtdLines <= 5000) { >> divisor = 100; >> } else if (qtdLines <= 5500) { >> divisor = 110; >> } else if (qtdLines <= 6000) { >> divisor = 120; >> } else if (qtdLines <= 6500) { >> divisor = 130; >> } else if (qtdLines <= 7000) { >> divisor = 140; >> } >> } >> >> static public String customFormat(String pattern, double value) { >> DecimalFormat myFormatter = new DecimalFormat(pattern); >> return myFormatter.format(value); >> } >> } >> >> package grep; >> >> import java.io.IOException; >> import java.util.Iterator; >> import java.util.regex.Matcher; >> import java.util.regex.Pattern; >> >> import org.apache.hadoop.io.Text; >> import org.apache.hadoop.mapred.JobConf; >> import org.apache.hadoop.mapred.MapReduceBase; >> import 
org.apache.hadoop.mapred.OutputCollector; >> import org.apache.hadoop.mapred.Reducer; >> import org.apache.hadoop.mapred.Reporter; >> >> public class GrepReducer extends MapReduceBase implements >> Reducer { >> >> private Pattern pattern; >> >> @Override >> public void configure(JobConf job) { >> pattern = Pattern.compile(job.get("grep.regex")); >> } >> >> public void reduce(Text key, Iterator values, >> OutputCollector output, Reporter reporter) >> throws IOException { >> >> while (values.hasNext()) { >> String text = (String) values.next().get(); >> Matcher matcher = pattern.matcher(text); >> while (matcher.find()) { >> output.collect(key, new MyWritable(text)); >> } >> } >> } >> } >> >> package grep; >> >> import java.io.DataInput; >> import java.io.DataOutput; >> import java.io.IOException; >> import java.lang.reflect.Array; >> import java.util.HashMap; >> import java.util.Map; >> >> import org.apache.hadoop.conf.Configurable; >> import org.apache.hadoop.conf.Configuration; >> import org.apache.hadoop.conf.Configured; >> import org.apache.hadoop.io.UTF8; >> import org.apache.hadoop.io.Writable; >> import org.apache.hadoop.io.WritableFactories; >> >> public class MyWritable implements Writable, Configurable { >> >> private Class declaredClass; >> private Object instance; >> private Configuration conf; >> >> public MyWritable() { >> } >> >> public MyWritable(Object instance) { >> set(instance); >> } >> >> public MyWritable(Class declaredClass, Object instance) { >> this.declaredClass = declaredClass; >> this.instance = instance; >> } >> >> /** Return the instance, or null if none. */ >> public Object get() { >> return instance; >> } >> >> /** Return the class this is meant to be. */ >> public Class getDeclaredClass() { >> return declaredClass; >> } >> >> /** Reset the instance. 
*/ >> public void set(Object instance) { >> this.declaredClass = instance.getClass(); >> this.instance = instance; >> } >> >> public String toString() { >> return "[ " + instance + " ]"; >> } >> >> public void readFields(DataInput in) throws IOException { >> readObject(in, this, this.conf); >> } >> >> public void write(DataOutput out) throws IOException { >> writeObject(out, instance, declaredClass, conf); >> } >> >> private static final Map> PRIMITIVE_NAMES = new >> HashMap>(); >> static { >> PRIMITIVE_NAMES.put("boolean", Boolean.TYPE); >> PRIMITIVE_NAMES.put("byte", Byte.TYPE); >> PRIMITIVE_NAMES.put("char", Character.TYPE); >> PRIMITIVE_NAMES.put("short", Short.TYPE); >> PRIMITIVE_NAMES.put("int", Integer.TYPE); >> PRIMITIVE_NAMES.put("long", Long.TYPE); >> PRIMITIVE_NAMES.put("float", Float.TYPE); >> PRIMITIVE_NAMES.put("double", Double.TYPE); >> PRIMITIVE_NAMES.put("void", Void.TYPE); >> } >> >> private static class NullInstance extends Configured implements Writable { >> private Class declaredClass; >> >> public NullInstance() { >> super(null); >> } >> >> public NullInstance(Class declaredClass, Configuration conf) { >> super(conf); >> this.declaredClass = declaredClass; >> } >> >> public void readFields(DataInput in) throws IOException { >> String className = UTF8.readString(in); >> declaredClass = PRIMITIVE_NAMES.get(className); >> if (declaredClass == null) { >> try { >> declaredClass = getConf().getClassByName(className); >> } catch (ClassNotFoundException e) { >> throw new RuntimeException(e.toString()); >> } >> } >> } >> >> public void write(DataOutput out) throws IOException { >> UTF8.writeString(out, declaredClass.getName()); >> } >> } >> >> /** >> * Write a {@link Writable}, {@link String}, primitive type, or an array >> of >> * the preceding. 
>> */ >> public static void writeObject(DataOutput out, Object instance, >> Class declaredClass, Configuration conf) throws IOException { >> >> if (instance == null) { // null >> instance = new NullInstance(declaredClass, conf); >> declaredClass = Writable.class; >> } >> >> UTF8.writeString(out, declaredClass.getName()); // always write declared >> >> if (declaredClass.isArray()) { // array >> int length = Array.getLength(instance); >> out.writeInt(length); >> for (int i = 0; i < length; i++) { >> writeObject(out, Array.get(instance, i), >> declaredClass.getComponentType(), conf); >> } >> >> } else if (declaredClass == String.class) { // String >> UTF8.writeString(out, (String) instance); >> >> } else if (declaredClass.isPrimitive()) { // primitive type >> >> if (declaredClass == Boolean.TYPE) { // boolean >> out.writeBoolean(((Boolean) instance).booleanValue()); >> } else if (declaredClass == Character.TYPE) { // char >> out.writeChar(((Character) instance).charValue()); >> } else if (declaredClass == Byte.TYPE) { // byte >> out.writeByte(((Byte) instance).byteValue()); >> } else if (declaredClass == Short.TYPE) { // short >> out.writeShort(((Short) instance).shortValue()); >> } else if (declaredClass == Integer.TYPE) { // int >> out.writeInt(((Integer) instance).intValue()); >> } else if (declaredClass == Long.TYPE) { // long >> out.writeLong(((Long) instance).longValue()); >> } else if (declaredClass == Float.TYPE) { // float >> out.writeFloat(((Float) instance).floatValue()); >> } else if (declaredClass == Double.TYPE) { // double >> out.writeDouble(((Double) instance).doubleValue()); >> } else if (declaredClass == Void.TYPE) { // void >> } else { >> throw new IllegalArgumentException("Not a primitive: " >> + declaredClass); >> } >> } else if (declaredClass.isEnum()) { // enum >> UTF8.writeString(out, ((Enum) instance).name()); >> } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable >> UTF8.writeString(out, instance.getClass().getName()); >> 
((Writable) instance).write(out); >> >> } else { >> throw new IOException("Can't write: " + instance + " as " >> + declaredClass); >> } >> } >> >> /** >> * Read a {@link Writable}, {@link String}, primitive type, or an array >> of >> * the preceding. >> */ >> public static Object readObject(DataInput in, Configuration conf) >> throws IOException { >> return readObject(in, null, conf); >> } >> >> /** >> * Read a {@link Writable}, {@link String}, primitive type, or an array of >> * the preceding. >> */ >> @SuppressWarnings("unchecked") >> public static Object readObject(DataInput in, MyWritable objectWritable, >> Configuration conf) throws IOException { >> String className = UTF8.readString(in); >> Class declaredClass = PRIMITIVE_NAMES.get(className); >> if (declaredClass == null) { >> try { >> declaredClass = conf.getClassByName(className); >> } catch (ClassNotFoundException e) { >> throw new RuntimeException("readObject can't find class " >> + className, e); >> } >> } >> >> Object instance; >> >> if (declaredClass.isPrimitive()) { // primitive types >> >> if (declaredClass == Boolean.TYPE) { // boolean >> instance = Boolean.valueOf(in.readBoolean()); >> } else if (declaredClass == Character.TYPE) { // char >> instance = Character.valueOf(in.readChar()); >> } else if (declaredClass == Byte.TYPE) { // byte >> instance = Byte.valueOf(in.readByte()); >> } else if (declaredClass == Short.TYPE) { // short >> instance = Short.valueOf(in.readShort()); >> } else if (declaredClass == Integer.TYPE) { // int >> instance = Integer.valueOf(in.readInt()); >> } else if (declaredClass == Long.TYPE) { // long >> instance = Long.valueOf(in.readLong()); >> } else if (declaredClass == Float.TYPE) { // float >> instance = Float.valueOf(in.readFloat()); >> } else if (declaredClass == Double.TYPE) { // double >> instance = Double.valueOf(in.readDouble()); >> } else if (declaredClass == Void.TYPE) { // void >> instance = null; >> } else { >> throw new IllegalArgumentException("Not a 
primitive: " >> + declaredClass); >> } >> >> } else if (declaredClass.isArray()) { // array >> int length = in.readInt(); >> instance = Array.newInstance(declaredClass.getComponentType(), >> length); >> for (int i = 0; i < length; i++) { >> Array.set(instance, i, readObject(in, conf)); >> } >> >> } else if (declaredClass == String.class) { // String >> instance = UTF8.readString(in); >> } else if (declaredClass.isEnum()) { // enum >> instance = Enum.valueOf((Class) declaredClass, >> UTF8.readString(in)); >> } else { // Writable >> Class instanceClass = null; >> String str = ""; >> try { >> str = UTF8.readString(in); >> instanceClass = conf.getClassByName(str); >> } catch (ClassNotFoundException e) { >> throw new RuntimeException( >> "readObject can't find class " + str, e); >> } >> >> Writable writable = WritableFactories.newInstance(instanceClass, >> conf); >> writable.readFields(in); >> instance = writable; >> >> if (instanceClass == NullInstance.class) { // null >> declaredClass = ((NullInstance) instance).declaredClass; >> instance = null; >> } >> } >> >> if (objectWritable != null) { // store values >> objectWritable.declaredClass = declaredClass; >> objectWritable.instance = instance; >> } >> >> return instance; >> >> } >> >> public void setConf(Configuration conf) { >> this.conf = conf; >> } >> >> public Configuration getConf() { >> return this.conf; >> } >> } >> >> >> -- >> *-- >> -- Felipe Oliveira Gutierrez >> -- Felipe.o.Gutierrez@gmail.com >> -- https://sites.google.com/site/lipe82/Home/diaadia* >> > > -- *-- -- Felipe Oliveira Gutierrez -- Felipe.o.Gutierrez@gmail.com -- https://sites.google.com/site/lipe82/Home/diaadia* --089e011761c91926f604e25ae4d1 Content-Type: text/html; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable
Sorry, I think I didn't understand.
Does NullWritable go to replace MyWritable? But this is my value. My key is a Text.
Regards,
Felipe



On Thu, Jul 25, 2013 at 4:07 PM, Shahab Yunus <shahab.yunus@gmail.com> wrote:
I think you can use NullWritable as key.


Regards,
Shahab

<= br>
On Thu, Jul 25, 2013 at 2:58 PM, Felipe Gutierrez <felipe.o.gutierrez@gmail.com> wrote:
I wrote a MapReduce program to execute a Grep function. I know there is a Grep function in the Hadoop examples, but I want to make my own Grep MapReduce to explain it to others.
My problem is that my output shows the key/value pair. I want to show only the value, since I saved the line number in this value. Example:

00048 [ line 6298 : Jul 25 15:18:14 felipe kernel: [ 2168.644689] wlan0: associated ]

Here is my code. Thanks,
Felipe

package grep;

impo= rt java.io.File;
import java.io.FileReader;
import java= .io.LineNumberReader;

import org.apache.hadoop.fs.= Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.m= apred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutput= Format;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop= .mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutp= utFormat;

public class Main {

public static void main(S= tring[] args) throws Exception {

if (args =3D=3D null || args.length !=3D 3) {<= /div>
System.err.println(&quo= t;Usage: Main <in> <out> <regex>");
System.exit(-1);
}

JobConf conf =3D new JobConf(Ma= in.class);
conf.set= JobName("grep");

String in= put =3D args[0];
St= ring output =3D args[1];
<= /span>String regex =3D args[2];

File arqu= ivoLeitura =3D new File(input);
LineNumberReader linhaLeitura =3D new LineNumberReader(new Fil= eReader(
arquivoLeitura));
linhaLeitura.skip(arqui= voLeitura.length());
String lines =3D String.valueOf(linhaLeitura.getLineNumber() + 1);
conf.set("grep.rege= x", regex);
co= nf.set("grep.lines", lines);

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass= (MyWritable.class);

conf.setMapperClass(GrepMapper.class);
conf.setCombinerClass(GrepReducer.class= );
conf.setReducerClass(Gre= pReducer.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutp= utFormat.class);

FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutp= utPath(conf, new Path(output));

JobClient.runJob(conf);
}
}

package grep;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred= .JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoo= p.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

pub= lic class GrepMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, MyWr= itable> {

private st= atic long line =3D 1;
private static long n =3D 0;
private static long divisor =3D 1;
private static long qtdLi= nes =3D 0;
private T= ext k =3D new Text();

public void map(LongWritable key, Text value,
OutputCollector<Text= , MyWritable> output, Reporter reporter)
throws IOException {
String str =3D value.toString();
MyWritable text =3D new = MyWritable("line " + line + " : " + str);
if ((line % divisor) =3D=3D 0) {=
n++;
}
k.set(customFormat("00000", n));
output.collect(k, text);=
line++;
= }

@Override
public void configure(JobConf job) = {
qtdLines =3D Long= .parseLong(job.get("grep.lines"));
if (qtdLines <=3D 500= ) {
divisor =3D 10= ;
} else if (qtdLin= es <=3D 1000) {
divisor =3D 20;
} else if (qtdLines <= =3D 1500) {
diviso= r =3D 30;
} else if (qtdLines <= =3D 2000) {
diviso= r =3D 40;
} else if= (qtdLines <=3D 2500) {
divisor =3D 50;
} else if (qtdLines <= =3D 3000) {
diviso= r =3D 60;
} else if (qtdLines <= =3D 3500) {
diviso= r =3D 70;
} else if= (qtdLines <=3D 4000) {
divisor =3D 80;
} else if (qtdLines <= =3D 4500) {
diviso= r =3D 90;
} else if (qtdLines <= =3D 5000) {
diviso= r =3D 100;
} else i= f (qtdLines <=3D 5500) {
divisor =3D 110;
<= div> } else if (qtdLines <= =3D 6000) {
diviso= r =3D 120;
} else if (qtdLines <= =3D 6500) {
diviso= r =3D 130;
} else i= f (qtdLines <=3D 7000) {
divisor =3D 140;
<= div> }
}

static public String customFormat(String pat= tern, double value) {
DecimalFormat myFormatte= r =3D new DecimalFormat(pattern);
return myFormatter.format(value);
}
}

package grep;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pat= tern;

import org.apache.hadoop.io.Text;
import org.= apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Map= ReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.h= adoop.mapred.Reporter;

public class GrepReducer ex= tends MapReduceBase implements
Reducer<Text, MyWritable, Text, MyWritable> {

private Pa= ttern pattern;

@Override
= public void configure(JobConf job) {
pattern =3D Pattern.comp= ile(job.get("grep.regex"));
}

public void reduce(Text key, Iterator<MyWritable> valu= es,
OutputCollector<Text= , MyWritable> output, Reporter reporter)
throws IOException {

while (values.hasNext())= {
String text =3D= (String) values.next().get();
Matcher matcher =3D pattern.matcher(text);
while (matcher.find()) = {
= output.collect(key, new MyWritable(text));
}
}
}
}

package grep;=

import java.io.DataInput;
import java.io.Data= Output;
import java.io.IOException;
import java.lang.re= flect.Array;
import java.util.HashMap;
import java.util= .Map;

import org.apache.hadoop.conf.Configurable;
i= mport org.apache.hadoop.conf.Configuration;
import org.apache.had= oop.conf.Configured;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io= .WritableFactories;

public class MyWritable implem= ents Writable, Configurable {

private Class declaredClass;
private Object instance;<= /div>
private Configuratio= n conf;

public MyWritable() {
}

public MyWritable(Object ins= tance) {
set(instan= ce);
}

public MyWritable(Class decl= aredClass, Object instance) {
this.declaredClass =3D declaredClass;
this.instance =3D instan= ce;
}

=
/** Return the inst= ance, or null if none. */
public Object get() {
return instance;
=
}

/** Return the clas= s this is meant to be. */
<= /span>public Class getDeclaredClass() {
return declaredClass;
}

/** Reset the instance. */
public void set(Objec= t instance) {
this.declaredClass =3D i= nstance.getClass();
this.instance =3D instance;
}

public Str= ing toString() {
re= turn "[ " + instance + " ]";
}

public voi= d readFields(DataInput in) throws IOException {
readObject(in, this, this.conf);
}

public void write(DataOutput= out) throws IOException {
= writeObject(out, instance, declaredClass, conf);
}

private static final Map<= String, Class<?>> PRIMITIVE_NAMES =3D new HashMap<String, Class= <?>>();
static {
PRIMITIVE_NAMES.put("boolean&q= uot;, Boolean.TYPE);
PRIMITIVE_NAMES.put("byte", Byte.TYPE);
PRIMITIVE_NAMES.put(&quo= t;char", Character.TYPE);
PRIMITIVE_NAMES.put("short", Short.TYPE);
PRIMITIVE_NAMES.put(&quo= t;int", Integer.TYPE);
= PRIMITIVE_NAMES.put("long", Long.TYPE);
PRIMITIVE_NAMES.put("flo= at", Float.TYPE);
PRIMITIVE_NAMES.put("double", Double.TYPE);
PRIMITIVE_NAMES.put("voi= d", Void.TYPE);
}

pr= ivate static class NullInstance extends Configured implements Writable {
private Class<?> d= eclaredClass;

public NullInstance() {
super(null);
}

public NullInstance(Class = declaredClass, Configuration conf) {
super(conf);
this.declaredClass =3D = declaredClass;
}

public= void readFields(DataInput in) throws IOException {
String className =3D UT= F8.readString(in);
declaredClass =3D PRIMITIVE_NAMES.get(className);
if (declaredClass =3D=3D null) {
try {
declaredClass =3D getConf().getC= lassByName(className);
<= /span>} catch (ClassNotFoundException e) {
throw new RuntimeExce= ption(e.toString());
}
}
}

public void write(DataOutp= ut out) throws IOException {
UTF8.writeString(out, declaredClass.getName());
}
}

/**
* Write a {@link Writable}, {@link String}, primitive type,= or an array of
* the preceding.
*/
public static void writeObject(DataOutput= out, Object instance,
Class declaredClass, Co= nfiguration conf) throws IOException {

if (instance =3D=3D null) { // null
instance =3D new NullIn= stance(declaredClass, conf);
declaredClass =3D Writable.class;
}

UTF8.writ= eString(out, declaredClass.getName()); // always write declared
<= br>
if (declaredCla= ss.isArray()) { // array
int length =3D Array.ge= tLength(instance);
out.writeInt(length);
for (int i =3D 0; i < length; i++) {
writeObject(out, Array= .get(instance, i),
declaredClass.getComponentType(), conf);
}

} else if= (declaredClass =3D=3D String.class) { // String
UTF8.writeString(out, (String) instance);

} else if= (declaredClass.isPrimitive()) { // primitive type

if (declaredClass =3D=3D Bo= olean.TYPE) { // boolean
out.writeBoolean(((Boo= lean) instance).booleanValue());
} else if (declaredClass =3D=3D Character.TYPE) { // char
out.writeChar(((Charac= ter) instance).charValue());
} else if (declaredClass =3D=3D Byte.TYPE) { // byte
out.writeByte(((Byte) insta= nce).byteValue());
} else if (declaredClass =3D=3D Short.TYPE) { // short
out.writeShort(((Short) instance).sh= ortValue());
} else if (declaredClas= s =3D=3D Integer.TYPE) { // int
out.writeInt(((Integer) instance).intValue());
} else if (declaredClass =3D= =3D Long.TYPE) { // long
= out.writeLong(((Long) instance).longValue());
} else if (declaredClass =3D=3D Float.T= YPE) { // float
out.writeFloat(((Float= ) instance).floatValue());
= } else if (declaredClass =3D=3D Double.TYPE) { // double
out.writeDouble(((Doub= le) instance).doubleValue());
} else if (declaredClass =3D=3D Void.TYPE) { // void
} else {
throw new IllegalArgumentException(&= quot;Not a primitive: "
+ declaredClass);
}
} else if (declaredClass.isEnum()) { // = enum
UTF8.writeStr= ing(out, ((Enum) instance).name());
} else if (Writable.clas= s.isAssignableFrom(declaredClass)) { // Writable
UTF8.writeString(out, instance.getClass().ge= tName());
((Writable) instance).w= rite(out);

= } else {
th= row new IOException("Can't write: " + instance + " as &q= uot;
+ declaredClass);
}
}

/**
* Read a {@link Writable= }, {@link String}, primitive type, or an array of
* the preceding.
*/
public static Object read= Object(DataInput in, Configuration conf)
throws IOException {
return readObject(in, null, conf);
}

/**
* Read a {@link Writable}, {@link String}, = primitive type, or an array of
* the preceding.
*/
@SuppressWarnings("unchecked")<= /div>
public static Object read= Object(DataInput in, MyWritable objectWritable,
Configuration conf) throws IOException {
String className =3D UTF= 8.readString(in);
C= lass<?> declaredClass =3D PRIMITIVE_NAMES.get(className);
if (declaredClass =3D=3D null= ) {
try {
declaredClass =3D conf.ge= tClassByName(className);
} catch (ClassNotFoundE= xception e) {
thr= ow new RuntimeException("readObject can't find class "
+ className, e);
}
}

Object instance;

if (decla= redClass.isPrimitive()) { // primitive types

if (declaredClass =3D=3D Boolean.= TYPE) { // boolean
instance =3D Boolean.v= alueOf(in.readBoolean());
= } else if (declaredClass =3D=3D Character.TYPE) { // char
instance =3D Character.valu= eOf(in.readChar());
} else if (declaredClass =3D=3D Byte.TYPE) { // byte
instance =3D Byte.valueOf(in.readByte= ());
} else if (declaredClas= s =3D=3D Short.TYPE) { // short
instance =3D Short.valueOf(in.readShort());
} else if (declaredClass =3D=3D In= teger.TYPE) { // int
instance =3D Integer.v= alueOf(in.readInt());
} else if (declaredClass =3D=3D Long.TYPE) { // long
instance =3D Long.valueOf(in.readLo= ng());
} else if (declaredClas= s =3D=3D Float.TYPE) { // float
instance =3D Float.valueOf(in.readFloat());
} else if (declaredClass =3D=3D Do= uble.TYPE) { // double
instance =3D Double.va= lueOf(in.readDouble());
<= /span>} else if (declaredClass =3D=3D Void.TYPE) { // void
instance =3D null;
} else {
throw new IllegalArgumentExcept= ion("Not a primitive: "
+ declaredClass);
}

<= div> } else if (declaredClass.= isArray()) { // array
int length =3D in.readInt();
instance =3D Array.newI= nstance(declaredClass.getComponentType(),
length);
for (int i =3D 0; i < length; i++) {
Array.set(instance, i,= readObject(in, conf));
<= /span>}

} else if (declaredClass =3D=3D String.class) { // String
instance =3D UTF8.readS= tring(in);
} else i= f (declaredClass.isEnum()) { // enum
instance =3D Enum.valueOf((Class<? extends Enum>) = declaredClass,
UTF8.readString(in));=
} else { // Writab= le
Class instanceC= lass =3D null;
String str =3D "&q= uot;;
try {
<= div> str =3D UTF8.readString= (in);
instanceClass =3D conf= .getClassByName(str);
} catch (ClassNotFoundException e) {
throw new RuntimeException(
"readObject can= 't find class " + str, e);
}

Writable writable =3D WritableFactories.newInstance(instan= ceClass,
conf);
writable.readFields(in);
instance =3D writable;

if (inst= anceClass =3D=3D NullInstance.class) { // null
declaredClass =3D ((NullInstance) instance).d= eclaredClass;
instance =3D null;
}
}

if (objectWritable !=3D null) { // store= values
objectWritable.declared= Class =3D declaredClass;
= objectWritable.instance =3D instance;
}

return in= stance;

}

= public void setConf(Configuration conf) {
this.conf =3D conf;
}

public Configuration getCon= f() {
return this.conf;
<= div> }
}
<= span>


--
--
-- Felipe Oliveira Gutierrez
-- Felipe.o.Gutierrez@gmail.com
-- https://sites.google.com/site/lipe82/Home/diaadia




--
= --
-- Felipe Oliveira Gutierrez
-- Felipe.o.Gutierrez@gmail.com
-- https://sites.google.com/site/lipe82/Home/diaadia
--089e011761c91926f604e25ae4d1--