hadoop-common-user mailing list archives

From Tri Doan <trid...@k-state.edu>
Subject how to make reduce combine all elements with the same key in <key, value> pairs
Date Fri, 26 Nov 2010 06:44:21 GMT

I have data (a single text file) like the following, where each letter is one node/page and the number is the probability (rank) of the page on its left:
 A 0.25 B C D
 B 0.25 A D

I am trying to write a program that implements the process in three stages.

The map stage breaks each line up into the pairs <"B","0.25 A">, <"C","0.25 A">, <"D","0.25 A">, <"A","0.25 B">, and <"D","0.25 B">.

The combiner stage does a pre-reduce for all pairs that have the same key, so only <"D","0.25 A"> and <"D","0.25 B"> are processed together (the numeric value is extracted and some computation is done on it); all pairs are then output again in reversed order (see the sketch below). For example:

<"B","0.25 A"> is output as <"A","B">, <"C","0.25 A"> as <"A","C">, <"D","0.25 A"> as <"A","D">, <"A","0.25 B"> as <"B","A">, and <"D","0.25 B"> as <"B","D">.

The reduce stage combines all pairs that have the same key to reconstruct the original form. The input to the reduce function is the output from the combiner, and the expected result is:
A B C D
B A D
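
The grouping I expect at this stage, as a minimal standalone sketch (plain Java, no Hadoop; names are mine) over the combiner output <"B","A">, <"A","B">, <"A","C">, <"A","D">, <"B","D">:

import java.util.*;

public class ReduceStageSketch {
    public static void main(String[] args) {
        String[][] pairs = { {"B", "A"}, {"A", "B"}, {"A", "C"}, {"A", "D"}, {"B", "D"} };
        // Collect every value under its key, regardless of arrival order.
        SortedMap<String, String> grouped = new TreeMap<String, String>();
        for (String[] p : pairs) {
            String current = grouped.get(p[0]);
            grouped.put(p[0], current == null ? p[1] : current + " " + p[1]);
        }
        for (Map.Entry<String, String> e : grouped.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}

This prints "A B C D" and "B A D", the expected result.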

I see that only pairs whose keys sit next to each other get grouped. Since the output from the combiner is <"B","A">, <"A","B">, <"A","C">, <"A","D">, <"B","D">, only the three adjacent pairs with key "A" are grouped; the pairs <"B","A"> and <"B","D"> cannot be grouped by their key. I could not understand why this happens or how to fix it.

Could you help me fix it? Thanks in advance.

The following is my program:

// This process implements step 1 of the project: extract only the content
// inside <title> and <text>, as described.
// Date: Sat 20 Nov 2010

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class kiemtra {

    public static int noDoc = 4; // number of documents
    public static Hashtable<String, Integer> noNode = new Hashtable<String, Integer>();
    public static Hashtable<String, Double> tempPageRank = new Hashtable<String, Double>();

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {
        private final Text id = new Text();
        private final Text outValue = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // Input line: "<node> <rank> <outlink> <outlink> ..."
            String[] temp = value.toString().split("\\s+");
            // Emit <outlink, "<rank> <node>"> for every outlink of this node.
            for (int i = 2; i < temp.length; i++) {
                outValue.set(temp[1] + " " + temp[0]);
                id.set(temp[i]);
                output.collect(id, outValue);
                System.out.println(temp[i] + ">" + temp[1] + " " + temp[0]);
            }
        }
    }

    public static class Combiner extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            Text id = new Text();
            Text outValue = new Text();
            while (values.hasNext()) {
                String s = values.next().toString();
                String[] temp = s.split("\\s+"); // temp[0] = rank, temp[1] = source node
                // Reverse the pair <key, value> after updating the value.
                String link = temp[0] + " " + key.toString();
                System.out.println(key.toString() + " has " + s + " will convert to "
                        + temp[1] + " " + link);
                id.set(temp[1]);
                outValue.set(link);
                output.collect(id, outValue);
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            Text text = new Text();
            StringBuilder link = new StringBuilder();
            // Concatenate all values for this key to rebuild the original line.
            while (values.hasNext()) {
                link.append(values.next().toString()).append(" ");
            }
            // compute R'[i] where i is the node, then add it to link
            text.set(link.toString().trim());
            output.collect(key, text);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(kiemtra.class);

        // Delete the output directory if it exists already.
        FileSystem.get(conf).delete(new Path(args[1]), true);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Combiner.class);
        conf.setReducerClass(Reduce.class);

        JobClient.runJob(conf);
    }
}

Best regards,

Tri Doan
1429 Laramie Apt 3, Manhattan
KS 66502
