hadoop-common-user mailing list archives

From Udaya Lakshmi <udaya...@gmail.com>
Subject Re: Example for using DistributedCache class
Date Wed, 03 Feb 2010 18:31:52 GMT
Hi Nick,
  I am not able to start the following job. The file that has to be passed
to the DistributedCache is on the local filesystem of the task tracker.

Can you tell me if I am missing something?

import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.filecache.*;

import java.io.*;
import java.util.*;
import java.net.*;

public class Test extends Configured
{
  public static class MapClass extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text>
  {
    private Path[] localFiles;
    private String str;

    public void configure(JobConf job)
    {
      try {
        // Local paths of the cached files on this task tracker.
        localFiles = DistributedCache.getLocalCacheFiles(job);
      }
      catch (IOException e) {
        System.err.println("Exception while getting cached files: " + e.getMessage());
      }
    }

    public void map(LongWritable key, Text value, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
      BufferedReader readBuffer = new BufferedReader(new FileReader(localFiles[0].toString()));
      try {
        str = readBuffer.readLine();
      }
      finally {
        readBuffer.close();
      }
      if (str != null) {
        output.collect(new Text(str), new Text(str));
      }
    }
  }



  public static class ReduceClass extends MapReduceBase implements Reducer<Text,Text,Text,Text>
  {
    // Unused: main() registers IdentityReducer instead.
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text,Text> output, Reporter reporter) throws IOException
    {
    }
  }

  public static void main(String[] args)
  {
    JobConf conf = new JobConf(Test.class);
    conf.setMapperClass(Test.MapClass.class);
    conf.setReducerClass(IdentityReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));
    try {
      // Note: this URI is resolved against the default (shared) filesystem,
      // so the file must already be in HDFS; a path that exists only on a
      // local disk will not be visible to the task trackers.
      DistributedCache.addCacheFile(new URI("/home/udaya/hadoop-0.18.3/file_to_distribute"), conf);
    }
    catch (URISyntaxException e) {
      System.err.println("Bad cache file URI: " + e.getMessage());
    }
    try {
      JobClient.runJob(conf);
    }
    catch (Exception e) {
      System.err.println("Error starting the job: " + e.getMessage());
    }
  }
}
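
For reference, a minimal sketch of staging the file into HDFS before adding
it to the cache; the paths below are placeholders, not from the original
setup:

import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class StageCacheFile {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(StageCacheFile.class);

    // Copy the file from the client's local disk into HDFS so that every
    // task tracker can fetch it (both paths are hypothetical).
    FileSystem hdfs = FileSystem.get(conf);
    Path local = new Path("/home/udaya/file_to_distribute");
    Path shared = new Path("/user/udaya/cache/file_to_distribute");
    hdfs.copyFromLocalFile(local, shared);

    // Register the HDFS copy; each task tracker pulls it down to its own
    // local disk before the first task runs.
    DistributedCache.addCacheFile(new URI(shared.toString()), conf);
  }
}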

On Wed, Feb 3, 2010 at 7:27 PM, Nick Jones <nick.jones@amd.com> wrote:

> Hi Udaya,
> The following code uses already existing cache files as part of the map to
> process incoming data. I apologize for the naming conventions, but the code
> had to be stripped; I also removed several variable assignments, etc.
>
> public class MySpecialJob {
>  public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyMapInputValueClass, MyMapOutputKeyClass, BigIntegerWritable> {
>
>    private Path[] dcfiles;
>    ...
>
>    public void configure(JobConf job) {
>      // Load cached files
>      dcfiles = new Path[0];
>      try {
>        dcfiles = DistributedCache.getLocalCacheFiles(job);
>      } catch (IOException ioe) {
>        System.err.println("Caught exception while getting cached files: " +
> StringUtils.stringifyException(ioe));
>      }
>    }
>
>    public void map(LongWritable key, MyMapInputValueClass value,
>      OutputCollector<MyMapOutputKeyClass, BigIntegerWritable> output,
>      Reporter reporter) throws IOException {
>      ...
>      for (Path dcfile : dcfiles) {
>        if(dcfile.getName().equalsIgnoreCase(file_match)) {
>          readbuffer = new BufferedReader(
>            new FileReader(dcfile.toString()));
>          ...
>          while((raw_line = readbuffer.readLine()) != null) {
>            ...
>
>  public static void main(String[] args) throws Exception {
>    JobConf conf = new JobConf(MySpecialJob.class);
>    ...
>    DistributedCache.addCacheFile(new URI("/path/to/file1.txt"), conf);
>    DistributedCache.addCacheFile(new URI("/path/to/file2.txt"), conf);
>    DistributedCache.addCacheFile(new URI("/path/to/file3.txt"), conf);
>    ...
>  }
> }
>
> Nick Jones
>
>
>
> Udaya Lakshmi wrote:
>
>> Hi,
>>   As a newbie to Hadoop, I am not able to figure out how to use the
>> DistributedCache class. Can someone give me a small code sample which
>> distributes a file to the cluster and then shows how to open and use the
>> file in a map or reduce task.
>> Thanks,
>> Udaya
>>
>>
>
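
A note on Nick's pattern of matching cache files by name: the framework can
also create a symlink to each cached file in the task's working directory,
via DistributedCache.createSymlink(conf) and a fragment on the cache URI, so
the task can open the file by a fixed name instead of scanning
getLocalCacheFiles(). A sketch of that alternative (the "#lookup" name is
just an example):

// Driver side: the "#lookup" fragment names the symlink that the
// framework creates in each task's working directory.
DistributedCache.addCacheFile(new URI("/path/to/file1.txt#lookup"), conf);
DistributedCache.createSymlink(conf);

// Task side: open the cached file through its link name.
BufferedReader reader = new BufferedReader(new FileReader("lookup"));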
