hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Harsh J <ha...@cloudera.com>
Subject Re: Comparing input hdfs file to a distributed cache files
Date Fri, 20 Jul 2012 16:24:23 GMT
SS,

Is your job not progressing at all (i.e. continues to be at 0-progress
for hours), or does it fail after zero progress?

I'd try adding logging at various important points of the map/reduce
functions to see what's taking so long, or whether it's getting stuck at
something. Logs are observable for each task attempt via the JT Web
UI, etc.

On Fri, Jul 20, 2012 at 9:43 PM, Shanu Sushmita
<shanu.sushmita16@gmail.com> wrote:
> Hi,
>
> I am trying to solve a problem where I need to compute the frequencies of
> the words listed in file1 as they occur in file2.
>
> For example:
> text in file1:
> hadoop
> user
> hello
> world
>
> and text in file2 is:
> hadoop
> user
> hello
> world
> hadoop
> hadoop
> hadoop
> user
> world
> world
> world
> hadoop
> user
> hello
>
> so the output should be:
> hadoop 5
> user 3
> hello 2
> world 4
>
> I read that distributed caching is a good way to do such jobs. The sizes of
> my files are:
> File1 = 17GB
> File2 = 3 MB
>
> And here is my code:
>
>
>
>
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapred.*;
> import org.apache.hadoop.util.*;
> import java.util.*;
> import java.io.*;
> import org.apache.hadoop.filecache.*;
> import java.net.*;
>
> public class RepeatFreqTest
> {
>
>
> public static class Map extends MapReduceBase implements
> Mapper<LongWritable, Text, Text, IntWritable> {
>             private final static IntWritable one = new IntWritable(1);
>             private Text word = new Text();
>              private HashSet<String> dyads = new HashSet<String>();
>
>
>   public void configure(JobConf conf)
>   {
>
>
>  Path[] cacheFiles = new Path[0];
>    try {
>
>
>   cacheFiles = DistributedCache.getLocalCacheFiles(conf);
>   } // end of try
>
>   catch (IOException ioe) {
>             System.err.println("Caught exception while getting cached files:
> " + StringUtils.stringifyException(ioe));
>    }// end of catch
>
>   for (Path dyadPath : cacheFiles)
>   {
>
>         loadDyads(dyadPath);
>   } // end of for cachePath
>
>
>   } // end of configure
>
> private void loadDyads(Path dyadPath)
> {
> try
> {
>                 BufferedReader wordReader = new BufferedReader(new
> FileReader(dyadPath.toString()));
>                 String line = null;
>                 while ((line = wordReader.readLine()) != null) {
>        dyads.add(line);
>
>       } // end of while
>
>       wordReader.close();
>
> }// end of try
>  catch (IOException ioe) {
>       System.err.println("IOException reading from distributed cache");
>       } // end of catch
>
> }// end of loadDyads()
>
>
>
>   /* actual map() method, etc go here */
>
>
>                         @Override
>              public void map(LongWritable key, Text value,
> OutputCollector<Text, IntWritable> output, Reporter reporter) throws
> IOException
>              {
>
>
>                         // dyad, ut and year from all dyads (big file)
> file!!!
>
>
>                         String line = value.toString();
>                         String[] tokens = line.split("\\|");
>                         String ut1 = tokens[0].trim();
>                         String dyad1 = tokens[1].trim();
>                         String year1 = tokens[2].trim();
>                         int y1 = Integer.parseInt(year1);
>
>
>                         // dyad, ut and year from sample dyads file (sample
> file stored in the memory)!!!
>
>
>                         Iterator it = dyads.iterator();
>                         while(it.hasNext())
>                         {
>                                 //Text word = new Text();
>                                 String setline = it.next().toString();
>
>                                 String[] tokens2 = setline.split("\\|");
>                                 String ut2 = tokens2[0].trim();
>                                 String dyad2 = tokens2[1].trim();
>                                 String year2 = tokens2[2].trim();
>                                 int y2 = Integer.parseInt(year2);
>
>
>
>
>                                 if(dyad1.equalsIgnoreCase(dyad2))
>                                 {
>                                         if(!(ut1.equalsIgnoreCase(ut2)))
>                                         {
>                                                 if(y1<=y2)
>                                                 {
>
>                                                         word.set(setline);
>                                                         output.collect(word,
> one);
>                                                 }
>
>                                         } // end of if ut1!=ut2
>
>                                 } //
>
>
>                         }// end of while
>
>
> } // end of override map
> } // end of big Map class
>
>
> public static class Reduce extends MapReduceBase implements Reducer<Text,
> IntWritable, Text, IntWritable> {
>
>  @Override
>              public void reduce(Text key, Iterator<IntWritable> values,
> OutputCollector<Text, IntWritable> output, Reporter reporter) throws
> IOException {
>                int sum = 0;
>                while (values.hasNext()) {
>                  sum += values.next().get();
>
>                } // end of while
>                output.collect(key, new IntWritable(sum));
>              } // end of override reduce
>            } // end of Big Reduce
>
>
> /**
>  * Configures and runs the job.
>  *
>  * @param args args[0] is the input path, args[1] is the output path, and
>  *             the optional args[2] is the path of the file to place in
>  *             the distributed cache (defaults to
>  *             /user/ss/cacheFiles/File1.txt)
>  * @throws Exception if the job cannot be submitted or fails
>  */
> public static void main(String[] args) throws Exception {
>   // Fail with a usage message instead of an
>   // ArrayIndexOutOfBoundsException when the paths are missing.
>   if (args.length < 2) {
>     System.err.println(
>         "Usage: RepeatFreqTest <input path> <output path> [cache file]");
>     System.exit(2);
>   }
>   String cacheFile =
>       args.length > 2 ? args[2] : "/user/ss/cacheFiles/File1.txt";
>
>   JobConf conf = new JobConf(RepeatFreqTest.class);
>   conf.setJobName("Repeat");
>
>   conf.setOutputKeyClass(Text.class);
>   conf.setOutputValueClass(IntWritable.class);
>
>   conf.setMapperClass(Map.class);
>   // Reduce only sums counts, so it is also set as the combiner.
>   conf.setCombinerClass(Reduce.class);
>   conf.setReducerClass(Reduce.class);
>
>   conf.setInputFormat(TextInputFormat.class);
>   conf.setOutputFormat(TextOutputFormat.class);
>
>   FileInputFormat.setInputPaths(conf, new Path(args[0]));
>   FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>
>   // The mapper reads every line of this file into memory, so it must be
>   // the small file; the large file belongs in the input path.
>   DistributedCache.addCacheFile(new Path(cacheFile).toUri(), conf);
>
>   JobClient.runJob(conf);
> } // end of main
>
> } // end of class
>
>
> And I put my File1.txt and File2.txt in hdfs as follows:
>
> $HADOOP_HOME/bin/hadoop fs -mkdir input
> $HADOOP_HOME/bin/hadoop fs -mkdir cacheFiles
> $HADOOP_HOME/bin/hadoop fs -put /u/home/File2.txt input
> $HADOOP_HOME/bin/hadoop fs -put /u/home/File1.txt cacheFiles
>
> My problem is that my code compiles fine, but the job just does not proceed
> past the map 0% reduce 0% stage.
>
> What am I doing wrong?
>
> Any suggestion would be of great help.
>
> Best,
> SS



-- 
Harsh J

Mime
View raw message