hadoop-common-user mailing list archives

From "Jones, Nick" <nick.jo...@amd.com>
Subject Re: Example for using DistributedCache class
Date Wed, 03 Feb 2010 19:05:08 GMT
The files for the DistributedCache need to be on HDFS.

Nick Jones
Sent by radiation.
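
A minimal sketch of that setup (not part of the original messages; the HDFS
destination path below is hypothetical): copy the file from the submitting
node's local filesystem onto HDFS first, then register the HDFS copy with the
DistributedCache so each task tracker can localize it.

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class CacheFileOnHdfs {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(CacheFileOnHdfs.class);

    // Copy the side file from the submitting node's local disk onto HDFS.
    // The /user/udaya destination is a made-up example path.
    FileSystem hdfs = FileSystem.get(conf);
    Path localFile = new Path("/home/udaya/hadoop-0.18.3/file_to_distribute");
    Path hdfsFile = new Path("/user/udaya/file_to_distribute");
    hdfs.copyFromLocalFile(localFile, hdfsFile);

    // Register the HDFS copy; each task tracker pulls it down and exposes it
    // through DistributedCache.getLocalCacheFiles(conf) inside configure().
    DistributedCache.addCacheFile(hdfsFile.toUri(), conf);

    // ...set mapper/reducer, input/output paths, then JobClient.runJob(conf).
  }
}

The same copy can also be done once up front with "hadoop fs -put" and the
resulting HDFS path passed straight to DistributedCache.addCacheFile.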

On Feb 3, 2010, at 12:32 PM, "Udaya Lakshmi" <udaya603@gmail.com> wrote:

> Hi Nick,
>  I am not able to start the following job. I have the file that has to be
> passed to the DistributedCache on the local filesystem of the task tracker.
>
> Can you tell me if I am missing something?
>
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.mapred.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapred.lib.*;
> import org.apache.hadoop.util.*;
> import org.apache.hadoop.filecache.*;
>
> import java.io.*;
> import java.util.*;
> import java.text.SimpleDateFormat;
> import java.net.*;
>
> public class Test extends Configured
> {
>
>
>
>  public static class MapClass extends MapReduceBase
>      implements Mapper<LongWritable,Text,Text,Text>
>  {
>    private FileSystem fs;
>    private Path[] localFiles;
>    private String str;
>
>    public void configure(JobConf job)
>    {
>      try{
>        fs = FileSystem.getLocal(new Configuration());
>        localFiles = DistributedCache.getLocalCacheFiles(job);
>      }
>      catch(IOException e){
>        System.out.println("Exception while getting cached files");
>      }//catch(IOException e)
>    }//public void configure(JobConf job)
>
>    public void map(LongWritable key, Text value,
>        OutputCollector<Text,Text> output, Reporter reporter) throws IOException
>    {
>      BufferedReader readBuffer = new BufferedReader(
>          new FileReader(localFiles[0].toString()));
>      str = readBuffer.readLine();
>      output.collect(new Text(str), new Text(str));
>    }//public void map(...)
>
>    public void close() throws IOException
>    {
>      fs.close();
>    }//public void close() throws IOException
>
>  }//public static class MapClass
>
>
>
>  public static class ReduceClass extends MapReduceBase
>      implements Reducer<Text,Text,Text,Text>
>  {
>    public void reduce(Text key, Iterator<Text> values,
>        OutputCollector<Text,Text> output, Reporter reporter) throws IOException
>    {
>    }//public void reduce(...)
>  }//public static class ReduceClass
>
>  public static void main(String[] args)
>  {
>    JobConf conf = new JobConf(Test.class);
>    JobClient client = new JobClient();
>    conf.setMapperClass(Test.MapClass.class);
>    conf.setReducerClass(IdentityReducer.class);
>    conf.setOutputKeyClass(Text.class);
>    conf.setOutputValueClass(Text.class);
>    conf.setInputPath(new Path("input"));
>    conf.setOutputPath(new Path("output"));
>    try{
>      DistributedCache.addCacheFile(
>          new URI("/home/udaya/hadoop-0.18.3/file_to_distribute"), conf);
>    }
>    catch(URISyntaxException e)
>    {}
>    try{
>      JobClient.runJob(conf);
>    }
>    catch(Exception e)
>    {
>      System.out.println("Error starting the job");
>    }
>  }//public static void main(String[] args)
> }//public class Test extends Configured
>
> On Wed, Feb 3, 2010 at 7:27 PM, Nick Jones <nick.jones@amd.com> wrote:
>
>> Hi Udaya,
>> The following code uses already existing cache files as part of the map to
>> process incoming data.  I apologize for the naming conventions, but the code
>> had to be stripped.  I also removed several variable assignments, etc.
>>
>> public class MySpecialJob {
>> public static class MyMapper extends MapReduceBase implements
>> Mapper<LongWritable, MyMapInputValueClass, MyMapOutputKeyClass,
>> BigIntegerWritable> {
>>
>>   private Path[] dcfiles;
>>   ...
>>
>>   public void configure(JobConf job) {
>>     // Load cached files
>>     dcfiles = new Path[0];
>>     try {
>>       dcfiles = DistributedCache.getLocalCacheFiles(job);
>>     } catch (IOException ioe) {
>>       System.err.println("Caught exception while getting cached files: "
>>           + StringUtils.stringifyException(ioe));
>>     }
>>   }
>>
>>   public void map(LongWritable key, MyMapInputValueClass value,
>>     OutputCollector<MyMapOutputKeyClass, BigIntegerWritable> output,
>>     Reporter reporter) throws IOException {
>>     ...
>>     for (Path dcfile : dcfiles) {
>>       if(dcfile.getName().equalsIgnoreCase(file_match)) {
>>         readbuffer = new BufferedReader(
>>           new FileReader(dcfile.toString()));
>>         ...
>>         while((raw_line = readbuffer.readLine()) != null) {
>>           ...
>>
>> public static void main(String[] args) throws Exception {
>>   JobConf conf = new JobConf(MySpecialJob.class);
>>   ...
>>   DistributedCache.addCacheFile(new URI("/path/to/file1.txt"), conf);
>>   DistributedCache.addCacheFile(new URI("/path/to/file2.txt"), conf);
>>   DistributedCache.addCacheFile(new URI("/path/to/file3.txt"), conf);
>>   ...
>> }
>> }
>>
>> Nick Jones
>>
>>
>>
>> Udaya Lakshmi wrote:
>>
>>> Hi,
>>>  As a newbie to Hadoop, I am not able to figure out how to use the
>>> DistributedCache class. Can someone give me a small piece of code which
>>> distributes a file to the cluster and then shows how to open and use the
>>> file in a map or reduce task?
>>> Thanks,
>>> Udaya
>>>
>>>
>>

