Mailing-List: user@cassandra.apache.org
From: Dmitry Saprykin
Date: Thu, 17 Aug 2017 16:01:37 -0400
Subject:
Re: Full table scan with cassandra
To: user@cassandra.apache.org

Hi Alex,

How do you generate your subrange set for running queries? It may happen that some of your ranges cross data ownership range borders (check by running 'nodetool describering [keyspace_name]'). Such range queries are highly ineffective in that case, and that could explain your results.

Also, consider using LOCAL_ONE consistency for your full scans. You may lose some consistency but will gain a lot of performance.

Kind regards,
Dmitry Saprykin

On Thu, Aug 17, 2017 at 12:36 PM, Alex Kotelnikov <alex.kotelnikov@diginetica.com> wrote:

> Dor,
>
> I believe I tried it in many ways, and the result is quite disappointing.
> I ran my scans on 3 different clusters, one of which was running on VMs,
> so I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to see
> how this affects the performance.
>
> I also generated the load from a Spark cluster, ranging from 4 to 40
> parallel tasks, as well as from a plain multi-threaded client.
>
> The surprise is that a trivial fetch of all records using token ranges
> takes pretty much the same time in all setups.
>
> The only beneficial thing I've learned is that it is much more efficient
> to create a MATERIALIZED VIEW than to filter (even using a secondary
> index).
>
> Say, I have a typical dataset, around 3Gb of data, 1M records. And I have
> a trivial scan routine:
>
> String.format("SELECT token(user_id), user_id, events FROM user_events
> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
> token(user_id) < %d ", end) : "")
>
> I split all tokens into start-end ranges (except for the last range,
> which only has a start) and query the ranges in multiple threads, up
> to 40.
>
> The whole process takes ~40s on a 3-VM cluster (2+2+4 cores, 16Gb RAM,
> 1 virtual disk each). And it takes ~30s on a real hardware cluster of
> 8 servers * 8 cores * 32Gb. The level of concurrency barely matters,
> unless it is too high or too low.
> The size of the token ranges matters, but here I see the rule "make it
> larger, but avoid cassandra timeouts".
> I also tried the Spark connector to validate that my multi-threaded test
> app is not the bottleneck. It is not.
>
> I expected some kind of elasticity, but I see none. It feels like I am
> doing something wrong...
>
> On 17 August 2017 at 00:19, Dor Laor wrote:
>
>> Hi Alex,
>>
>> You probably didn't get the parallelism right. A serial scan has a
>> parallelism of one. If the parallelism isn't large enough, performance
>> will be slow. If the parallelism is too large, Cassandra and the disk
>> will thrash and have too many context switches.
>>
>> So you need to find your cluster's sweet spot. We documented the
>> procedure to do it in this blog:
>> http://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>> and the results are here:
>> http://www.scylladb.com/2017/03/28/parallel-efficient-full-table-scan-scylla/
>> The algorithm should translate to Cassandra, but you'll have to use
>> different rules of thumb.
>>
>> Best,
>> Dor
>>
>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>> alex.kotelnikov@diginetica.com> wrote:
>>
>>> Hey,
>>>
>>> we are trying Cassandra as an alternative for storing a huge stream of
>>> data coming from our customers.
>>>
>>> Storing works quite fine, and I started to validate how retrieval
>>> does. We have two types of that: fetching specific records, and bulk
>>> retrieval for general analysis.
>>> Fetching a single record works like a charm. But it is not so with
>>> bulk fetch.
>>>
>>> With a moderately small table of ~2 million records, ~10Gb raw data, I
>>> observed very slow operation (using token(partition key) ranges). It
>>> takes minutes to perform a full retrieval.
>>> We tried a couple of configurations using virtual machines and real
>>> hardware, and overall it looks like it is not possible to fetch all
>>> table data in a reasonable time (by reasonable I mean that since we
>>> have a 1Gbit network, 10Gb can be transferred in a couple of minutes
>>> from one server to another, and with 10+ Cassandra servers and 10+
>>> Spark executors the total time should be even smaller).
>>>
>>> I tried the DataStax Spark connector. I also wrote a simple test case
>>> using the DataStax Java driver and saw that a fetch of 10k records
>>> takes ~10s, so I assume that a "sequential" scan would take 200x more
>>> time, equal to ~30 minutes.
>>>
>>> Maybe we are totally wrong trying to use Cassandra this way?
>>>
>>> --
>>> Best Regards,
>>>
>>> Alexander Kotelnikov
>>> Team Lead
>>>
>>> DIGINETICA
>>> Retail Technology Company
>>>
>>> m: +7.921.915.06.28
>>> www.diginetica.com
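[Editor's note] The subrange construction Alex describes (splitting the full token space into start-end ranges, querying the last one open-ended) can be sketched as below. This assumes the default Murmur3Partitioner, whose tokens span the full signed 64-bit range; `TokenRanges` and `split` are illustrative names, not part of any driver API:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class TokenRanges {
    // Murmur3Partitioner tokens cover the full signed 64-bit range.
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /**
     * Split the full token space into n contiguous [start, end) subranges.
     * BigInteger avoids overflow: the total span (2^64) does not fit in a long.
     * The last range absorbs the rounding remainder and ends at MAX.
     */
    static List<long[]> split(int n) {
        BigInteger step = MAX.subtract(MIN).add(BigInteger.ONE)
                             .divide(BigInteger.valueOf(n));
        List<long[]> ranges = new ArrayList<>();
        BigInteger start = MIN;
        for (int i = 0; i < n; i++) {
            BigInteger end = (i == n - 1) ? MAX : start.add(step);
            ranges.add(new long[] { start.longValueExact(), end.longValueExact() });
            start = end;
        }
        return ranges;
    }
}
```

Each pair would feed the `WHERE token(user_id) >= %d AND token(user_id) < %d` clause from the message above, with the final range queried using only the lower bound (as Alex does) so the top token is not skipped.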
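[Editor's note] The "multiple threads, up to 40" fan-out, and Dor's point about tuning parallelism to the cluster's sweet spot, can be sketched with a bounded thread pool. Here `fetchRange` is a hypothetical stand-in for executing one range query against the cluster (e.g. returning a row count), so the pool size is the only knob to tune:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongBinaryOperator;

public class ParallelScan {
    /**
     * Submit one task per [start, end) token subrange to a fixed-size pool
     * and sum the per-range results. The pool size caps how many range
     * queries hit the cluster concurrently.
     */
    static long scan(List<long[]> ranges, int parallelism, LongBinaryOperator fetchRange)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (long[] r : ranges) {
                futures.add(pool.submit(() -> fetchRange.applyAsLong(r[0], r[1])));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // propagates any per-range failure
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

Sweeping `parallelism` while measuring wall-clock time is the sweet-spot search the ScyllaDB posts describe: too few threads leave the cluster idle, too many cause thrashing and timeouts.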