I use PHP and phpCassa to talk to cassandra from within my app.  I'm using the structure of the script below to run a local mutation on each of my nodes:

=======
<?php
require_once('PATH/TO/phpcassa-1.0.a.6/lib/autoload.php');

use phpcassa\ColumnFamily;
use phpcassa\Connection\ConnectionPool;
use phpcassa\SystemManager;

try {
    //I'm sure there is a cleaner way of doing this, but I can't connect as localhost; I need to use the AWS private IP
    $ip = exec("/sbin/ifconfig | grep 'inet addr:' | grep -v '127.0.0.1' | cut -d: -f2 | awk '{ print \$1}'");
    $localCassandra = "$ip:9160";
    $keyspace = "";
    $cf = "";

    $systemManager = new SystemManager($localCassandra);
    $ring = $systemManager->describe_ring($keyspace);
    $startToken = null;
    $endToken = null;
    foreach ($ring as $ringDetails) {
        //There is one endpoint per replica (RF), and the 1st is the range's "owner".
        //    Only claim the range if this node owns it, so each range is processed once cluster-wide.
        foreach ($ringDetails->endpoints as $endpoint) {
            if ($endpoint == $ip) {
                $startToken = $ringDetails->start_token;
                $endToken = $ringDetails->end_token;
            }
            break; //only the 1st (owner) endpoint matters
        }
        if ($startToken != null && $endToken != null) {
            break;
        }
    }

    if ($startToken == null || $endToken == null) {
        fwrite(STDERR, "My ip[$ip] not in ring = " . print_r($ring, true));
        exit(1);
    }

    $pool = new ConnectionPool($keyspace, array($localCassandra));
    $column_family = new ColumnFamily($pool, $cf);
    //I patched my local phpCassa to support null == iterate over all rows.
    //    if my pull request is accepted it will be in the git repo.  otherwise put 
    //    a large int as arg 3, something hopefully larger than all rows on a node...
    foreach ($column_family->get_range_by_token($startToken, $endToken, null) as $key => $columns) {
        foreach ($columns as $cn => $cv) {
            //I track information here
        }
        //and do an optional mutation here based on the row
    }
} catch (Exception $e) {
    fwrite(STDERR, $e . "\n");
    exit(1);
}
============
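
For example, if the mutation is a whole-row delete of rows that fail my criteria (the use case later in this thread), the loop body could look like the sketch below.  shouldKeep() is only a placeholder for the real test, and the null row-count argument reuses the local patch mentioned in the comments above:

=======
<?php
//Sketch only: assumes $column_family, $startToken and $endToken from the script above.
//    shouldKeep() is a hypothetical placeholder, not part of phpcassa.
function shouldKeep($key, $columns) {
    //e.g. keep rows that still contain a column I need
    return isset($columns['some_column_i_still_need']);
}

$scanned = 0;
$deleted = 0;
foreach ($column_family->get_range_by_token($startToken, $endToken, null) as $key => $columns) {
    $scanned++;
    if (!shouldKeep($key, $columns)) {
        $column_family->remove($key);   //removes the entire row
        $deleted++;
    }
}
fwrite(STDERR, "scanned=$scanned deleted=$deleted\n");
=======

Because each node only claims token ranges it owns, every row is visited (and possibly deleted) exactly once across the cluster.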


On Fri, Apr 4, 2014 at 1:40 PM, William Oberman <oberman@civicscience.com> wrote:
Looking at the code, cassandra.input.split.size == the Pig URL split_size, right?  But in cassandra 1.2.15 I'm wondering if there is a bug where the hadoop conf setting cassandra.input.split.size isn't used unless you manually set splitSize=0 in the URI (because the abstract class defaults splitSize to 64k instead of 0)?  Long story short, I've already messed with that setting in the direction you suggested (decreasing), and I'm confident hadoop/pig was picking it up (I eventually decreased it too far, which caused a server-side error from using too much memory).

I'm stuck between a rock and a hard place on the mappers.  At 20 tasks, based on the delete rate before the timeout failures start, it was going to take 1-2 days to run the deletes (I was seeing ~10k deletes/sec across all 20 task threads).  But this is going to be my plan at this point: fewer tasks at once, even if it takes a week (of hopefully unsupervised time).

Thanks for the feedback!

On Fri, Apr 4, 2014 at 12:57 PM, Paulo Ricardo Motta Gomes <paulo.motta@chaordicsystems.com> wrote:
You said you have tried the Pig URL split_size, but have you actually tried decreasing the value of the cassandra.input.split.size hadoop property? The default is 65536, so you may want to decrease it and see if the number of mappers increases. At some point, though, lowering the value further stops changing the number of mappers; I don't know exactly why, probably because it hits the minimum number of rows per token.

Another suggestion is to decrease the number of simultaneous mappers of your job, so it doesn't hit cassandra as hard and you get fewer TimedOutExceptions, but your job will take longer to complete.

On Fri, Apr 4, 2014 at 1:24 PM, William Oberman <oberman@civicscience.com> wrote:
Hi,

I have some history with cassandra + hadoop:
1.) Single DC + integrated hadoop = Was "ok" until I needed steady performance (the single DC was used in a production environment)
2.) Two DCs + integrated hadoop on 1 of the 2 DCs = Was "ok" until my data grew; in AWS, compute is expensive compared to data storage, e.g. running a 24x7 analytics DC was a lot more expensive than the following solution...
3.) Single DC + a constant "ETL" to S3 = Is still ok, I can spawn an "arbitrarily large" EMR cluster.  And 24x7 data storage + transient EMR is cost effective.

But, one of my CF's has had a change of usage pattern that makes a large %, but not all, of the data fairly pointless to store.  I thought I'd write a Pig UDF that could peek at a row of data and delete it if it fails my criteria.  And it "works" in terms of logic, but not in terms of practical execution.  The CF in question has O(billion) keys, and afterwards it will have at most ~10% of that.

I basically keep losing the jobs due to too many task failures, all rooted in:
Caused by: TimedOutException()
at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:13020)

And yes, I've messed around with:
-Number of failures for map/reduce/tracker (in the hadoop confs)
-split_size (on the URL)
-cassandra.range.batch.size

But it hasn't helped.  My failsafe is to roll my own distributed process, rather than falling into a pit of internal hadoop settings.  But I feel like I'm close.

The problem, in my opinion, watching how things are going, is the correlation of splits <-> tasks.  I'm obviously using Pig, so this part of the process is fairly opaque to me at the moment.  But "something somewhere" is picking 20 tasks for my job, and this is fairly independent of the # of task slots (I've booted EMR clusters with different #'s and always get 20).  Why does this matter?  When a task fails, it retries from the start, which is a killer for me since I "delete as I go": the retried portion is pointless work, and it massively increases the odds of an overall job failure.  If hadoop/pig chose a larger number of tasks, the retries would be much less of a burden.  But I don't see where/what lets me mess with that logic.
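
If I do end up rolling my own process instead, one way to blunt the retry-from-the-start waste is to checkpoint progress as I go.  A rough sketch, reusing the phpcassa loop from the script at the top of this thread; the checkpoint path and the keep test are just placeholders:

=======
<?php
//Sketch only: $column_family, $startToken and $endToken are assumed to come from
//    the phpcassa script at the top of this thread; the checkpoint path and the
//    keep test below are hypothetical placeholders.
$checkpointFile = "/var/tmp/purge.checkpoint";
$resumeKey = file_exists($checkpointFile) ? trim(file_get_contents($checkpointFile)) : "";
$skipping = ($resumeKey !== "");

$count = 0;
$lastKeptKey = null;
foreach ($column_family->get_range_by_token($startToken, $endToken, null) as $key => $columns) {
    if ($skipping) {
        if ($key === $resumeKey) {
            $skipping = false;   //caught up; real work resumes with the next row
        }
        continue;
    }

    $keep = true; //placeholder: apply the real keep/delete criteria to $columns here
    if ($keep) {
        $lastKeptKey = $key;
    } else {
        $column_family->remove($key);
    }

    //Checkpoint a *kept* key every 1000 rows; a kept key will show up again on a
    //    re-scan, so the resume logic above can always find it.
    if (++$count % 1000 == 0 && $lastKeptKey !== null) {
        file_put_contents($checkpointFile, $lastKeptKey);
    }
}
=======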

Pig gives the ability to mess with reducers (PARALLEL), but I'm in the load path, which is all mappers.  I've never jumped to the lower, raw hadoop level before.  But, I'm worried that will be the "falling into a pit" issue...

I'm using Cassandra 1.2.15.

will



--
Paulo Motta