accumulo-user mailing list archives

From "Cornish, Duane C." <Duane.Corn...@jhuapl.edu>
Subject RE: Accumulo Map Reduce is not distributed
Date Mon, 05 Nov 2012 13:56:15 GMT
Hi William,

Thanks for helping me out, and sorry I didn't get back to you sooner; I was away for the weekend.
I am only calling ToolRunner.run once.

public static void ExtractFeaturesFromNewImages() throws Exception {
       String[] parameters = new String[1];
       parameters[0] = "foo";
       InitializeFeatureExtractor();
       ToolRunner.run(CachedConfiguration.getInstance(), new Accumulo_FE_MR_Job(), parameters);
}

Another indicator that I'm only calling it once is that before I pre-split the table,
I was getting a single map-reduce job with only 1 mapper.  Based on my print statements,
the job was running in sequence (which I guess makes sense, because the table only existed
on one node in my cluster).  Then, after pre-splitting my table, I was getting one job that
had 4 mappers, each running one after the other.  I hadn't changed any code (other than
adding in the splits), so I'm only calling ToolRunner.run once.  Furthermore, the run function
in my job class is provided below:

       @Override
       public int run(String[] arg0) throws Exception {
              runOneTable();
              return 0;
       }

Thanks,
Duane
From: William Slacum [mailto:wilhelm.von.cloud@accumulo.net]
Sent: Friday, November 02, 2012 8:48 PM
To: user@accumulo.apache.org
Subject: Re: Accumulo Map Reduce is not distributed

What about the main method that calls ToolRunner.run? If you have 4 jobs being created, then
you're calling run(String[]) or runOneTable() 4 times.
On Fri, Nov 2, 2012 at 5:21 PM, Cornish, Duane C. <Duane.Cornish@jhuapl.edu> wrote:
Thanks for the prompt response John!
When I say that I'm pre-splitting my table, I mean I am using the tableOperations().addSplits(table,splits)
command.  I have verified that this is correctly splitting my table into 4 tablets and it
is being distributed across my cloud before I start my map reduce job.

Now, I only kick off the job once, but it appears that 4 separate jobs run (one after the
other).  The first one reaches 100% in its map phase (and, based on my output, only handled
¼ of the data), then the next job starts at 0% and reaches 100%, and so on.  So I think I'm
"only running one mapper at a time in an MR job that has 4 mappers total."  I have 2 mapper
slots per node.  My Hadoop is set up so that one machine is the namenode and the other 3 are
datanodes.  This gives me 6 slots total.  (This differs from my Accumulo setup, where the
master is also a slave, giving 4 slaves in total.)
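
As a quick sanity check on the slot arithmetic described above, here is a minimal Java sketch. It assumes every map slot is free and that the scheduler fills slots greedily; the names SlotMath and waves are made up for illustration and are not part of Hadoop. Under those assumptions, 4 map tasks on 6 slots should fit in a single wave, so one-at-a-time execution would not be explained by a shortage of slots.

```java
class SlotMath {
    // Number of sequential "waves" needed to run all map tasks,
    // assuming every slot is free and slots are filled greedily (an assumption).
    static int waves(int mapTasks, int totalSlots) {
        if (mapTasks < 0 || totalSlots <= 0) {
            throw new IllegalArgumentException("mapTasks must be >= 0 and totalSlots > 0");
        }
        return (mapTasks + totalSlots - 1) / totalSlots; // ceiling division
    }

    public static void main(String[] args) {
        int dataNodes = 3;     // from the message: 3 datanodes
        int slotsPerNode = 2;  // from the message: 2 mapper slots per node
        int mapTasks = 4;      // one mapper per tablet after pre-splitting

        int totalSlots = dataNodes * slotsPerNode;
        System.out.println("totalSlots=" + totalSlots
                + " waves=" + waves(mapTasks, totalSlots));
        // prints "totalSlots=6 waves=1"
    }
}
```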

My map reduce job is not a chain job, so all 4 tablets should be able to run at the same time.

Here is my job class code:

import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;


public class Accumulo_FE_MR_Job extends Configured implements Tool{

       private void runOneTable() throws Exception {
        System.out.println("Running Map Reduce Feature Extraction Job");

        Job job  = new Job(getConf(), getClass().getName());

        job.setJarByClass(getClass());
        job.setJobName("MRFE");

        job.setInputFormatClass(AccumuloRowInputFormat.class);
        AccumuloRowInputFormat.setZooKeeperInstance(job.getConfiguration(),
                HMaxConstants.INSTANCE,
                HMaxConstants.ZOO_SERVERS);

        AccumuloRowInputFormat.setInputInfo(job.getConfiguration(),
                HMaxConstants.USER,
                HMaxConstants.PASSWORD.getBytes(),
                HMaxConstants.FEATLESS_IMG_TABLE,
                new Authorizations());

        AccumuloRowInputFormat.setLogLevel(job.getConfiguration(), Level.FATAL);

        job.setMapperClass(AccumuloFEMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setNumReduceTasks(4);
        job.setReducerClass(AccumuloFEReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(AccumuloOutputFormat.class);
        AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
                HMaxConstants.INSTANCE,
                HMaxConstants.ZOO_SERVERS);
        AccumuloOutputFormat.setOutputInfo(job.getConfiguration(),
                HMaxConstants.USER,
                HMaxConstants.PASSWORD.getBytes(),
                true,
                HMaxConstants.ALL_IMG_TABLE);

        AccumuloOutputFormat.setLogLevel(job.getConfiguration(), Level.FATAL);

        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            System.err.println("Job Successful");
        } else {
            System.err.println("Job Unsuccessful");
        }
     }

       @Override
       public int run(String[] arg0) throws Exception {
              runOneTable();
              return 0;
       }
}

Thanks,
Duane

From: John Vines [mailto:vines@apache.org]
Sent: Friday, November 02, 2012 5:04 PM
To: user@accumulo.apache.org
Subject: Re: Accumulo Map Reduce is not distributed

This sounds like an issue with how your MR environment is configured and/or how you're kicking
off your mapreduce.

Accumulo's input formats will automatically set the number of mappers to the number of tablets
you have, so you should have seen your job go from 1 mapper to 4. What you describe is that you
now run 4 MR jobs instead of just one; is that correct? Because that doesn't make a lot of
sense, unless by presplitting your table you meant you now have 4 different support tables.
Or do you mean that you're only running one mapper at a time in an MR job that has 4 mappers
total?
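
To illustrate the point that the mapper count follows the tablet count, here is a small stdlib-only Java model. It is a sketch, not Accumulo code: it assumes N split points produce N + 1 tablets and that each tablet's end row is inclusive. The split points "g", "n", "t" and the names TabletModel, tabletCount, and tabletIndexFor are hypothetical. The real addSplits call takes a SortedSet of Hadoop Text objects rather than Strings.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

class TabletModel {
    // N split points divide the row space into N + 1 tablets.
    static int tabletCount(SortedSet<String> splits) {
        return splits.size() + 1;
    }

    // Index of the tablet holding a row, assuming tablet i covers
    // (split[i-1], split[i]] with an inclusive end row (modelling assumption).
    static int tabletIndexFor(String row, SortedSet<String> splits) {
        // headSet(row) holds the split points strictly less than the row.
        return splits.headSet(row).size();
    }

    public static void main(String[] args) {
        // Hypothetical split points; three splits give four tablets.
        SortedSet<String> splits = new TreeSet<>(Arrays.asList("g", "n", "t"));

        System.out.println("tablets (and expected mappers): " + tabletCount(splits));
        for (String row : new String[] {"a", "g", "h", "z"}) {
            System.out.println(row + " -> tablet " + tabletIndexFor(row, splits));
        }
    }
}
```

With three split points the model reports four tablets, which matches the jump from 1 mapper to 4 described in this thread.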

I believe it's somewhere in your kickoff that things may be a bit misconstrued. Just so I'm
clear, how many mapper slots do you have per node, is your job a chain MR job, and do you
mind sharing your code which sets up and kicks off your MR job so I have an idea of what could
be kicking off 4 jobs?

John

On Fri, Nov 2, 2012 at 4:53 PM, Cornish, Duane C. <Duane.Cornish@jhuapl.edu> wrote:
Hello,

I apologize if this discussion should be directed to a Hadoop map reduce forum; however, I have
some concern that my problem may be with my use of Accumulo.

I have a map reduce job that I want to run over data in a table.  I have an index table and
a support table which contains a subset of the data in the index table.  I would like to map
reduce over the support table on my small 4 node cluster.

I have written a map reduce job that uses the AccumuloRowInputFormat class and sets the support
table as its input table.

In my mapper, I read in a row of the support table, and make a call to a static function which
pulls information out of the index table.  Next, I use the data pulled back from the function
call as input to a call to an external .so file that is stored on the name node.  I then make
another static function call to ingest the new data back into the index table.  (I know I
could emit this in the reduce step, but what I'm ingesting is formatted in a somewhat complex
java object and I already had a static function that ingested it the way I needed it.)  My
reduce step is completely empty.

I output print statements from my mapper to see my progress.  The problem that I'm getting
is that my entire job appears to run in sequence not in parallel.  I am running it from the
accumulo master on the 4 node system.

I realized that my support table is very small and was not being split into multiple tablets.
I am now presplitting this table across all 4 nodes.  Now, when I run the map reduce job
it appears that 4 separate map reduce jobs run one after another.  The first map reduce
job runs, gets to 100%, then the next map reduce job runs, etc.  The job is only called once;
why are there 4 jobs running?  Why won't these jobs run in parallel?

Is there any way to set the number of tasks that can run?  This is possible from the hadoop
command line; is it possible from the Java API? Also, could my problem stem from the fact
that during my mapper I am making static function calls to another class in my Java project,
accessing my Accumulo index table, or making a call to an external .so library?  I could restructure
the job to avoid making static function calls and I could write directly to the Accumulo table
from my map reduce job if that would fix my problem.  I can't avoid making the external .so
library call.  Any help would be greatly appreciated.

Thanks,
Duane


