incubator-crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Josh Wills <jwi...@cloudera.com>
Subject Re: using multiple HBase sources
Date Wed, 05 Dec 2012 16:37:21 GMT
Hey Jacob,

The challenge with consuming multiple sources of the same type is that the
Configuration information will conflict (e.g., the name of the table has to
be written to a certain field in a Configuration object, so writing it
twice will overwrite the previous value.) This was a limitation in earlier
revisions of Crunch, but we came up with the InputBundle abstraction
(o.a.c.io.impl.InputBundle) to work around it, and I just haven't gotten
around to updating the HBaseSourceTarget to take advantage of it.

We would want to move the InputBundle from o.a.c.io.impl to o.a.c.io and
make it user-facing, and then update crunch-hbase to take advantage of it.
If you file a JIRA for it, I'll make sure that it gets done.

J


On Wed, Dec 5, 2012 at 8:28 AM, Williams,Jacob <Jacob.Williams@cerner.com>wrote:

>   When attempting to join or cogroup collections that originate from two
> different HBaseSourceTargets, I get the error below. HBaseSourceTarget does
> not call CrunchInputs.addInputPath, so when CrunchInputs.getFormatNodeMap
> is called, RuntimeParameters.MULTI_INPUTS is null in the configuration. Any
> thoughts on how to fix or work around this?
>
>  Thanks,
> Jacob Williams
>
>
>  Error:
>
>  12/12/05 09:56:02 INFO exec.CrunchJob: java.lang.NullPointerException
> at
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:187)
> at com.google.common.base.Splitter.split(Splitter.java:371)
> at
> org.apache.crunch.impl.mr.run.CrunchInputs.getFormatNodeMap(CrunchInputs.java:51)
> at
> org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:44)
> at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1014)
> at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1031)
> at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:172)
> at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:943)
> at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:896)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:396)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
> at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:896)
> at org.apache.hadoop.mapreduce.Job.submit(Job.java:531)
> at
> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:331)
> at org.apache.crunch.impl.mr.exec.CrunchJob.submit(CrunchJob.java:126)
> at
> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:247)
> at
> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.run(CrunchJobControl.java:275)
> at java.lang.Thread.run(Thread.java:662)
>
>
>  Example which produces this error:
>
>  import java.io.IOException;
>
>  import org.apache.crunch.MapFn;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.PTable;
> import org.apache.crunch.Pair;
> import org.apache.crunch.io.hbase.HBaseSourceTarget;
> import org.apache.crunch.Pipeline;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.io.hbase.HBaseSourceTarget;
> import org.apache.crunch.types.writable.Writables;
>
>  import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>
>  public class Main {
>   public static void main(String[] args) throws IOException {
>     Configuration config = HBaseConfiguration.create();
>
>      Pipeline pipeline = new MRPipeline(Main.class, "join-problem",
> config);
>
>      PTable<ImmutableBytesWritable, Result> results1 = pipeline.read(new
> HBaseSourceTarget("some_table", new Scan()));
>     PTable<ImmutableBytesWritable, Result> results2 = pipeline.read(new
> HBaseSourceTarget("some_table", new Scan()));
>     pipeline.writeTextFile(results1.join(results2), "join-test");
>     pipeline.run();
>   }
> }
>  CONFIDENTIALITY NOTICE This message and any included attachments are
> from Cerner Corporation and are intended only for the addressee. The
> information contained in this message is confidential and may constitute
> inside or non-public information under international, federal, or state
> securities laws. Unauthorized forwarding, printing, copying, distribution,
> or use of such information is strictly prohibited and may be unlawful. If
> you are not the addressee, please promptly delete this message and notify
> the sender of the delivery error by e-mail or you may call Cerner's
> corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Mime
View raw message