Date: Mon, 04 May 2015 15:40:24 -0400
From: Josh Elser <josh.elser@gmail.com>
To: user@accumulo.apache.org
Subject: Re: spark with AccumuloRowInputFormat?

Thanks _so_ much for taking the time to write this up, Marc! It's a good
example.

One note: you probably want to use a priority greater than 20 for the
IteratorSetting. The VersioningIterator is set on Accumulo tables by
default at priority 20, and in most cases you'd want to see the state of
the table _after_ the VersioningIterator filters things.
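For example, a minimal tweak to Marc's snippet below (21 is just an
illustrative value; any unused priority above 20 would do):

    // priority 21 runs after the default VersioningIterator at priority 20,
    // so the WholeRowIterator encodes the already-versioned view of each row
    IteratorSetting iteratorSetting = new IteratorSetting(21, WholeRowIterator.class);
    AccumuloInputFormat.addIterator(job, iteratorSetting);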
Marc Reichman wrote:
> This is working very well, thanks Russ!
>
> For anyone ever stuck in this predicament: using the WholeRowIterator, I
> was able to get the same Iterator<Map.Entry<Key, Value>> that I'd get
> from the AccumuloRowInputFormat, as follows:
>
> ...
>
> IteratorSetting iteratorSetting = new IteratorSetting(1, WholeRowIterator.class);
> AccumuloInputFormat.addIterator(job, iteratorSetting);
>
> // setup RDD
> JavaPairRDD<Key, Value> pairRDD = sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>         AccumuloInputFormat.class,
>         Key.class, Value.class);
>
> JavaRDD<List<MyObject>> result = pairRDD
>         .map(new Function<Tuple2<Key, Value>, List<MyObject>>() {
>             @Override
>             public List<MyObject> call(Tuple2<Key, Value> keyValueTuple2) throws Exception {
>                 SortedMap<Key, Value> wholeRow = WholeRowIterator.decodeRow(keyValueTuple2._1, keyValueTuple2._2);
>                 MyObject o = getMyObject(wholeRow.entrySet().iterator());
>                 ...
>             }
>         });
>
> Previously I was using the approach below, which required an additional
> stage of Spark calculations as well as a shuffle phase, wasn't nearly as
> quick, and also needed a helper class (AccumuloRowMapEntry, a very basic
> Map.Entry implementation):
>
> JavaRDD<List<MyObject>> result = pairRDD
>         .mapToPair(new PairFunction<Tuple2<Key, Value>, Text, Map.Entry<Key, Value>>() {
>             @Override
>             public Tuple2<Text, Map.Entry<Key, Value>> call(Tuple2<Key, Value> keyValueTuple2) throws Exception {
>                 return new Tuple2<Text, Map.Entry<Key, Value>>(keyValueTuple2._1.getRow(), new AccumuloRowMapEntry(keyValueTuple2._1, keyValueTuple2._2));
>             }
>         })
>         .groupByKey()
>         .map(new Function<Tuple2<Text, Iterable<Map.Entry<Key, Value>>>, List<MyObject>>() {
>             @Override
>             public List<MyObject> call(Tuple2<Text, Iterable<Map.Entry<Key, Value>>> textIterableTuple2) throws Exception {
>                 MyObject o = getMyObject(textIterableTuple2._2.iterator());
>                 ...
>             }
>         });
>
> Thanks again for all the help.
>
> Marc
>
> On Mon, May 4, 2015 at 12:23 PM, Russ Weeks wrote:
>
>     Yeah, exactly. When you put the WholeRowIterator on the scan, instead
>     of seeing all the Key,Value pairs that make up a row, you'll see a
>     single Key,Value pair. The only part of the Key that matters is the
>     row id. The Value is an encoded map of the Key,Value pairs that
>     constitute the row. Call the static method WholeRowIterator.decodeRow
>     to get at this map.
>
>     The decoded Keys have all the CF, CQ, timestamp and visibility data
>     populated. I'm not sure if they have the row ID populated; either way,
>     they all belong to the same row that was present in the original Key.
>
>     -Russ
>
>     On Mon, May 4, 2015 at 9:51 AM, Marc Reichman wrote:
>
>         Hi Russ,
>
>         How exactly would this work regarding column qualifiers, etc., as
>         those are part of the key? I apologize, but I'm not as familiar
>         with the WholeRowIterator use model. Does it consolidate based on
>         the row key, and then return some Key+Value "value" which has all
>         the original information serialized?
>
>         My rows aren't gigantic, but they can occasionally get into the
>         10s of MB.
>
>         On Mon, May 4, 2015 at 11:22 AM, Russ Weeks wrote:
>
>             Hi, Marc,
>
>             If your rows are small, you can use the WholeRowIterator to
>             get all the values with the key in one consuming function. If
>             your rows are big but you know up-front that you'll only need
>             a small part of each row, you could put a filter in front of
>             the WholeRowIterator.
>
>             I expect there's a performance hit (I haven't done any
>             benchmarks myself) because of the extra
>             serialization/deserialization, but it's a very convenient way
>             of working with rows in Spark.
>
>             Regards,
>             -Russ
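To make the WholeRowIterator.decodeRow step Russ describes concrete, here
is a minimal sketch of unpacking one encoded pair with a plain Scanner
(the connector setup and the table name "mytable" are illustrative, not
from this thread; decodeRow throws IOException, so the enclosing method
needs to declare or handle it):

    import java.util.Map;
    import java.util.SortedMap;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.user.WholeRowIterator;
    import org.apache.accumulo.core.security.Authorizations;

    Scanner scanner = connector.createScanner("mytable", Authorizations.EMPTY);
    scanner.addScanIterator(new IteratorSetting(21, WholeRowIterator.class));
    for (Map.Entry<Key, Value> entry : scanner) {
        // each entry off the scan is one whole row encoded as a single pair;
        // decodeRow reverses that encoding into the row's constituent cells
        SortedMap<Key, Value> wholeRow =
                WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
        for (Map.Entry<Key, Value> cell : wholeRow.entrySet()) {
            // the decoded Keys carry CF, CQ, timestamp and visibility
            System.out.println(cell.getKey() + " -> " + cell.getValue());
        }
    }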
>             On Mon, May 4, 2015 at 8:46 AM, Marc Reichman wrote:
>
>                 Has anyone done any testing with Spark and
>                 AccumuloRowInputFormat? I have no problem doing this for
>                 AccumuloInputFormat:
>
>                 JavaPairRDD<Key, Value> pairRDD = sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>                         AccumuloInputFormat.class,
>                         Key.class, Value.class);
>
>                 But I run into a snag trying to do a similar thing:
>
>                 JavaPairRDD<Text, PeekingIterator<Map.Entry<Key, Value>>> pairRDD = sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>                         AccumuloRowInputFormat.class,
>                         Text.class, PeekingIterator.class);
>
>                 The compilation error is (big, sorry):
>
>                 Error:(141, 97) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
>                   required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
>                   found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat>,java.lang.Class<org.apache.hadoop.io.Text>,java.lang.Class<org.apache.accumulo.core.util.PeekingIterator>
>                   reason: inferred type does not conform to declared bound(s)
>                     inferred: org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat
>                     bound(s): org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.Text,org.apache.accumulo.core.util.PeekingIterator>
>
>                 I've tried a few things. The signature of the function is:
>
>                 public <K, V, F extends InputFormat<K, V>> JavaPairRDD<K, V> newAPIHadoopRDD(Configuration conf, Class<F> fClass, Class<K> kClass, Class<V> vClass)
>
>                 I guess it's having trouble with the format extending
>                 InputFormatBase with its own additional generic parameters
>                 (the Map.Entry<Key, Value> inside the PeekingIterator).
>
>                 This may be an issue to chase with Spark vs. Accumulo,
>                 unless something can be tweaked on the Accumulo side or I
>                 could wrap the InputFormat with my own somehow.
>
>                 Accumulo 1.6.1, Spark 1.3.1, JDK 7u71.
>
>                 Stopping short of this, can anyone think of a good way to
>                 use AccumuloInputFormat to get what I'm getting from the
>                 Row version in a performant way? It doesn't necessarily
>                 have to be an iterator approach, but I'd need all my
>                 values with the key in one consuming function. I'm looking
>                 into ways to do it in Spark functions, but I'm trying to
>                 avoid any major performance hits.
>
>                 Thanks,
>
>                 Marc
>
>                 p.s. The summit was absolutely great, thank you all for
>                 having it!
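For anyone chasing the same inference failure: one possible workaround (a
sketch only, not from this thread, and untested against these exact
versions) is to erase the generic mismatch with an explicit double cast,
since only the runtime class reaches Hadoop anyway:

    import java.util.Map;
    import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.util.PeekingIterator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.spark.api.java.JavaPairRDD;

    // Sketch: the double cast through Class<?> papers over the mismatch
    // between AccumuloRowInputFormat's declared value type
    // (PeekingIterator<Map.Entry<Key, Value>>) and the raw PeekingIterator
    // class literal, so Spark's <K, V, F> inference can succeed.
    @SuppressWarnings("unchecked")
    Class<? extends InputFormat<Text, PeekingIterator<Map.Entry<Key, Value>>>> formatClass =
            (Class<? extends InputFormat<Text, PeekingIterator<Map.Entry<Key, Value>>>>)
                    (Class<?>) AccumuloRowInputFormat.class;

    @SuppressWarnings("unchecked")
    Class<PeekingIterator<Map.Entry<Key, Value>>> valueClass =
            (Class<PeekingIterator<Map.Entry<Key, Value>>>) (Class<?>) PeekingIterator.class;

    JavaPairRDD<Text, PeekingIterator<Map.Entry<Key, Value>>> pairRDD =
            sparkContext.newAPIHadoopRDD(job.getConfiguration(), formatClass, Text.class, valueClass);

The unchecked casts are tolerable here in the sense that the input format
really does produce Text keys and PeekingIterator values at runtime; the
casts only bypass compile-time inference, they don't change behavior.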