From: Paulo Ricardo Motta Gomes <paulo.motta@chaordicsystems.com>
Date: Fri, 4 Apr 2014 13:57:23 -0300
Subject: Re: using hadoop + cassandra for CF mutations (delete)
To: user@cassandra.apache.org

You said you have tried the Pig URL split_size, but have you actually tried
decreasing the value of the cassandra.input.split.size Hadoop property? The
default is 65536, so you may want to decrease it and see whether the number
of mappers increases. Beyond a certain point, lowering that value stops
changing the number of mappers, though; I don't know exactly why, probably
because it hits the minimum number of rows per token.

Another suggestion is to decrease the number of simultaneous mappers of your
job so it doesn't hit Cassandra so hard. You'll get fewer TimedOutExceptions,
but the job will take longer to complete.
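A minimal sketch of how that could look from the Pig side (assuming the
Cassandra 1.2 org.apache.cassandra.hadoop.pig.CassandraStorage loader; the
keyspace/CF names and the 8192 value below are placeholders, not
recommendations):

    -- pass the Hadoop property straight from the Pig script; the loader
    -- should pick it up from the job configuration
    SET cassandra.input.split.size '8192';

    -- or, equivalently, via the split_size parameter on the storage URL
    rows = LOAD 'cassandra://MyKeyspace/MyCF?split_size=8192'
           USING org.apache.cassandra.hadoop.pig.CassandraStorage();

As for the simultaneous mappers: on Hadoop 1.x the number of concurrent map
tasks per node is, as far as I know, a TaskTracker setting
(mapred.tasktracker.map.tasks.maximum in mapred-site.xml), so it usually has
to be changed at the cluster level rather than per job.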
On Fri, Apr 4, 2014 at 1:24 PM, William Oberman <oberman@civicscience.com> wrote:

> Hi,
>
> I have some history with cassandra + hadoop:
> 1.) Single DC + integrated hadoop = Was "ok" until I needed steady
> performance (the single DC was used in a production environment)
> 2.) Two DC's + integrated hadoop on 1 of 2 DCs = Was "ok" until my data
> grew; in AWS, compute is expensive compared to data storage, e.g. running
> a 24x7 DC was a lot more expensive than the following solution...
> 3.) Single DC + a constant "ETL" to S3 = Is still ok: I can spawn an
> "arbitrarily large" EMR cluster, and 24x7 data storage + transient EMR is
> cost effective.
>
> But one of my CF's has had a change of usage pattern, making a large %,
> but not all, of the data fairly pointless to store. I thought I'd write a
> Pig UDF that could peek at a row of data and delete it if it fails my
> criteria. And it "works" in terms of logic, but not in terms of practical
> execution. The CF in question has O(billion) keys, and afterwards it will
> have ~10% of that at most.
>
> I basically keep losing the jobs due to too many task failures, all
> rooted in:
> Caused by: TimedOutException()
>         at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:13020)
>
> And yes, I've messed around with:
> - Number of failures for map/reduce/tracker (in the hadoop confs)
> - split_size (on the URL)
> - cassandra.range.batch.size
>
> But it hasn't helped. My failsafe is to roll my own distributed process,
> rather than falling into a pit of internal hadoop settings. But I feel
> like I'm close.
>
> The problem, in my opinion, watching how things are going, is the
> correlation of splits <-> tasks. I'm obviously using Pig, so this part of
> the process is fairly opaque to me at the moment. But "something
> somewhere" is picking 20 tasks for my job, and this is fairly independent
> of the # of task slots (I've booted EMR clusters with different #'s and
> always get 20). Why does this matter? When a task fails, it retries from
> the start, which is a killer for me as I "delete as I go", turning the
> retry into pointless work and massively increasing the odds of an overall
> job failure. If hadoop/pig chose a larger number of tasks, the retries
> would be much less of a burden. But I don't see where/what lets me mess
> with that logic.
>
> Pig gives the ability to mess with reducers (PARALLEL), but I'm in the
> load path, which is all mappers. I've never jumped to the lower, raw
> hadoop level before. But I'm worried that will be the "falling into a
> pit" issue...
>
> I'm using Cassandra 1.2.15.
>
> will
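For reference, a rough sketch of the shape of the job being described above,
with the knobs from this thread in place. The keyspace/CF names, my-udfs.jar,
and the should_delete UDF are hypothetical; per the description above, the
actual deletion would happen inside the UDF as a side effect of inspecting
each row:

    -- fewer rows per get_range_slices call makes each call cheaper and
    -- less likely to time out
    SET cassandra.range.batch.size '1024';

    REGISTER my-udfs.jar;
    -- hypothetical UDF: returns true for rows matching the delete criteria
    -- (and issues the delete itself, as described in the message above)
    DEFINE should_delete com.example.pig.ShouldDelete();

    rows = LOAD 'cassandra://MyKeyspace/MyCF?split_size=8192'
           USING org.apache.cassandra.hadoop.pig.CassandraStorage();

    -- $0 = row key, $1 = bag of columns
    doomed = FILTER rows BY should_delete($0, $1);

    -- store the matched keys somewhere just to force the map-only job to run
    keys = FOREACH doomed GENERATE $0;
    STORE keys INTO '/tmp/deleted_keys';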
--
Paulo Motta

Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200
+55 83 9690-1314