hadoop-mapreduce-user mailing list archives

From Swathi V <swat...@zinniasystems.com>
Subject Re: Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?
Date Fri, 23 Sep 2011 17:37:54 GMT
Hi Jun Tan,

1. Distributed Cache usage (the snippet below uses the old mapred API; a sketch
with the new mapreduce API follows it):

 // Setting up the cache for the application

     1. Copy the requisite files to the FileSystem:

     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

     2. Setup the application's JobConf:

     JobConf job = new JobConf();
     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
                                   job);
     DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);

     3. Use the cached files in the Mapper
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Mapper.html>
     or Reducer
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Reducer.html>:

     public static class MapClass extends MapReduceBase
     implements Mapper<K, V, K, V> {

       private Path[] localArchives;
       private Path[] localFiles;

       public void configure(JobConf job) {
         // Get the cached archives/files
         localArchives = DistributedCache.getLocalCacheArchives(job);
         localFiles = DistributedCache.getLocalCacheFiles(job);
       }

       public void map(K key, V value,
                       OutputCollector<K, V> output, Reporter reporter)
       throws IOException {
         // Use data from the cached archives/files here
         // ...
         // ...
         output.collect(k, v);
       }
     }
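
With the new org.apache.hadoop.mapreduce API, a minimal sketch could look like
the following. The class name, key/value types and output values are only
illustrative (they are my assumptions, not from the Hadoop docs); the cache is
still populated through DistributedCache against the job's Configuration, which
works on 0.20.x with new-API jobs:

     import java.io.IOException;
     import java.net.URI;

     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.filecache.DistributedCache;
     import org.apache.hadoop.fs.Path;
     import org.apache.hadoop.io.LongWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Job;
     import org.apache.hadoop.mapreduce.Mapper;

     // Hypothetical example class, not taken from the Hadoop docs.
     public class CacheExample {

       public static class CacheMapper
           extends Mapper<LongWritable, Text, Text, Text> {

         private Path[] localFiles;

         @Override
         protected void setup(Context context)
             throws IOException, InterruptedException {
           // Local, task-side copies of the cached files.
           localFiles =
               DistributedCache.getLocalCacheFiles(context.getConfiguration());
         }

         @Override
         protected void map(LongWritable key, Text value, Context context)
             throws IOException, InterruptedException {
           // Use data from localFiles[0] (lookup.dat) here, then emit.
           context.write(value, new Text("seen"));
         }
       }

       public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         Job job = new Job(conf, "cache-example");
         job.setJarByClass(CacheExample.class);
         // Register the HDFS file against the job's Configuration before
         // submitting; the new-API Job is still driven by its Configuration.
         DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
                                       job.getConfiguration());
         job.setMapperClass(CacheMapper.class);
         // ... set output key/value classes, input/output paths, and submit.
       }
     }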


2. Without the distributed cache: in simple terms, if you are interested, I
can help you with the code. One possible alternative is sketched below.
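
A minimal sketch of one such alternative (my assumption of what was meant):
put the side file's HDFS path into the Configuration, as in your
conf1.set("keyNodeFile", args[0]) attempt, and open it from HDFS in setup().
The class names below are hypothetical; the "keyNodeFile" key just follows
your snippet:

     import java.io.BufferedReader;
     import java.io.IOException;
     import java.io.InputStreamReader;
     import java.util.HashSet;
     import java.util.Set;

     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.fs.FileSystem;
     import org.apache.hadoop.fs.Path;
     import org.apache.hadoop.io.LongWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Job;
     import org.apache.hadoop.mapreduce.Mapper;

     // Hypothetical example class.
     public class SideFileExample {

       public static class SideFileMapper
           extends Mapper<LongWritable, Text, Text, Text> {

         private final Set<String> keyNodes = new HashSet<String>();

         @Override
         protected void setup(Context context)
             throws IOException, InterruptedException {
           Configuration conf = context.getConfiguration();
           Path keyNodeFile = new Path(conf.get("keyNodeFile"));
           FileSystem fs = keyNodeFile.getFileSystem(conf);
           // Read the side file straight from HDFS, line by line.
           BufferedReader reader =
               new BufferedReader(new InputStreamReader(fs.open(keyNodeFile)));
           try {
             String line;
             while ((line = reader.readLine()) != null) {
               keyNodes.add(line.trim());
             }
           } finally {
             reader.close();
           }
         }

         @Override
         protected void map(LongWritable key, Text value, Context context)
             throws IOException, InterruptedException {
           // Use keyNodes here, then emit.
           context.write(value, new Text("1"));
         }
       }

       public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         // Set the path BEFORE constructing the Job: Job copies the
         // Configuration, so later conf.set(...) calls are not picked up.
         conf.set("keyNodeFile", args[0]);
         Job job = new Job(conf, "side-file-example");
         job.setJarByClass(SideFileExample.class);
         job.setMapperClass(SideFileMapper.class);
         // ... set output key/value classes, input/output paths, and submit.
       }
     }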



2011/9/23 谭军 <tanjun_2525@163.com>

> Hi Swathi.V.,
> I think my code below would work:
>
>         Configuration conf1 = new Configuration();
>         Job job1 = new Job(conf1, "Retrieval1");
>         job1.setJarByClass(Retrieval.class);
>         job1.addCacheFile(new URI(args[0]));   // problem here
>         conf1.set("keyNodeFile", args[0]);         //try to set key node
> file path and get file path in mapper1
>         job1.setOutputKeyClass(Text.class);
>         job1.setOutputValueClass(Text.class);
>         job1.setMapperClass(RetrievalMapper.class);
>         job1.setReducerClass(RetrievalReducer.class);
>         FileInputFormat.addInputPath(job1, new Path(args[1]));
>         String out = args[2] + System.nanoTime();
>
>         FileOutputFormat.setOutputPath(job1, new Path(out));
>         job1.waitForCompletion(true);
>
>         Configuration conf2 = new Configuration();
>         Job job2 = new Job(conf2, "Retrieval2");
>         job2.setJarByClass(Retrieval.class);
>         conf2.set("newKeyNodeFile", out);   // try to set new key node file
> path and get it in mapper2
>         DistributedCache.addCacheFile(new URI(out));  // problem here
>         job2.setOutputKeyClass(Text.class);
>         job2.setOutputValueClass(Text.class);
>         job2.setMapperClass(RetrievalMapper2.class);
>         job2.setReducerClass(RetrievalReducer2.class);
>         FileInputFormat.addInputPath(job2, new Path(args[1]));
>         FileOutputFormat.setOutputPath(job2, new Path(args[2]));
>         System.exit(job2.waitForCompletion(true) ? 0 : 1);
>
> But a NullPointerException was reported when I tried to get the file via
> the distributed cache.
> How do I use a distributed cache file with the new APIs?
> I also tried to pass the file path by setting global parameters, but that
> failed as well.
> How can I read the "args[0]" file in mapper1 and the intermediate file in
> mapper2 using the new APIs?
> Thanks!
>
>
> --
>
> Regards!
>
> Jun Tan
>
> At 2011-09-23 19:06:50,"Swathi V" <swathiv@zinniasystems.com> wrote:
>
> Hi Jun Tan,
>
> Yes, I use version 0.21.0, so I have used those. The Hadoop Definitive
> Guide has job dependency examples for 0.20.x.
>
> Thank You,
>
> 2011/9/23 谭军 <tanjun_2525@163.com>
>
>>  Swathi.V.,
>> ControlledJob cannot be resolved in my Eclipse.
>> My Hadoop version is 0.20.2.
>> Can ControlledJob only be resolved in Hadoop 0.21.0 (+)?
>> Or do I need certain plugins?
>> Thanks
>>
>> --
>>
>> Regards!
>>
>> Jun Tan
>>
>> At 2011-09-22 00:56:54,"Swathi V" <swathiv@zinniasystems.com> wrote:
>>
>>
>> Hi,
>>
>> This code might help you
>> //JobDependancies.java snippet
>>
>> Configuration conf = new Configuration();
>>     Job job1 = new Job(conf, "job1");
>>     job1.setJarByClass(JobDependancies.class);
>>     job1.setMapperClass(WordMapper.class);
>>     job1.setReducerClass(WordReducer.class);
>>     job1.setOutputKeyClass(Text.class);
>>     job1.setOutputValueClass(IntWritable.class);
>>     FileInputFormat.addInputPath(job1, new Path(args[0]));
>>     String out=args[1]+System.nanoTime();
>>     FileOutputFormat.setOutputPath(job1, new Path(out));
>>
>>
>>
>>     Configuration conf2 = new Configuration();
>>     Job job2  = new Job(conf2, "job2");
>>     job2.setJarByClass(JobDependancies.class);
>>     job2.setOutputKeyClass(IntWritable.class);
>>     job2.setOutputValueClass(Text.class);
>>     job2.setMapperClass(SortWordMapper.class);
>>     job2.setReducerClass(Reducer.class);
>>     FileInputFormat.addInputPath(job2, new Path(out+"/part-r-00000"));
>>     FileOutputFormat.setOutputPath(job2, new Path(args[1]));
>>
>>     ControlledJob controlledJob1 = new
>> ControlledJob(job1.getConfiguration());
>>     ControlledJob controlledJob2 = new
>> ControlledJob(job2.getConfiguration());
>>     controlledJob2.addDependingJob(controlledJob1);
>>     JobControl jobControl= new JobControl("control");
>>
>>     jobControl.addJob(controlledJob1);
>>     jobControl.addJob(controlledJob2);
>>
>>     Thread thread = new Thread(jobControl);
>>     thread.start();
>>     while (!jobControl.allFinished()) {
>>       try {
>>         Thread.sleep(10000);
>>       } catch (InterruptedException e) {
>>         // TODO Auto-generated catch block
>>         e.printStackTrace();
>>       }
>>     }
>>     jobControl.stop();
>>     }
>> }
>>
>>
>> The wordcount output of job1 is given to the sort in job2.
>> Irrespective of the mappers and reducers, the above is the way to chain
>> multiple jobs.
>>
>> 2011/9/21 谭军 <tanjun_2525@163.com>
>>
>>> Hi,
>>> I want to run 2 MR jobs sequentially.
>>> The first job writes an intermediate result to a temp file.
>>> The second job reads the result from the temp file, not from the
>>> FileInputPath.
>>> I tried this, but a FileNotFoundException was reported.
>>> Then I checked the datanodes: the temp file was created,
>>> and the first job executed correctly.
>>> Why can't the second job find the file? The file was created before the
>>> second job was executed.
>>> Thanks!
>>>
>>> --
>>>
>>> Regards!
>>>
>>> Jun Tan
>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Swathi.V.
>>
>>
>>
>>
>
>
> --
> Regards,
> Swathi.V.
>
>
>
>


-- 
Regards,
Swathi.V.
