incubator-cassandra-user mailing list archives

From: William Oberman <>
Subject: Re: using hadoop + cassandra for CF mutations (delete)
Date: Tue, 08 Apr 2014 13:13:23 GMT
I use PHP, and phpCassa to talk to cassandra from within my app.  I'm using
the below script's structure as a way to run a local mutation on each of my
cassandra nodes:


<?php
// (assumes the phpcassa autoloader has already been required)
use phpcassa\ColumnFamily;
use phpcassa\Connection\ConnectionPool;
use phpcassa\SystemManager;

try {
    // i'm sure there is a cleaner way of doing this, but for me I can't
    // connect as localhost, I need to use the AWS private IP
    // (the grep -v exclusion pattern was stripped by the archive; it presumably
    //  filtered out the loopback address)
    $ip = exec("/sbin/ifconfig | grep 'inet addr:' | grep -v '' | cut -d: -f2 | awk '{ print \$1}'");
    $localCassandra = "$ip:9160";
    $keyspace = "";
    $cf = "";

    $systemManager = new SystemManager($localCassandra);
    $ring = $systemManager->describe_ring($keyspace);
    $startToken = null;
    $endToken = null;
    foreach ($ring as $ringDetails) {
        // There is an endpoint per RF, with the 1st == "owner"
        foreach ($ringDetails->endpoints as $endpoint) {
            if ($endpoint == $ip) {
                $startToken = $ringDetails->start_token;
                $endToken = $ringDetails->end_token;
            }
        }
        if ($startToken != null && $endToken != null) {
            break;
        }
    }

    if ($startToken == null || $endToken == null) {
        fwrite(STDERR, "My ip[$ip] not in ring = " . print_r($ring, true));
        exit(1);
    }

    $pool = new ConnectionPool($keyspace, array($localCassandra));
    $column_family = new ColumnFamily($pool, $cf);
    // I patched my local phpCassa to support null == iterate over all rows.
    //     If my pull request is accepted it will be in the git repo, otherwise put
    //     a large int as arg 3, something hopefully larger than all rows on a node...
    foreach ($column_family->get_range_by_token($startToken, $endToken, null) as $key => $columns) {
        foreach ($columns as $cn => $cv) {
            // I track information here
        }
        // and do an optional mutation here based on the row
    }
} catch (Exception $e) {
    fwrite(STDERR, $e);
}
On Fri, Apr 4, 2014 at 1:40 PM, William Oberman <> wrote:

> Looking at the code, cassandra.input.split.size==Pig URL split_size,
> right?  But, in cassandra 1.2.15 I'm wondering if there is a bug that would
> make the hadoop conf setting cassandra.input.split.size not be used
> unless you manually set the URI to splitSize=0 (because the abstract class
> defaults the splitSize to 64k instead of 0)?  Long story short though,
> I've messed with that setting in the direction you suggested (decreasing),
> and I'm confident hadoop/pig was picking it up (I eventually decreased it
> too far, which caused a server-side error of too much memory used).
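
A minimal sketch of that workaround in a Pig script (MyKeyspace/MyColumnFamily are placeholders, 8192 is just an example value, and the split_size=0 part only matters if the suspected default really does win):

SET cassandra.input.split.size 8192;  -- the hadoop property, smaller than the 65536 default
rows = LOAD 'cassandra://MyKeyspace/MyColumnFamily?split_size=0'
       USING org.apache.cassandra.hadoop.pig.CassandraStorage();
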
> I'm stuck in a "rock & a hard place" on the mappers.  At 20 tasks, based
> on the delete rate before timeout failures happen, it was going to take
> 1-2 days to run the deletes (I was seeing ~10k deletes/sec across all 20
> task threads).   But, this is going to be my plan at this point: fewer
> tasks at once, even if it takes a week (of hopefully unsupervised time).
> Thanks for the feedback!
> On Fri, Apr 4, 2014 at 12:57 PM, Paulo Ricardo Motta Gomes <
>> wrote:
>> You said you have tried the Pig URL split_size, but have you actually
>> tried decreasing the value of the cassandra.input.split.size hadoop property?
>> The default is 65536, so you may want to decrease that to see if the number
>> of mappers increases. But at some point, even if you lower that value further,
>> it will stop increasing the number of mappers, though I don't know exactly why;
>> probably because it hits the minimum number of rows per token.
>> Another suggestion is to decrease the number of simultaneous mappers of
>> your job, so it doesn't hit cassandra too hard, and you'll get fewer
>> TimedOutExceptions, but your job will take longer to complete.
>> On Fri, Apr 4, 2014 at 1:24 PM, William Oberman <
>> > wrote:
>>> Hi,
>>> I have some history with cassandra + hadoop:
>>> 1.) Single DC + integrated hadoop = Was "ok" until I needed steady
>>> performance (the single DC was used in a production environment)
>>> 2.) Two DCs + integrated hadoop on 1 of 2 DCs = Was "ok" until my data
>>> grew, and in AWS compute is expensive compared to data storage... e.g.
>>> running a 24x7 DC was a lot more expensive than the following solution...
>>> 3.) Single DC + a constant "ETL" to S3 = Is still ok, I can spawn an
>>> "arbitrarily large" EMR cluster.  And 24x7 data storage + transient EMR is
>>> cost effective.
>>> But, one of my CF's has had a change of usage pattern making a large %,
>>> but not all of the data, fairly pointless to store.  I thought I'd write a
>>> Pig UDF that could peek at a row of data and delete if it fails my
>>> criteria.  And it "works" in terms of logic, but not in terms of practical
>>> execution.  The CF in question has O(billion) keys, and afterwards it will
>>> have ~10% of that at most.
>>> I basically keep losing the jobs due to too many task failures, all
>>> rooted in:
>>> Caused by: TimedOutException()
>>> at
>>> org.apache.cassandra.thrift.Cassandra$
>>> And yes, I've messed around with:
>>> -Number of failures for map/reduce/tracker (in the hadoop confs)
>>> -split_size (on the URL)
>>> -cassandra.range.batch.size
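
A rough sketch of where those knobs live in a Pig script (values are arbitrary examples, not recommendations):

SET mapred.map.max.attempts 8;         -- retries allowed per failed map task
SET cassandra.range.batch.size 512;    -- rows fetched per range-scan page
-- split_size rides on the LOAD URL, e.g. cassandra://MyKeyspace/MyColumnFamily?split_size=16384
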
>>> But it hasn't helped.  My failsafe is to roll my own distributed
>>> process, rather than falling into a pit of internal hadoop settings.  But I
>>> feel like I'm close.
>>> The problem in my opinion, watching how things are going, is the
>>> correlation of splits <-> tasks.  I'm obviously using Pig, so this part of
>>> the process is fairly opaque to me at the moment.  But, "something
>>> somewhere" is picking 20 tasks for my job, and this is fairly independent
>>> of the # of task slots (I've booted EMR clusters with different #'s and
>>> always get 20).  Why does this matter?  When a task fails, it retries from
>>> the start, which is a killer for me as I "delete as I go", making that
>>> pointless work and massively increasing the odds of an overall job failure.
>>>  If hadoop/pig chose a large number of tasks, the retries would be much
>>> less of a burden.  But, I don't see where/what lets me mess with that logic.
>>> Pig gives the ability to mess with reducers (PARALLEL), but I'm in the
>>> load path, which is all mappers.  I've never jumped to the lower, raw
>>> hadoop level before.  But, I'm worried that will be the "falling into a
>>> pit" issue...
>>> I'm using Cassandra 1.2.15.
>>> will
>> --
>> *Paulo Motta*
>> Chaordic | *Platform*
>> * <>*
>> +55 48 3232.3200
>> +55 83 9690-1314
