hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Swathi V <swat...@zinniasystems.com>
Subject Re: Re: Re: Re: How do I set the intermediate output path when I use 2 mapreduce jobs?
Date Sat, 24 Sep 2011 15:03:46 GMT
Hi Jun Tun,

Yeah ! surely...
Well the code i gave is the new API.

2011/9/24 谭军 <tanjun_2525@163.com>

> Hi Swathi.V.,
> Thank you very much.
> It's very kind of you to do that.
> I think the code you gave is implemented in old APIs.
> I made it several days ago. What I can't is by new APIs.
> I just get started to mapreduce programming and get some problems with my
> code.
> When you get time we can talk online.
> Thanks!
>
> --
>
> Regards!
>
> Jun Tan
>
> At 2011-09-24 01:37:54,"Swathi V" <swathiv@zinniasystems.com> wrote:
>
> Hi JunTun,
>
> 1. Distributed Cache in new API usage:
>
>  // Setting up the cache for the application
>
>      1. Copy the requisite files to the FileSystem:
>
>      $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
>      $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
>      $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
>      $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
>      $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
>      $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
>
>      2. Setup the application's JobConf:
>
>      JobConf job = new JobConf();
>      DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
>                                    job);
>      DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
>      DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
>      DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
>      DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
>      DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
>
>      3. Use the cached files in the Mapper <http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Mapper.html>
>      or Reducer <http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Reducer.html>:
>
>      public static class MapClass extends MapReduceBase
>      implements Mapper<K, V, K, V> {
>
>        private Path[] localArchives;
>        private Path[] localFiles;
>
>        public void configure(JobConf job) {
>          // Get the cached archives/files
>          localArchives = DistributedCache.getLocalCacheArchives(job);
>          localFiles = DistributedCache.getLocalCacheFiles(job);
>        }
>
>        public void map(K key, V value,
>                        OutputCollector<K, V> output, Reporter reporter)
>        throws IOException {
>          // Use data from the cached archives/files here
>          // ...
>          // ...
>          output.collect(k, v);
>        }
>      }
>
>
> 2. without distributed cache in simple terms if you are interested i can
> help you with the code.
>
>
>
> 2011/9/23 谭军 <tanjun_2525@163.com>
>
>> Hi Swathi.V.,
>> I think my code below would work:
>>
>>         Configuration conf1 = new Configuration();
>>         Job job1 = new Job(conf1, "Retrieval1");
>>         job1.setJarByClass(Retrieval.class);
>>         job1.addCacheFile(new URI(args[0]));   // problem here
>>         conf1.set("keyNodeFile", args[0]);         //try to set key node
>> file path and get file path in mapper1
>>         job1.setOutputKeyClass(Text.class);
>>         job1.setOutputValueClass(Text.class);
>>         job1.setMapperClass(RetrievalMapper.class);
>>         job1.setReducerClass(RetrievalReducer.class);
>>         FileInputFormat.addInputPath(job1, new Path(args[1]));
>>         String out = args[2] + System.nanoTime();
>>
>>         FileOutputFormat.setOutputPath(job1, new Path(out));
>>         job1.waitForCompletion(true);
>>
>>         Configuration conf2 = new Configuration();
>>         Job job2 = new Job(conf2, "Retrieval2");
>>         job2.setJarByClass(Retrieval.class);
>>         conf2.set("newKeyNodeFile", out);   // try to set new key node
>> file path and get it in mapper2
>>         DistributedCache.addCacheFile(new URI(out));  // problem here
>>         job2.setOutputKeyClass(Text.class);
>>         job2.setOutputValueClass(Text.class);
>>         job2.setMapperClass(RetrievalMapper2.class);
>>         job2.setReducerClass(RetrievalReducer2.class);
>>         FileInputFormat.addInputPath(job2, new Path(args[1]));
>>         FileOutputFormat.setOutputPath(job2, new Path(args[2]));
>>         System.exit(job2.waitForCompletion(true) ? 0 : 1);
>>
>> But nullpointer exception was reported when I tried to get file by using
>> distributed cache file.
>> How to use distributed cache file in new APIs ?
>> I also tried to deliver file path by setting global parameters, however,
>> failed either.
>> How can I read "args[0]" file in mapper1 and intermediate file in mapper2
>> use new APIs?
>> Thanks!
>>
>>
>> --
>>
>> Regards!
>>
>> Jun Tan
>>
>> At 2011-09-23 19:06:50,"Swathi V" <swathiv@zinniasystems.com> wrote:
>>
>> Hi Jun Tan,
>>
>> Yes i use 0.21.0 version. So i have used those. Well the Hadoop Definitive
>> Guide has job dependency examples for 0.20.x.
>>
>> Thank You,
>>
>> 2011/9/23 谭军 <tanjun_2525@163.com>
>>
>>>  Swathi.V.,
>>> ControlledJob cannot be resolved in my eclipse.
>>> My hadoop version is 0.20.2
>>> ControlledJob can only be resolved in hadoop 0.21.0 (+)?
>>> Or I need some certain plugins?
>>> Thanks
>>>
>>> --
>>>
>>> Regards!
>>>
>>> Jun Tan
>>>
>>> At 2011-09-22 00:56:54,"Swathi V" <swathiv@zinniasystems.com> wrote:
>>>
>>>
>>> Hi,
>>>
>>> This code might help you
>>> //JobDependancies.java snippet
>>>
>>> Configuration conf = new Configuration();
>>>     Job job1 = new Job(conf, "job1");
>>>     job1.setJarByClass(JobDependancies.class);
>>>     job1.setMapperClass(WordMapper.class);
>>>     job1.setReducerClass(WordReducer.class);
>>>     job1.setOutputKeyClass(Text.class);
>>>     job1.setOutputValueClass(IntWritable.class);
>>>     FileInputFormat.addInputPath(job1, new Path(args[0]));
>>>     String out=args[1]+System.nanoTime();
>>>     FileOutputFormat.setOutputPath(job1, new Path(out));
>>>
>>>
>>>
>>>     Configuration conf2 = new Configuration();
>>>     Job job2  = new Job(conf2, "job2");
>>>     job2.setJarByClass(JobDependancies.class);
>>>     job2.setOutputKeyClass(IntWritable.class);
>>>     job2.setOutputValueClass(Text.class);
>>>     job2.setMapperClass(SortWordMapper.class);
>>>     job2.setReducerClass(Reducer.class);
>>>     FileInputFormat.addInputPath(job2, new Path(out+"/part-r-00000"));
>>>     FileOutputFormat.setOutputPath(job2, new Path(args[1]));
>>>
>>>     ControlledJob controlledJob1 = new
>>> ControlledJob(job1.getConfiguration());
>>>     ControlledJob controlledJob2 = new
>>> ControlledJob(job2.getConfiguration());
>>>     controlledJob2.addDependingJob(controlledJob1);
>>>     JobControl jobControl= new JobControl("control");
>>>
>>>     jobControl.addJob(controlledJob1);
>>>     jobControl.addJob(controlledJob2);
>>>
>>>     Thread thread = new Thread(jobControl);
>>>     thread.start();
>>>     while(!jobControl.allFinished())
>>>     {
>>>      try {
>>>      Thread.sleep(10000);
>>>      } catch (InterruptedException e) {
>>>      // TODO Auto-generated catch block
>>>      e.printStackTrace();
>>>      }
>>>     }
>>>     jobControl.stop();
>>>     }
>>> }
>>>
>>>
>>> wordcount output => job1 is given to sort=> job2
>>> Irrespective of mappers and reducers, above mentioned is the way to
>>> handle many jobs.
>>>
>>> 2011/9/21 谭军 <tanjun_2525@163.com>
>>>
>>>> Hi,
>>>> I want to use 2 MR jobs sequentially.
>>>> And the first job produces intermediate result to a temp file.
>>>> The second job reads the result in temp file but not the FileInputPath.
>>>> I tried, but FileNotFoundException reported.
>>>> Then I checked the datanodes, temp file was created.
>>>> The first job was executed correctly.
>>>> Why the second job cannot find the file? The file was created before the
>>>> second job was executed.
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Regards!
>>>>
>>>> Jun Tan
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Swathi.V.
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Swathi.V.
>>
>>
>>
>>
>
>
> --
> Regards,
> Swathi.V.
>
>
>
>


-- 
Regards,
Swathi.V.

Mime
View raw message