hadoop-common-user mailing list archives

From Udaya Lakshmi <udaya...@gmail.com>
Subject Re: Example for using DistributedCache class
Date Wed, 03 Feb 2010 19:26:30 GMT
Thanks, Nick. It's working.

Udaya
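
A minimal sketch of the fix, for anyone hitting the same problem: as Nick
points out below, the file handed to DistributedCache has to live on HDFS,
not on a tasktracker's local disk. The HDFS path here is illustrative and the
file must be uploaded first (e.g. with hadoop fs -put):

  JobConf conf = new JobConf(Test.class);
  try {
    // Point at the HDFS copy of the file, not a tasktracker-local path.
    DistributedCache.addCacheFile(new URI("/user/udaya/file_to_distribute"), conf);
  } catch (URISyntaxException e) {
    throw new RuntimeException("Bad cache file URI", e);
  }
  // Mapper.configure() then picks up the local copy via
  // DistributedCache.getLocalCacheFiles(job), as in the code quoted below.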

On 2/4/10, Jones, Nick <nick.jones@amd.com> wrote:
> The files for the DC need to be on HDFS.
>
> Nick Jones
> Sent by radiation.
>
> On Feb 3, 2010, at 12:32 PM, "Udaya Lakshmi" <udaya603@gmail.com> wrote:
>
>> Hi Nick,
>>  I am not able to start the following job. The file that has to be
>> passed to the DistributedCache is on the local filesystem of the
>> tasktracker.
>>
>> Can you tell me if I am missing something?
>>
>> import org.apache.hadoop.fs.*;
>> import org.apache.hadoop.conf.*;
>> import org.apache.hadoop.mapred.*;
>> import org.apache.hadoop.io.*;
>> import org.apache.hadoop.mapred.lib.*;
>> import org.apache.hadoop.util.*;
>> import org.apache.hadoop.filecache.*;
>>
>> import java.io.*;
>> import java.util.*;
>> import java.text.SimpleDateFormat;
>> import java.net.*;
>>
>> public class Test extends Configured
>> {
>>
>>
>>
>>  public static class MapClass extends MapReduceBase implements
>> Mapper<LongWritable,Text,Text,Text>
>>  {
>>
>>   private FileSystem fs;
>>   private Path[] localFiles;
>>   private String str;
>>   public void configure(JobConf job)
>>   {
>>     try{
>>       fs = FileSystem.getLocal(new Configuration());
>>       localFiles = DistributedCache.getLocalCacheFiles(job);
>>        }
>>     catch(IOException e){
>>       System.out.println("Exception while getting cached files");
>>     }//catch(IOException e){
>>
>>   }//public void configure(JobConf job)
>>
>>    public void map(LongWritable Key,Text
>> value,OutputCollector<Text,Text>
>> output,Reporter reporter) throws IOException
>>    {
>>     BufferedReader readBuffer = new BufferedReader(new
>> FileReader(localFiles[0].toString()));
>>     str = readBuffer.readLine();
>>     output.collect(new Text(str),new Text(str));
>>    }//public void map(LongWritable Key,Text
>> value,OutputCollector<Text,Text> output,Reporter reporter) throws
>> IOException
>>
>>    public void close() throws IOException
>>    {
>>      fs.close();
>>    }//public void close() throws IOException
>>
>>
>> }//public static class MapClass extends MapReduceBase implements
>> Mapper<>
>>
>>
>>
>>  public static class ReduceClass extends MapReduceBase implements
>> Reducer<Text,Text,Text,Text>
>>  {
>>    public void reduce(Text key,Iterator<Text>
>> values,OutputCollector<Text,Text> output,Reporter reporter) throws
>> IOException
>>    {
>>    }//public void reduce(Text key,Iterator<Text>
>> values,OutputCollector<Text,Text> output,Reporter reporter) throws
>> IOException
>>  }//public static class ReduceClass extends MapReduceBase implements
>> Reducer<Text,Text,Text,Text>
>>
>>  public static void main(String[] args)
>>  {
>>    JobConf conf = new JobConf(Test.class);
>>    JobClient client = new JobClient();
>>    conf.setMapperClass(Test.MapClass.class);
>>    conf.setReducerClass(IdentityReducer.class);
>>    conf.setOutputKeyClass(Text.class);
>>    conf.setOutputValueClass(Text.class);
>>    conf.setInputPath(new Path("input"));
>>    conf.setOutputPath(new Path("output"));
>>    try{
>>      DistributedCache.addCacheFile(new
>> URI("/home/udaya/hadoop-0.18.3/file_to_distribute"), conf);
>>     }
>>     catch(URISyntaxException e)
>>     {}
>>     try{
>>     JobClient.runJob(conf);
>>       }
>>     catch(Exception e)
>>     {
>>      System.out.println("Error starting the job");
>>     }
>>  }//public static void main(String[] args)
>> }//public class Test extends Configured
>>
>> On Wed, Feb 3, 2010 at 7:27 PM, Nick Jones <nick.jones@amd.com> wrote:
>>
>>> Hi Udaya,
>>> The following code uses already existing cache files as part of the map
>>> to process incoming data. I apologize for the naming conventions, but
>>> the code had to be stripped. I also removed several variable
>>> assignments, etc.
>>>
>>> public class MySpecialJob {
>>> public static class MyMapper extends MapReduceBase implements
>>> Mapper<LongWritable, MyMapInputValueClass, MyMapOutputKeyClass,
>>> BigIntegerWritable> {
>>>
>>>   private Path[] dcfiles;
>>>   ...
>>>
>>>   public void configure(JobConf job) {
>>>     // Load cached files
>>>     dcfiles = new Path[0];
>>>     try {
>>>       dcfiles = DistributedCache.getLocalCacheFiles(job);
>>>     } catch (IOException ioe) {
>>>       System.err.println("Caught exception while getting cached
>>> files: " +
>>> StringUtils.stringifyException(ioe));
>>>     }
>>>   }
>>>
>>>   public void map(LongWritable key, MyMapInputValueClass value,
>>>     OutputCollector<MyMapOutputKeyClass, BigIntegerWritable> output,
>>>     Reporter reporter) throws IOException {
>>>     ...
>>>     for (Path dcfile : dcfiles) {
>>>       if(dcfile.getName().equalsIgnoreCase(file_match)) {
>>>         readbuffer = new BufferedReader(
>>>           new FileReader(dcfile.toString()));
>>>         ...
>>>         while((raw_line = readbuffer.readLine()) != null) {
>>>           ...
>>>
>>> public static void main(String[] args) throws Exception {
>>>   JobConf conf = new JobConf(MySpecialJob.class);
>>>   ...
>>>   DistributedCache.addCacheFile(new URI("/path/to/file1.txt"), conf);
>>>   DistributedCache.addCacheFile(new URI("/path/to/file2.txt"), conf);
>>>   DistributedCache.addCacheFile(new URI("/path/to/file3.txt"), conf);
>>>   ...
>>> }
>>> }
>>>
>>> Nick Jones
>>>
>>>
>>>
>>> Udaya Lakshmi wrote:
>>>
>>>> Hi,
>>>>  As a newbie to Hadoop, I am not able to figure out how to use the
>>>> DistributedCache class. Can someone give me a small piece of code that
>>>> distributes a file to the cluster and then shows how to open and use
>>>> the file in a map or reduce task?
>>>> Thanks,
>>>> Udaya
>>>>
>>>>
>>>
>
>
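
Since the example above is intentionally stripped down, here is a
self-contained sketch of the same pattern against the old (0.18-era) mapred
API. The class name, the cache file name, and the input/output paths are made
up for illustration; the structure is what matters here. Add the HDFS-resident
files in the driver, then read the local copies from configure()/map().

  import java.io.BufferedReader;
  import java.io.FileReader;
  import java.io.IOException;
  import java.net.URI;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.*;
  import org.apache.hadoop.mapred.lib.IdentityReducer;
  import org.apache.hadoop.filecache.DistributedCache;
  import org.apache.hadoop.util.StringUtils;

  public class CacheFileExample {

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

      private Path[] dcfiles = new Path[0];

      public void configure(JobConf job) {
        try {
          // Local, already-downloaded copies of the files added in main().
          dcfiles = DistributedCache.getLocalCacheFiles(job);
        } catch (IOException ioe) {
          System.err.println("Caught exception while getting cached files: "
              + StringUtils.stringifyException(ioe));
        }
      }

      public void map(LongWritable key, Text value,
          OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        for (Path dcfile : dcfiles) {
          if (dcfile.getName().equals("lookup.txt")) {  // illustrative name
            BufferedReader reader =
                new BufferedReader(new FileReader(dcfile.toString()));
            try {
              String line;
              while ((line = reader.readLine()) != null) {
                output.collect(new Text(line), value);
              }
            } finally {
              reader.close();
            }
          }
        }
      }
    }

    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(CacheFileExample.class);
      conf.setMapperClass(MapClass.class);
      conf.setReducerClass(IdentityReducer.class);
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(Text.class);
      FileInputFormat.setInputPaths(conf, new Path("input"));
      FileOutputFormat.setOutputPath(conf, new Path("output"));

      // The file must already exist on HDFS (e.g. uploaded with hadoop fs -put).
      DistributedCache.addCacheFile(new URI("/user/someone/lookup.txt"), conf);

      JobClient.runJob(conf);
    }
  }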
