accumulo-user mailing list archives

From Marc Reichman <mreich...@pixelforensics.com>
Subject Re: spark with AccumuloRowInputFormat?
Date Mon, 04 May 2015 19:50:38 GMT
Thanks Josh. I will make that change to be safe, though in these
experiments I use a maxversions of 1 anyway.
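
For reference, the change is just bumping the IteratorSetting priority above
the VersioningIterator's default of 20. A minimal sketch (25 is an arbitrary
value above 20):

// Priority above 20, so the WholeRowIterator sees the table after the
// VersioningIterator has filtered old versions.
IteratorSetting iteratorSetting = new IteratorSetting(25, WholeRowIterator.class);
AccumuloInputFormat.addIterator(job, iteratorSetting);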

I look forward to seeing the definitive Accumulo + Spark guide some day,
glad to help where I can if there are specific things to fill in.

On Mon, May 4, 2015 at 2:40 PM, Josh Elser <josh.elser@gmail.com> wrote:

> Thanks _so_ much for taking the time to write this up, Marc! It's a good
> example.
>
> One note, you probably want to use a priority greater than 20 for the
> IteratorSetting. The VersioningIterator is set on Accumulo tables by
> default at priority 20. In most cases, you'd want to see the state of the
> table _after_ the VersioningIterator filters things.
>
> Marc Reichman wrote:
>
>> This is working very well, thanks Russ!
>>
>> For anyone ever stuck in this predicament: using the WholeRowIterator, I
>> was able to get the same Iterator<Map.Entry<Key,Value>> that the
>> AccumuloRowInputFormat would give me, as follows:
>>
>> ...
>>
>> IteratorSetting iteratorSetting = new IteratorSetting(1,
>> WholeRowIterator.class);
>> AccumuloInputFormat.addIterator(job, iteratorSetting);
>>
>> // setup RDD
>> JavaPairRDD<Key, Value> pairRDD =
>>         sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>>                 AccumuloInputFormat.class,
>>                 Key.class, Value.class);
>>
>> JavaRDD<List<MyResult>> result = pairRDD
>>         .map(new Function<Tuple2<Key, Value>, List<MyResult>>() {
>>             @Override
>>             public List<MyResult> call(Tuple2<Key, Value> keyValueTuple2) throws Exception {
>>                 SortedMap<Key, Value> wholeRow =
>>                         WholeRowIterator.decodeRow(keyValueTuple2._1, keyValueTuple2._2);
>>                 MyObject o = getMyObject(wholeRow.entrySet().iterator());
>>                 ...
>>             }
>>         });
>>
>> Previously, I was using the approach below, which required an additional
>> stage of Spark calculations as well as a shuffle phase, wasn't nearly as
>> quick, and needed a helper class (AccumuloRowMapEntry, a very basic
>> Map.Entry implementation):
>>
>> JavaRDD<List<MyResult>> result = pairRDD
>>         .mapToPair(new PairFunction<Tuple2<Key, Value>, Text, Map.Entry<Key, Value>>() {
>>             @Override
>>             public Tuple2<Text, Map.Entry<Key, Value>> call(Tuple2<Key, Value> keyValueTuple2) throws Exception {
>>                 return new Tuple2<Text, Map.Entry<Key, Value>>(keyValueTuple2._1.getRow(),
>>                         new AccumuloRowMapEntry(keyValueTuple2._1, keyValueTuple2._2));
>>             }
>>         })
>>         .groupByKey()
>>         .map(new Function<Tuple2<Text, Iterable<Map.Entry<Key, Value>>>, List<MyResult>>() {
>>             @Override
>>             public List<MyResult> call(Tuple2<Text, Iterable<Map.Entry<Key, Value>>> textIterableTuple2) throws Exception {
>>                 MyObject o = getMyObject(textIterableTuple2._2.iterator());
>>                 ...
>>             }
>>         });
>>
>>
>> Thanks again for all the help.
>>
>> Marc
>>
>>
>> On Mon, May 4, 2015 at 12:23 PM, Russ Weeks <rweeks@newbrightidea.com
>> <mailto:rweeks@newbrightidea.com>> wrote:
>>
>>     Yeah, exactly. When you put the WholeRowIterator on the scan,
>>     instead of seeing all the Key,Value pairs that make up a row you'll
>>     see a single Key,Value pair. The only part of the Key that matters
>>     is the row id. The Value is an encoded map of the Key,Value pairs
>>     that constitute the row. Call the static method
>>     WholeRowIterator.decodeRow to get at this map.
>>
>>     The decoded Keys have all the CF, CQ, timestamp and visibility data
>>     populated. I'm not sure if they have the row ID populated; either
>>     way, they all belong to the same row that was present in the
>>     original Key.
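>>
>>     (For illustration only, a rough sketch of the same pattern on a plain
>>     Scanner; the table name, connector and authorizations are placeholders:)
>>
>>     Scanner scanner = connector.createScanner("mytable", Authorizations.EMPTY);
>>     scanner.addScanIterator(new IteratorSetting(25, WholeRowIterator.class));
>>     for (Map.Entry<Key, Value> rolledUp : scanner) {
>>         // One entry per row; the Value is the encoded map of the row's cells.
>>         SortedMap<Key, Value> row =
>>                 WholeRowIterator.decodeRow(rolledUp.getKey(), rolledUp.getValue());
>>     }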
>>
>>     -Russ
>>
>>
>>     On Mon, May 4, 2015 at 9:51 AM, Marc Reichman
>>     <mreichman@pixelforensics.com <mailto:mreichman@pixelforensics.com>>
>>     wrote:
>>
>>         Hi Russ,
>>
>>         How exactly would this work regarding column qualifiers, etc., as
>>         those are part of the key? I apologize, but I'm not as familiar
>>         with the WholeRowIterator usage model: does it consolidate based
>>         on the row key and then return some Key+Value "value" which has
>>         all the original information serialized?
>>
>>         My rows aren't gigantic but they can occasionally get into the
>>         10s of MB.
>>
>>         On Mon, May 4, 2015 at 11:22 AM, Russ Weeks
>>         <rweeks@newbrightidea.com <mailto:rweeks@newbrightidea.com>>
>> wrote:
>>
>>             Hi, Marc,
>>
>>             If your rows are small you can use the WholeRowIterator to
>>             get all the values with the key in one consuming function.
>>             If your rows are big but you know up-front that you'll only
>>             need a small part of each row, you could put a filter in
>>             front of the WholeRowIterator.
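>>
>>             (Untested sketch of what that could look like; the column
>>             family "attrs" is just a placeholder. The filter gets a lower
>>             priority than the WholeRowIterator so it is applied first:)
>>
>>             IteratorSetting cfFilter = new IteratorSetting(21, "cfFilter", RegExFilter.class);
>>             RegExFilter.setRegexs(cfFilter, null, "attrs", null, null, false);
>>             AccumuloInputFormat.addIterator(job, cfFilter);
>>
>>             IteratorSetting wholeRow = new IteratorSetting(22, "wholeRow", WholeRowIterator.class);
>>             AccumuloInputFormat.addIterator(job, wholeRow);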
>>
>>             I expect there's a performance hit (I haven't done any
>>             benchmarks myself) because of the extra
>>             serialization/deserialization, but it's a very convenient way
>>             of working with Rows in Spark.
>>
>>             Regards,
>>             -Russ
>>
>>             On Mon, May 4, 2015 at 8:46 AM, Marc Reichman
>>             <mreichman@pixelforensics.com
>>             <mailto:mreichman@pixelforensics.com>> wrote:
>>
>>                 Has anyone done any testing with Spark and
>>                 AccumuloRowInputFormat? I have no problem doing this for
>>                 AccumuloInputFormat:
>>
>>                 JavaPairRDD<Key, Value> pairRDD =
>>                         sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>>                                 AccumuloInputFormat.class,
>>                                 Key.class, Value.class);
>>
>>                 But I run into a snag trying to do a similar thing:
>>
>>                 JavaPairRDD<Text, PeekingIterator<Map.Entry<Key, Value>>> pairRDD =
>>                         sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>>                                 AccumuloRowInputFormat.class,
>>                                 Text.class, PeekingIterator.class);
>>
>>                 The compilation error is (big, sorry):
>>
>>                 Error:(141, 97) java: method newAPIHadoopRDD in class
>> org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
>>                    required:
>> org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
>>                    found:
>> org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat>,java.lang.Class<org.apache.hadoop.io.Text>,java.lang.Class<org.apache.accumulo.core.util.PeekingIterator>
>>                    reason: inferred type does not conform to declared
>> bound(s)
>>                      inferred:
>> org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat
>>                      bound(s):
>> org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.Text,org.apache.accumulo.core.util.PeekingIterator>
>>
>>                 I've tried a few things; the signature of the function is:
>>
>>                 public <K, V, F extends org.apache.hadoop.mapreduce.InputFormat<K, V>>
>>                 JavaPairRDD<K, V> newAPIHadoopRDD(Configuration conf,
>>                         Class<F> fClass, Class<K> kClass, Class<V> vClass)
>>
>>                 I guess it's having trouble with the format extending
>> InputFormatBase with its own additional generic parameters (the Map.Entry
>> inside PeekingIterator).
>>
>>                 This may be an issue to chase with Spark vs Accumulo,
>> unless something can be tweaked on the Accumulo side or I could wrap the
>> InputFormat with my own somehow.
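>>
>>                 (One compile-level workaround, untested: erase the generics
>>                 with an unchecked cast, at the cost of getting back a raw
>>                 PeekingIterator in the RDD's value type:)
>>
>>                 // Untested sketch: forces the class past the generic bound
>>                 // check; the Map.Entry<Key, Value> type parameter is lost.
>>                 // InputFormat here is org.apache.hadoop.mapreduce.InputFormat.
>>                 @SuppressWarnings({"unchecked", "rawtypes"})
>>                 Class<InputFormat<Text, PeekingIterator>> rowInputFormatClass =
>>                         (Class<InputFormat<Text, PeekingIterator>>) (Class) AccumuloRowInputFormat.class;
>>
>>                 JavaPairRDD<Text, PeekingIterator> pairRDD =
>>                         sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>>                                 rowInputFormatClass, Text.class, PeekingIterator.class);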
>>
>>                 Accumulo 1.6.1, Spark 1.3.1, JDK 7u71.
>>
>>                 Stopping short of this, can anyone think of a good way to
>> use AccumuloInputFormat to get what I'm getting from the Row version in a
>> performant way? It doesn't necessarily have to be an iterator approach, but
>> I'd need all my values with the key in one consuming function. I'm looking
>> into ways to do it in Spark functions but trying to avoid any major
>> performance hits.
>>
>>                 Thanks,
>>
>>                 Marc
>>
>>                 p.s. The summit was absolutely great, thank you all for
>> having it!
>>
>>
