hadoop-mapreduce-user mailing list archives

From Gabriel Balan <gabriel.ba...@oracle.com>
Subject Re: Accessing files in Hadoop 2.7.2 Distributed Cache
Date Tue, 21 Jun 2016 22:44:03 GMT
Hi

> My code creates a new job named "job 1" which writes something to distributed cache (say a text file) and the job gets completed.

Just to manage expectations: you add files to the distributed cache _in the job driver_, and the framework makes them available to maps and reducers.

    Adding a file to the distributed cache in a mapper and expecting to have it available in other mappers/reducers or later jobs is not a supported use case.

If you want a mapper/reducer to send info back to the job driver, then use counters or write regular files (in the job output dir).
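
For counters, a rough sketch with the old mapred API (the group and counter names here are made up):

    // in the mapper or reducer, via the Reporter argument of map()/reduce():
    reporter.incrCounter("myApp", "interestingRecords", 1L);

    // back in the driver, once runJob() returns:
    RunningJob job = JobClient.runJob(conf);
    long n = job.getCounters().findCounter("myApp", "interestingRecords").getValue();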

If you want to add a file to the distributed cache and have multiple jobs use it, then:

    Note that the DistributedCache.add*(Path, Conf) methods append the file name to internal conf properties. (When you submit the job, the framework reads those properties to find out what files need to be distributed, added to the classpath, etc.)
    So DistributedCache.addCacheFile(*f*, *conf1*) writes something into *conf1*. When you launch *job1* using *conf1*, *f* gets distributed. When you launch *job2* using *conf2*, *job2* may not distribute file *f*, depending on how you built *conf2* (independently of *conf1*, or as a copy/clone of *conf1*).
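
Concretely, in the driver it looks something like this (a sketch; the HDFS path is just an example, and I'm reusing the class name from your code):

    Path cached = new Path("hdfs://localhost:9000/bloomfilter");

    JobConf conf1 = new JobConf(getConf(), graphMining.class);
    DistributedCache.addCacheFile(cached.toUri(), conf1);    // recorded in conf1 only
    JobClient.runJob(conf1);                                 // job1 distributes the file

    JobConf conf2 = new JobConf(getConf(), graphMining.class);   // fresh conf: knows nothing about the file
    DistributedCache.addCacheFile(cached.toUri(), conf2);    // must be added again, or job2 won't distribute it
    JobClient.runJob(conf2);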

hth

Gabriel Balan

P.S. Btw, you don't have to copy the local file to HDFS using "IOUtils.copyBytes(in, out, 4096, true);"

Try FileSystem.copyFromLocalFile <https://hadoop.apache.org/docs/r2.6.1/api/src-html/org/apache/hadoop/fs/FileSystem.html#line.1885>
or "hadoop jar foo.jar MyClass -files /home/siddharth/Desktop/data/bloom_filter"


On 6/13/2016 5:14 AM, Siddharth Dawar wrote:
> Hi Jeff,
>
> Thanks for your prompt reply. Actually my problem is as follows:
>
> My code creates a new job named "job 1" which writes something to distributed cache (say a text file) and the job gets completed.
>
> Now, I want to create some n number of jobs in the while loop below, which read the text file written by "job 1" from the distributed cache. So my question is, "*How to share content among multiple jobs using distributed cache*"?
>
> *Another part of the problem* is that I don't know how to get an instance of the running job from
> JobClient.runJob(conf2);
> *so that I can use the job.addCacheFiles(..) command.*
>
>
> while (true)
> {
>     JobConf conf2 = new JobConf(getConf(), graphMining.class);
>
>     conf2.setJobName("sid");
>     conf2.setMapperClass(mapperMiner.class);
>     conf2.setReducerClass(reducerMiner.class);
>
>     conf2.setInputFormat(SequenceFileInputFormat.class);
>     conf2.setOutputFormat(SequenceFileOutputFormat.class);
>     conf2.setOutputValueClass(BytesWritable.class);
>
>     conf2.setMapOutputKeyClass(Text.class);
>     conf2.setMapOutputValueClass(MapWritable.class);
>     conf2.setOutputKeyClass(Text.class);
>
>     conf2.setNumMapTasks(Integer.parseInt(args[3]));
>     conf2.setNumReduceTasks(Integer.parseInt(args[4]));
>     FileInputFormat.addInputPath(conf2, new Path(input));
>     FileOutputFormat.setOutputPath(conf2, new Path(output));
>
>     RunningJob job = JobClient.runJob(conf2);
> }
>
>
>
> On Wed, Jun 8, 2016 at 3:50 AM, Guttadauro, Jeff <jeff.guttadauro@here.com> wrote:
>
>     Hi, Siddharth.
>
>     I was also a bit frustrated at what I found to be scant documentation on how to use the distributed cache in Hadoop 2. The DistributedCache class itself was deprecated in Hadoop 2, but there don’t appear to be very clear instructions on the alternative. I think it’s actually much simpler to work with files on the distributed cache in Hadoop 2. The new way is to add files to the cache (or cacheArchive) via the Job object:
>
>     job.addCacheFile(uriForYourFile);
>
>     job.addCacheArchive(uriForYourArchive);
>
>     The cool part is that, if you set up your URI so that it has a “#yourFileReference” at the end, then Hadoop will set up a symbolic link named “yourFileReference” in your job’s working directory, which you can use to get at the file or archive. So, it’s as if the file or archive is in the working directory. That obviates the need to even work with the DistributedCache class in your Mapper or Reducer, since you can just work with the file (or path using nio) directly.
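>
>     For example, a quick sketch of that (driver side, reusing the bloom filter path from your message; the name after “#” is whatever you want the symlink to be called):
>
>     Job job = Job.getInstance(getConf(), "sid");
>     job.addCacheFile(new URI("hdfs://localhost:9000/bloomfilter#bloomfilter"));   // new URI(...) throws URISyntaxException
>
>     and then in the Mapper or Reducer you can read it like a local file:
>
>     DataInputStream strm = new DataInputStream(new FileInputStream("bloomfilter"));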
>
>     Hope that helps.
>
>     -Jeff
>
>     *From:* Siddharth Dawar [mailto:siddharthdawar17@gmail.com]
>     *Sent:* Tuesday, June 07, 2016 4:06 AM
>     *To:* user@hadoop.apache.org
>     *Subject:* Accessing files in Hadoop 2.7.2 Distributed Cache
>
>     Hi,
>
>     I want to use the distributed cache to allow my mappers to access data in Hadoop 2.7.2. In main, I'm using the command
>
>     String hdfs_path="hdfs://localhost:9000/bloomfilter";
>
>     InputStream in = new BufferedInputStream(new FileInputStream("/home/siddharth/Desktop/data/bloom_filter"));
>
>     Configuration conf = new Configuration();
>
>     fs = FileSystem.get(java.net.URI.create(hdfs_path), conf);
>
>     OutputStream out = fs.create(new Path(hdfs_path));
>
>
>     //Copy file from local to HDFS
>
>     IOUtils.copyBytes(in, out, 4096, true);
>
>     System.out.println(hdfs_path + " copied to HDFS");
>
>     DistributedCache.addCacheFile(new Path(hdfs_path).toUri(), conf2);
>
>
>     The above code adds a file present on my local file system to HDFS and adds it to the distributed cache.
>
>     However, in my mapper code, when I try to access the file stored in the distributed cache, the Path[] p variable gets a null value.
>
>     public void configure(JobConf conf)
>     {
>         this.conf = conf;
>         try {
>             Path[] p = DistributedCache.getLocalCacheFiles(conf);
>         } catch (IOException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
>
>     Even when I tried to access the distributed cache from the following code in my mapper, the code returns the error that the bloomfilter file doesn't exist:
>
>     strm = new DataInputStream(new FileInputStream("bloomfilter"));
>
>     // Read into our Bloom filter.
>
>     filter.readFields(strm);
>
>     strm.close();
>
>     However, I read somewhere that if we add a file to the distributed cache, we can access it directly by its name.
>
>     Can you please help me out?
>
>

-- 
The statements and opinions expressed here are my own and do not necessarily represent those
of Oracle Corporation.

