Subject: Re: Ultra wide row anti pattern
From: Edward Capriolo <edlinuxguru@gmail.com>
To: user@cassandra.apache.org
Date: Tue, 4 Feb 2014 16:39:29 -0500

You could use another column with CAS as a management layer. You only have
to consult it when picking up new rows.
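One way to read that, sketched as a Cassandra 2.0 lightweight transaction in
CQL; the claim table and all names below are illustrative, the thread itself
gives no schema:

    -- illustrative claim table: one row per queue bucket
    CREATE TABLE queue_claims (
        bucket text PRIMARY KEY,
        owner  text
    );

    -- a consumer tries to claim a bucket before reading it; IF NOT EXISTS
    -- runs a Paxos round, so exactly one consumer gets [applied] = true
    INSERT INTO queue_claims (bucket, owner)
    VALUES ('TODO:1', 'consumer-a')
    IF NOT EXISTS;

Exactly one writer gets [applied] = true and owns the bucket; everyone else
moves on, so the claim only has to be consulted when a consumer picks up a
new row, as described above.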
On Tue, Feb 4, 2014 at 3:45 PM, DuyHai Doan wrote:

> Great idea for implementing the queue pattern. Thank you Edward.
>
> However, with your design there are still corner cases where 2 consumers
> read from the same queue. Reading and writing at QUORUM does not prevent
> race conditions. I believe the new CAS feature of C* 2.0 might be useful
> here, but at the expense of reduced throughput (because of the Paxos
> round).
>
>
> On Tue, Feb 4, 2014 at 4:50 PM, Edward Capriolo wrote:
>
>> I have actually been building something similar in my spare time. You
>> can hang around and wait for it or build your own. Here are the basics.
>> Not perfect, but it will work.
>>
>> Create a column family "queue" with gc_grace_seconds set to 1 day.
>>
>> set queue[timeuuid()]["z" + timeuuid()] = [work to do]
>>
>> The producer can decide how it wants to roll over the row key and the
>> column key; it does not matter.
>>
>> Suppose there are N consumers. We need a way for the consumers not to do
>> the same work. We can use something like the bakery algorithm. Remember
>> that at QUORUM a reader sees prior writes.
>>
>> A consumer needs an identifier (it could be another uuid or an IP
>> address).
>> A consumer calls get_range_slices on the queue; the slice is from an
>> empty byte[] to an empty byte[], limit 100.
>>
>> The consumer sees data like this:
>>
>> [1234] [z-$timeuuid] = data
>>
>> Now we register that this consumer wants to consume this queue:
>>
>> set [1234] [a-$ip] at QUORUM
>>
>> Now we do a slice that returns only the "a-" bid columns (the work
>> columns start with "z-", so they sort after 'b'):
>>
>> get_slice [1234] from an empty byte[] to 'b'
>>
>> There are a few possible returns.
>>
>> 1) 1 bidder:
>> [1234] [a-$myip]
>> You won; start consuming.
>>
>> 2) 2 bidders:
>> [1234] [a-$myip]
>> [1234] [a-$otherip]
>> Compare $myip vs $otherip; the higher one wins.
>>
>> Whoever wins can then start consuming the columns in the queue and
>> delete them when done.
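A rough CQL rendering of the same queue-and-bid layout, for comparison with
the Thrift-era notation above; the table, column names, and values here are
illustrative and not from the thread:

    -- one partition per queue row; 'a' columns are consumer bids and 'z'
    -- columns are work items, so bids always sort before the work
    CREATE TABLE queue (
        bucket text,
        kind   text,   -- 'a' = bid, 'z' = work item
        name   text,   -- consumer id for bids, a timeuuid string for work
        value  text,
        PRIMARY KEY (bucket, kind, name)
    ) WITH gc_grace_seconds = 86400;

    -- producer enqueues work (run everything at CONSISTENCY QUORUM so
    -- readers see prior writes, as described above)
    INSERT INTO queue (bucket, kind, name, value)
    VALUES ('1234', 'z', 'timeuuid-of-the-item', 'work to do');

    -- consumer bids for the row, then reads the bids back
    INSERT INTO queue (bucket, kind, name, value)
    VALUES ('1234', 'a', '10.0.0.12', 'bid');

    SELECT name FROM queue WHERE bucket = '1234' AND kind = 'a';

If two bidders show up, comparing the returned names still happens on the
client, exactly as in the description above; the CAS variant sketched at the
top of this mail would replace that comparison with a single conditional
insert.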
>>
>> On Friday, January 31, 2014, DuyHai Doan wrote:
>> > Thanks Nate for your ideas.
>> >
>> >> This could be as simple as adding year and month to the primary key
>> >> (in the form 'yyyymm'). Alternatively, you could add this in the
>> >> partition in the definition. Either way, it then becomes pretty easy
>> >> to re-generate these based on the query parameters.
>> >
>> > The thing is that it's not that simple. My customer has a very BAD
>> > idea: using Cassandra as a queue (the perfect anti-pattern).
>> > Before trying to tell them to redesign their entire architecture and
>> > put in a queueing system like ActiveMQ or something similar, I would
>> > like to see how I can use wide rows to meet the requirements.
>> >
>> > The functional need is quite simple:
>> >
>> > 1) A process A loads users into Cassandra and sets the status on each
>> > user to 'TODO'. When using the bucketing technique, we can limit a row
>> > width to, let's say, 100 000 columns. So at the end of the current
>> > row, process A knows that it should move to the next bucket. The
>> > bucket is encoded in a composite partition key; in our example it
>> > would be 'TODO:1', 'TODO:2', etc.
>> >
>> > 2) A process B reads the wide row for the 'TODO' status. It starts at
>> > bucket 1, so it will read the row with partition key 'TODO:1'. The
>> > users are processed and inserted in a new row, 'PROCESSED:1' for
>> > example, to keep track of the status. After retrieving 100 000
>> > columns, it switches automatically to the next bucket. Simple. Fair
>> > enough.
>> >
>> > 3) Now, what sucks is that sometimes process B does not have enough
>> > data to perform the functional logic on the users it fetched from the
>> > wide row, so it has to RE-PUT some users back into the 'TODO' status
>> > rather than transitioning them to the 'PROCESSED' status. That's
>> > exactly queue behavior.
>> > A simplistic idea would be to insert those m users again under
>> > 'TODO:n', with n higher than the current bucket number, so they can be
>> > processed later. But that screws up the whole counting system. Process
>> > A, which inserts data, will not know that there are already m users in
>> > row n, so it will happily add 100 000 columns, making the row grow to
>> > 100 000 + m columns. When process B reads this row back, it will stop
>> > at the first 100 000 columns and skip the trailing m elements.
>> > That's the main reason why I dropped the idea of bucketing (which is
>> > quite smart in the normal case) and traded it for an ultra wide row.
>> > Anyway, I'll follow your advice and play around with the parameters of
>> > SizeTiered.
>> >
>> > Regards
>> > Duy Hai DOAN
>> >
>> > On Fri, Jan 31, 2014 at 9:23 PM, Nate McCall <nate@thelastpickle.com> wrote:
>> >>>
>> >>> The only drawback for the ultra wide row I can see is point 1). But
>> >>> if I use leveled compaction with a sufficiently large value for
>> >>> "sstable_size_in_mb" (let's say 200 MB), will my read performance be
>> >>> impacted as the row grows?
>> >>
>> >> For this use case, you would want to use SizeTieredCompaction and
>> >> play around with the configuration a bit to keep a small number of
>> >> large SSTables. Specifically: keep min|max_threshold really low, set
>> >> bucket_low and bucket_high closer together (maybe even both to 1.0),
>> >> and maybe use a larger min_sstable_size.
>> >> YMMV though - per Rob's suggestion, take the time to run some tests
>> >> tweaking these options.
>> >>
>> >>>
>> >>> Of course, splitting a wide row into several rows using the
>> >>> bucketing technique is one solution, but it forces us to keep track
>> >>> of the bucket number and it's not convenient. We have one process
>> >>> (JVM) that inserts data and another process (JVM) that reads data.
>> >>> Using bucketing, we need to synchronize the bucket number between
>> >>> the 2 processes.
>> >>
>> >> This could be as simple as adding year and month to the primary key
>> >> (in the form 'yyyymm'). Alternatively, you could add this in the
>> >> partition in the definition. Either way, it then becomes pretty easy
>> >> to re-generate these based on the query parameters.
>> >>
>> >> --
>> >> -----------------
>> >> Nate McCall
>> >> Austin, TX
>> >> @zznate
>> >>
>> >> Co-Founder & Sr. Technical Consultant
>> >> Apache Cassandra Consulting
>> >> http://www.thelastpickle.com
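Putting Nate's two suggestions side by side as CQL, purely as an
illustration; the table name, bucket format, and concrete compaction values
are assumptions, not something anyone in the thread posted:

    -- month-bucketed status rows: producer and consumer can both recompute
    -- the 'yyyymm' bucket from the current date, so nothing needs to be
    -- synchronized between the two JVMs
    CREATE TABLE user_status (
        status  text,       -- 'TODO' or 'PROCESSED'
        bucket  text,       -- 'yyyymm', e.g. '201402'
        id      timeuuid,
        payload text,
        PRIMARY KEY ((status, bucket), id)
    ) WITH compaction = {
        'class'            : 'SizeTieredCompactionStrategy',
        'min_threshold'    : '2',
        'max_threshold'    : '4',
        'bucket_low'       : '0.9',
        'bucket_high'      : '1.1',
        'min_sstable_size' : '104857600'
    };

Here min|max_threshold are kept low and bucket_low/bucket_high pulled in
from their 0.5/1.5 defaults to favor a small number of large SSTables, and
min_sstable_size is raised to roughly 100 MB (assuming the byte-valued
option). As Nate says, treat these as things to test, not final values.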