From: "Cornish, Duane C." <Duane.Cornish@jhuapl.edu>
To: user@accumulo.apache.org, vines@apache.org
Date: Fri, 2 Nov 2012 17:21:15 -0400
Subject: RE: Accumulo Map Reduce is not distributed

Thanks for the prompt response John!

When I say that I'm pre-splitting my table, I mean I am using the tableOperations().addSplits(table, splits) command. I have verified that this correctly splits my table into 4 tablets and that the tablets are distributed across my cloud before I start my map reduce job.

Now, I only kick off the job once, but it appears that 4 separate jobs run (one after the other). The first one reaches 100% in its map phase (and, based on my output, only handled ¼ of the data), then the next job starts at 0% and reaches 100%, and so on. So I think I'm "only running one mapper at a time in an MR job that has 4 mappers total." I have 2 mapper slots per node. My Hadoop is set up so that one machine is the namenode and the other 3 are datanodes. This gives me 6 slots total. (This is not congruent with my Accumulo setup, where the master is also a slave, giving 4 total slaves.)

My map reduce job is not a chain job, so all 4 tablets should be able to run at the same time.
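For concreteness, the split points handed to tableOperations().addSplits() are just a sorted set of row-key cut points. A minimal sketch of generating them is below; the even-spacing scheme, the `SplitSketch` class name, and the single-character row-key alphabet are illustrative assumptions, not my actual code:

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class SplitSketch {
    // Produce (tablets - 1) evenly spaced one-character cut points over a
    // lowercase row-key alphabet; addSplits() with these yields `tablets` tablets.
    static SortedSet<String> evenSplits(int tablets) {
        SortedSet<String> splits = new TreeSet<String>();
        for (int i = 1; i < tablets; i++) {
            splits.add(String.valueOf((char) ('a' + (26 * i) / tablets)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Three cut points -> four tablets, matching the setup described above.
        System.out.println(evenSplits(4)); // [g, n, t]
        // With a live connector, these strings would be wrapped in Hadoop Text
        // objects and passed to connector.tableOperations().addSplits(...).
    }
}
```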
Here is my job class code below:

import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

public class Accumulo_FE_MR_Job extends Configured implements Tool {

    private void runOneTable() throws Exception {
        System.out.println("Running Map Reduce Feature Extraction Job");

        Job job = new Job(getConf(), getClass().getName());
        job.setJarByClass(getClass());
        job.setJobName("MRFE");

        job.setInputFormatClass(AccumuloRowInputFormat.class);
        AccumuloRowInputFormat.setZooKeeperInstance(job.getConfiguration(),
                HMaxConstants.INSTANCE,
                HMaxConstants.ZOO_SERVERS);
        AccumuloRowInputFormat.setInputInfo(job.getConfiguration(),
                HMaxConstants.USER,
                HMaxConstants.PASSWORD.getBytes(),
                HMaxConstants.FEATLESS_IMG_TABLE,
                new Authorizations());
        AccumuloRowInputFormat.setLogLevel(job.getConfiguration(), Level.FATAL);

        job.setMapperClass(AccumuloFEMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setNumReduceTasks(4);
        job.setReducerClass(AccumuloFEReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(AccumuloOutputFormat.class);
        AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
                HMaxConstants.INSTANCE,
                HMaxConstants.ZOO_SERVERS);
        AccumuloOutputFormat.setOutputInfo(job.getConfiguration(),
                HMaxConstants.USER,
                HMaxConstants.PASSWORD.getBytes(),
                true,
                HMaxConstants.ALL_IMG_TABLE);
        AccumuloOutputFormat.setLogLevel(job.getConfiguration(), Level.FATAL);

        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            System.err.println("Job Successful");
        } else {
            System.err.println("Job Unsuccessful");
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        runOneTable();
        return 0;
    }
}

Thanks,
Duane

From: John Vines [mailto:vines@apache.org]
Sent: Friday, November 02, 2012 5:04 PM
To: user@accumulo.apache.org
Subject: Re: Accumulo Map Reduce is not distributed

This sounds like an issue with how your MR environment is configured and/or how you're kicking off your mapreduce.

Accumulo's input formats will automatically set the number of mappers to the number of tablets you have, so you should have seen your job go from 1 mapper to 4. What you describe is that you now run 4 MR jobs instead of just one, is that correct? Because that doesn't make a lot of sense, unless by presplitting your table you mean you now have 4 different support tables. Or do you mean that you're only running one mapper at a time in an MR job that has 4 mappers total?

I believe it's somewhere in your kickoff that things may be a bit misconstrued. Just so I'm clear: how many mapper slots do you have per node, is your job a chain MR job, and do you mind sharing the code which sets up and kicks off your MR job, so I have an idea of what could be kicking off 4 jobs?

John

On Fri, Nov 2, 2012 at 4:53 PM, Cornish, Duane C. <Duane.Cornish@jhuapl.edu> wrote:

Hello,

I apologize if this discussion should be directed to a Hadoop map reduce forum; however, I have some concern that my problem may be with my use of Accumulo.

I have a map reduce job that I want to run over data in a table. I have an index table and a support table which contains a subset of the data in the index table. I would like to map reduce over the support table on my small 4 node cluster.

I have written a map reduce job that uses the AccumuloRowInputFormat class and sets the support table as its input table.

In my mapper, I read in a row of the support table, and make a call to a static function which pulls information out of the index table.
Next, I use the data pulled back from the function call as input to a call to an external .so file that is stored on the name node. I then make another static function call to ingest the new data back into the index table. (I know I could emit this in the reduce step, but what I'm ingesting is formatted in a somewhat complex Java object, and I already had a static function that ingested it the way I needed it.) My reduce step is completely empty.

I output print statements from my mapper to see my progress. The problem I'm seeing is that my entire job appears to run in sequence, not in parallel. I am running it from the Accumulo master on the 4 node system.

I realized that my support table is very small and was not being split into multiple tablets. I am now presplitting this table across all 4 nodes. Now, when I run the map reduce job, it appears that 4 separate map reduce jobs run one after the other. The first map reduce job runs, gets to 100%, then the next map reduce job runs, etc. The job is only called once; why are there 4 jobs running? Why won't these jobs run in parallel?

Is there any way to set the number of tasks that can run? This is possible from the Hadoop command line; is it possible from the Java API? Also, could my problem stem from the fact that during my mapper I am making static function calls to another class in my Java project, accessing my Accumulo index table, or making a call to an external .so library? I could restructure the job to avoid making static function calls, and I could write directly to the Accumulo table from my map reduce job if that would fix my problem. I can't avoid making the external .so library call. Any help would be greatly appreciated.

Thanks,
Duane
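On the task-count question above: in the Hadoop 0.20-era deployments of this period, per-node map concurrency is a tasktracker daemon setting rather than a per-job one, and with AccumuloRowInputFormat the per-job map count is fixed at one mapper per tablet regardless of any hint. A hedged sketch of the relevant daemon-side knob (property name from that Hadoop generation; the value shown is illustrative and requires a tasktracker restart):

```xml
<!-- mapred-site.xml on each tasktracker node -->
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>2</value> <!-- concurrent map slots per node -->
</property>
```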
