hadoop-user mailing list archives

From "Guttadauro, Jeff" <jeff.guttada...@here.com>
Subject RE: Accessing files in Hadoop 2.7.2 Distributed Cache
Date Mon, 20 Jun 2016 15:24:44 GMT
Hi, Siddharth.

Not sure I fully understand your problem.  I think you're saying that you'd like to run
an initial M/R job to create some data that n subsequent jobs will then use, and that you'd
like to use the distributed cache for that.  You may not actually need the distributed cache,
though, since your initial M/R job would presumably write its data out to HDFS (or possibly
S3), where it is immediately available to subsequent jobs.  So you could read those files in
the setup code of your subsequent jobs' Mapper or Reducer, or even use the output directory
as an input directory for those jobs (MultipleInputs can help there), depending on what
you're trying to do.
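
If the MultipleInputs route sounds useful, the wiring could look roughly like this (just a
sketch to give the idea; the paths, input formats, and mapper classes here are placeholders,
not anything from your code):

   // MultipleInputs (org.apache.hadoop.mapreduce.lib.input) lets one job read
   // several input paths, each with its own InputFormat and Mapper.
   Job job = Job.getInstance(getConf(), "follow-up");
   job.setJarByClass(getClass());

   // Your regular input, whatever that is
   MultipleInputs.addInputPath(job, new Path("/user/you/original-input"),
           TextInputFormat.class, OriginalInputMapper.class);

   // The directory your initial job wrote to HDFS
   MultipleInputs.addInputPath(job, new Path("/user/you/job1-output"),
           SequenceFileInputFormat.class, Job1OutputMapper.class);

   FileOutputFormat.setOutputPath(job, new Path("/user/you/follow-up-output"));
   job.waitForCompletion(true);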

If you really want to use the distributed cache, though, your driver code would need to
locate the output of your initial job in HDFS (or S3) and then, for each of your subsequent
jobs, add those files to that job's distributed cache.
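
For instance, the driver might do something along these lines (again just a rough sketch;
the paths and the loop bound n are made up for illustration):

   Path job1Output = new Path("/user/you/job1-output");   // wherever your initial job wrote
   FileSystem fs = FileSystem.get(getConf());

   for (int i = 0; i < n; i++) {
       Job job = Job.getInstance(getConf(), "follow-up-" + i);
       job.setJarByClass(getClass());
       // ... set mapper, reducer, formats, etc. ...

       // Put each part file from the initial job's output on this job's distributed cache
       for (FileStatus status : fs.listStatus(job1Output)) {
           if (status.getPath().getName().startsWith("part-")) {
               job.addCacheFile(status.getPath().toUri());
           }
       }

       FileInputFormat.setInputPaths(job, new Path("/user/you/input"));
       FileOutputFormat.setOutputPath(job, new Path("/user/you/output-" + i));
       job.waitForCompletion(true);
   }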

Regarding your question about the Job object and your example code, I think you may have an
easier time of it if you work with the Job object instead of the deprecated JobConf object.
 Here’s an example of that:


   Job job = Job.getInstance(getConf());
   job.setJobName(jobName);
   job.setJarByClass(getClass());
   job.setMapperClass(YourMapper.class);
   job.setMapOutputKeyClass(NullWritable.class);
   job.setMapOutputValueClass(YourValueWritable.class);   // use your concrete Writable type here
   job.setNumReduceTasks(0);                               // map-only job in this example
   job.setMapSpeculativeExecution(false);
   job.setReduceSpeculativeExecution(false);
   job.addCacheFile(uriForDistCacheFile);                  // URI of the file to distribute
   FileInputFormat.setInputPaths(job, yourJobInputPath);
   FileOutputFormat.setOutputPath(job, yourJobOutputPath);
   FileOutputFormat.setCompressOutput(job, true);
   FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
   job.waitForCompletion(true);
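
In the Mapper, the cached file can then be picked up in setup(); something like this (sketch
only; the class name and key/value types are placeholders):

   public class YourMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
           // The URIs you added via job.addCacheFile(...); null if nothing was cached
           URI[] cacheFiles = context.getCacheFiles();
           if (cacheFiles != null && cacheFiles.length > 0) {
               // Each cached file is localized and symlinked into the task's working
               // directory under its base name (or the "#fragment" if one was given),
               // so it can be opened like an ordinary local file.
               String localName = new Path(cacheFiles[0]).getName();
               BufferedReader reader = new BufferedReader(new FileReader(localName));
               try {
                   String line;
                   while ((line = reader.readLine()) != null) {
                       // load each line into whatever in-memory structure you need
                   }
               } finally {
                   reader.close();
               }
           }
       }
   }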

Hope this helps!
-Jeff

From: Siddharth Dawar [mailto:siddharthdawar17@gmail.com]
Sent: Monday, June 13, 2016 4:15 AM
To: Guttadauro, Jeff <jeff.guttadauro@here.com>; user@hadoop.apache.org
Subject: Re: Accessing files in Hadoop 2.7.2 Distributed Cache

Hi Jeff,
Thanks for your prompt reply. Actually my problem is as follows:
My code creates a new job, "job 1", which writes something to the distributed cache (say, a
text file) and then completes.
Now I want to create some number n of jobs in the while loop below, each of which reads the
text file written by "job 1" from the distributed cache. So my question is: how do I share
content among multiple jobs using the distributed cache?
Another part of the problem is that I don't know how to get an instance of the running job from

JobClient.runJob(conf2);

so that I can call job.addCacheFile(...).

while (true) {

    JobConf conf2 = new JobConf(getConf(), graphMining.class);

    conf2.setJobName("sid");
    conf2.setMapperClass(mapperMiner.class);
    conf2.setReducerClass(reducerMiner.class);

    conf2.setInputFormat(SequenceFileInputFormat.class);
    conf2.setOutputFormat(SequenceFileOutputFormat.class);
    conf2.setOutputValueClass(BytesWritable.class);

    conf2.setMapOutputKeyClass(Text.class);
    conf2.setMapOutputValueClass(MapWritable.class);
    conf2.setOutputKeyClass(Text.class);

    conf2.setNumMapTasks(Integer.parseInt(args[3]));
    conf2.setNumReduceTasks(Integer.parseInt(args[4]));

    FileInputFormat.addInputPath(conf2, new Path(input));
    FileOutputFormat.setOutputPath(conf2, new Path(output));

    RunningJob job = JobClient.runJob(conf2);
}



On Wed, Jun 8, 2016 at 3:50 AM, Guttadauro, Jeff <jeff.guttadauro@here.com> wrote:
Hi, Siddharth.

I was also a bit frustrated at what I found to be scant documentation on how to use the distributed
cache in Hadoop 2.  The DistributedCache class itself was deprecated in Hadoop 2, but there
don’t appear to be very clear instructions on the alternative.  I think it’s actually
much simpler to work with files on the distributed cache in Hadoop 2.  The new way is to add
files to the cache (or cacheArchive) via the Job object:

job.addCacheFile(uriForYourFile)
job.addCacheArchive(uriForYourArchive);

The cool part is that, if you set up your URI so that it has a “#yourFileReference” at
the end, then Hadoop will set up a symbolic link named “yourFileReference” in your job’s
working directory, which you can use to get at the file or archive.  So, it’s as if the
file or archive is in the working directory.  That obviates the need to even work with the
DistributedCache class in your Mapper or Reducer, since you can just work with the file (or
path using nio) directly.
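
To make that concrete for your bloom filter, the two ends could look something like this
(a sketch; the HDFS path is made up, and note that new URI(...) throws URISyntaxException,
so declare or catch it):

   // In the driver: the "#bloomfilter" fragment makes Hadoop create a symlink
   // named "bloomfilter" in each task's working directory.
   job.addCacheFile(new URI("hdfs:///user/you/bloomfilter#bloomfilter"));

   // In the Mapper/Reducer: read the symlink like an ordinary local file,
   // no DistributedCache calls needed.
   DataInputStream strm = new DataInputStream(new FileInputStream("bloomfilter"));
   filter.readFields(strm);
   strm.close();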

Hope that helps.
-Jeff
From: Siddharth Dawar [mailto:siddharthdawar17@gmail.com]
Sent: Tuesday, June 07, 2016 4:06 AM
To: user@hadoop.apache.org
Subject: Accessing files in Hadoop 2.7.2 Distributed Cache

Hi,
I want to use the distributed cache to allow my mappers to access data in Hadoop 2.7.2. In
my main method, I'm using the following code:

String hdfs_path = "hdfs://localhost:9000/bloomfilter";
InputStream in = new BufferedInputStream(new FileInputStream("/home/siddharth/Desktop/data/bloom_filter"));
Configuration conf = new Configuration();
fs = FileSystem.get(java.net.URI.create(hdfs_path), conf);
OutputStream out = fs.create(new Path(hdfs_path));

// Copy file from local to HDFS
IOUtils.copyBytes(in, out, 4096, true);

System.out.println(hdfs_path + " copied to HDFS");

DistributedCache.addCacheFile(new Path(hdfs_path).toUri(), conf2);



The above code adds a file present on my local file system to HDFS and adds it to the distributed
cache.

However, in my mapper code, when I try to access the file stored in the distributed cache,
the Path[] p variable comes back null.

public void configure(JobConf conf) {
    this.conf = conf;
    try {
        Path[] p = DistributedCache.getLocalCacheFiles(conf);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

Even when I tried to access the distributed cache from the following code in my mapper, I
get an error that the bloomfilter file doesn't exist:

strm = new DataInputStream(new FileInputStream("bloomfilter"));
// Read into our Bloom filter.
filter.readFields(strm);
strm.close();

However, I read somewhere that if we add a file to the distributed cache, we can access it
directly by its name.

Can you please help me out?

