accumulo-user mailing list archives

From Marc Reichman <mreich...@pixelforensics.com>
Subject Re: spark with AccumuloRowInputFormat?
Date Mon, 04 May 2015 19:08:37 GMT
This is working very well, thanks Russ!

For anyone else stuck in this predicament: using the WholeRowIterator, I
was able to get the same Iterator<Map.Entry<Key,Value>> that
AccumuloRowInputFormat would have given me, as follows:

...

IteratorSetting iteratorSetting = new IteratorSetting(1, WholeRowIterator.class);
AccumuloInputFormat.addIterator(job, iteratorSetting);

// setup RDD
JavaPairRDD<Key, Value> pairRDD = sparkContext.newAPIHadoopRDD(
        job.getConfiguration(),
        AccumuloInputFormat.class,
        Key.class, Value.class);

JavaRDD<List<MyResult>> result = pairRDD
        .map(new Function<Tuple2<Key, Value>, List<MyResult>>() {
            @Override
            public List<MyResult> call(Tuple2<Key, Value> keyValueTuple2) throws Exception {
                // decode the single Key/Value pair back into the row's original entries
                SortedMap<Key, Value> wholeRow =
                        WholeRowIterator.decodeRow(keyValueTuple2._1, keyValueTuple2._2);
                MyObject o = getMyObject(wholeRow.entrySet().iterator());
                ...
            }
        });
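
(For context, the "..." at the top of that snippet is just the usual
AccumuloInputFormat job configuration. A minimal sketch of what it might look
like; the instance name, ZooKeeper hosts, credentials, and table name below
are all placeholders:)

Job job = Job.getInstance(new Configuration());
// placeholder instance name and ZooKeeper quorum
AccumuloInputFormat.setZooKeeperInstance(job, "myInstance", "zk1:2181,zk2:2181");
// placeholder scan credentials
AccumuloInputFormat.setConnectorInfo(job, "scanUser", new PasswordToken("scanPass"));
// placeholder table name and authorizations
AccumuloInputFormat.setInputTableName(job, "myTable");
AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());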

Previously I was using the approach below, which required an additional
stage of Spark calculations plus a shuffle phase, was nowhere near as
quick, and also needed a helper class (AccumuloRowMapEntry, a very basic
Map.Entry implementation):

JavaRDD<List<MyResult>> result = pairRDD
        .mapToPair(new PairFunction<Tuple2<Key, Value>, Text, Map.Entry<Key, Value>>() {
            @Override
            public Tuple2<Text, Map.Entry<Key, Value>> call(Tuple2<Key, Value> keyValueTuple2) throws Exception {
                // key each entry by its row id so all of a row's entries can be grouped together
                return new Tuple2<Text, Map.Entry<Key, Value>>(
                        keyValueTuple2._1.getRow(),
                        new AccumuloRowMapEntry(keyValueTuple2._1, keyValueTuple2._2));
            }
        })
        .groupByKey()
        .map(new Function<Tuple2<Text, Iterable<Map.Entry<Key, Value>>>, List<MyResult>>() {
            @Override
            public List<MyResult> call(Tuple2<Text, Iterable<Map.Entry<Key, Value>>> textIterableTuple2) throws Exception {
                MyObject o = getMyObject(textIterableTuple2._2.iterator());
                ...
            }
        });
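
(AccumuloRowMapEntry isn't shown above; purely for reference, a minimal
sketch of what such a Map.Entry implementation could look like:)

// Hypothetical sketch of the helper class mentioned above; it simply pairs an
// Accumulo Key with its Value. How it is made shuffle-safe (e.g. Kryo
// registration) is omitted here.
public class AccumuloRowMapEntry implements Map.Entry<Key, Value> {
    private final Key key;
    private Value value;

    public AccumuloRowMapEntry(Key key, Value value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public Key getKey() { return key; }

    @Override
    public Value getValue() { return value; }

    @Override
    public Value setValue(Value newValue) {
        Value old = value;
        value = newValue;
        return old;
    }
}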


Thanks again for all the help.

Marc


On Mon, May 4, 2015 at 12:23 PM, Russ Weeks <rweeks@newbrightidea.com>
wrote:

> Yeah, exactly. When you put the WholeRowIterator on the scan, instead of
> seeing all the Key,Value pairs that make up a row you'll see a single
> Key,Value pair. The only part of the Key that matters is the row id. The
> Value is an encoded map of the Key,Value pairs that constitute the row.
> Call the static method WholeRowIterator.decodeRow to get at this map.
>
> The decoded Keys have all the CF, CQ, timestamp and visibility data
> populated. I'm not sure if they have the row ID populated; either way, they
> all belong to the same row that was present in the original Key.
>
> -Russ
>
>
> On Mon, May 4, 2015 at 9:51 AM, Marc Reichman <
> mreichman@pixelforensics.com> wrote:
>
>> Hi Russ,
>>
>> How exactly would this work with regard to column qualifiers, etc., since
>> those are part of the key? I apologize, but I'm not as familiar with the
>> WholeRowIterator usage model. Does it consolidate based on the row key and
>> then return some Key+Value "value" that has all the original information
>> serialized?
>>
>> My rows aren't gigantic but they can occasionally get into the 10s of MB.
>>
>> On Mon, May 4, 2015 at 11:22 AM, Russ Weeks <rweeks@newbrightidea.com>
>> wrote:
>>
>>> Hi, Marc,
>>>
>>> If your rows are small you can use the WholeRowIterator to get all the
>>> values with the key in one consuming function. If your rows are big but you
>>> know up-front that you'll only need a small part of each row, you could put
>>> a filter in front of the WholeRowIterator.
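>>>
>>> (A rough sketch of what that stacking could look like; the priority
>>> numbers, the choice of RegExFilter, and the column family regex are all
>>> placeholders. The filter runs before the WholeRowIterator because it has
>>> the lower priority number.)
>>>
>>> IteratorSetting filter = new IteratorSetting(10, "cfFilter", RegExFilter.class);
>>> RegExFilter.setRegexs(filter, null, "interestingFamily", null, null, false);
>>> AccumuloInputFormat.addIterator(job, filter);
>>>
>>> IteratorSetting wholeRow = new IteratorSetting(20, WholeRowIterator.class);
>>> AccumuloInputFormat.addIterator(job, wholeRow);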
>>>
>>> I expect there's a performance hit (I haven't done any benchmarks
>>> myself) because of the extra serialization/deserialization but it's a very
>>> convenient way of working with Rows in Spark.
>>>
>>> Regards,
>>> -Russ
>>>
>>> On Mon, May 4, 2015 at 8:46 AM, Marc Reichman <
>>> mreichman@pixelforensics.com> wrote:
>>>
>>>> Has anyone done any testing with Spark and AccumuloRowInputFormat? I
>>>> have no problem doing this for AccumuloInputFormat:
>>>>
>>>> JavaPairRDD<Key, Value> pairRDD = sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>>>>         AccumuloInputFormat.class,
>>>>         Key.class, Value.class);
>>>>
>>>> But I run into a snag trying to do a similar thing:
>>>>
>>>> JavaPairRDD<Text, PeekingIterator<Map.Entry<Key, Value>>> pairRDD =
>>>>         sparkContext.newAPIHadoopRDD(job.getConfiguration(),
>>>>         AccumuloRowInputFormat.class,
>>>>         Text.class, PeekingIterator.class);
>>>>
>>>> The compilation error is (big, sorry):
>>>>
>>>> Error:(141, 97) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
>>>>   required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
>>>>   found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat>,java.lang.Class<org.apache.hadoop.io.Text>,java.lang.Class<org.apache.accumulo.core.util.PeekingIterator>
>>>>   reason: inferred type does not conform to declared bound(s)
>>>>     inferred: org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat
>>>>     bound(s): org.apache.hadoop.mapreduce.InputFormat<org.apache.hadoop.io.Text,org.apache.accumulo.core.util.PeekingIterator>
>>>>
>>>> I've tried a few things; the signature of the function is:
>>>>
>>>> public <K, V, F extends org.apache.hadoop.mapreduce.InputFormat<K, V>> JavaPairRDD<K, V>
>>>>         newAPIHadoopRDD(Configuration conf, Class<F> fClass, Class<K> kClass, Class<V> vClass)
>>>>
>>>> I guess it's having trouble with the format extending InputFormatBase
>>>> with its own additional generic parameters (the Map.Entry inside
>>>> PeekingIterator).
>>>>
>>>> This may be an issue to chase with Spark vs Accumulo, unless something
>>>> can be tweaked on the Accumulo side or I could wrap the InputFormat with
>>>> my own somehow.
>>>>
>>>> Accumulo 1.6.1, Spark 1.3.1, JDK 7u71.
>>>>
>>>> Stopping short of this, can anyone think of a good way to use
>>>> AccumuloInputFormat to get what I'm getting from the Row version in a
>>>> performant way? It doesn't necessarily have to be an iterator approach, but
>>>> I'd need all my values with the key in one consuming function. I'm looking
>>>> into ways to do it in Spark functions but trying to avoid any major
>>>> performance hits.
>>>>
>>>> Thanks,
>>>>
>>>> Marc
>>>>
>>>> p.s. The summit was absolutely great, thank you all for having it!
>>>>
>>>>
>>>
>>
>
