From: Jiri Horky
To: Ja Sam, user@cassandra.apache.org
Subject: Re: How to speed up SELECT * query in Cassandra
Date: Thu, 12 Feb 2015 08:32:09 +0100

Hi,

here are some snippets of Scala code which should get you started.

Jirka H.

loop { lastRow =>
  val query = lastRow match {
    case Some(row) => nextPageQuery(row, upperLimit)
    case None => initialQuery(lowerLimit)
  }
  session.execute(query).all
}
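The `loop` helper itself is not part of these snippets. A minimal sketch of what such a paging helper might look like, assuming it keeps calling the supplied function with the last row of the previous page until an empty page comes back. The name `Paging.loop`, its two-argument signature and the in-memory demo are assumptions made for illustration, not the original code:

```scala
import scala.annotation.tailrec

object Paging {
  // Hypothetical helper: calls fetchPage with the last row of the previous
  // page (None for the first call) until an empty page is returned.
  def loop[R](fetchPage: Option[R] => Seq[R])(handle: Seq[R] => Unit): Unit = {
    @tailrec
    def go(last: Option[R]): Unit = {
      val page = fetchPage(last)
      if (page.nonEmpty) {
        handle(page)
        go(page.lastOption)
      }
    }
    go(None)
  }
}

object PagingDemo extends App {
  // In-memory stand-in for a table, paged three rows at a time.
  val rows = (1 to 10).toVector
  val pageSize = 3
  val seen = Vector.newBuilder[Int]

  Paging.loop[Int] {
    case Some(last) => rows.filter(_ > last).take(pageSize)
    case None       => rows.take(pageSize)
  }(page => seen ++= page)

  println(seen.result().mkString(","))
}
```

In the real code the page function would build the query from the last row's key and call `session.execute(query).all`, as in the snippet above.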


private def nextPageQuery(row: Row, upperLimit: String): String = {
  val tokenPart = "token(%s) > token(0x%s) and token(%s) < %s".format(
    rowKeyName, hex(row.getBytes(rowKeyName)), rowKeyName, upperLimit)
  basicQuery.format(tokenPart)
}


private def initialQuery(lowerLimit: String): String = {
  val tokenPart = "token(%s) >= %s".format(rowKeyName, lowerLimit)
  basicQuery.format(tokenPart)
}

private def calculateRanges: (BigDecimal, BigDecimal, IndexedSeq[(BigDecimal, BigDecimal)]) = {
  tokenRange match {
    case Some((start, end)) =>
      Logger.info("Token range given: {}", "<" + start.underlying.toPlainString + ", " + end.underlying.toPlainString + ">")
      val tokenSpaceSize = end - start
      val rangeSize = tokenSpaceSize / concurrency
      val ranges = for (i <- 0 until concurrency) yield (start + (i * rangeSize), start + ((i + 1) * rangeSize))
      (tokenSpaceSize, rangeSize, ranges)
    case None =>
      val tokenSpaceSize = partitioner.max - partitioner.min
      val rangeSize = tokenSpaceSize / concurrency
      val ranges = for (i <- 0 until concurrency) yield (partitioner.min + (i * rangeSize), partitioner.min + ((i + 1) * rangeSize))
      (tokenSpaceSize, rangeSize, ranges)
  }
}

private val basicQuery = {
  "select %s, %s, %s, writetime(%s) from %s where %s%s limit %d%s".format(
    rowKeyName,
    columnKeyName,
    columnValueName,
    columnValueName,
    columnFamily,
    "%s", // template
    whereCondition,
    pageSize,
    if (cqlAllowFiltering) " allow filtering" else ""
  )
}


case object Murmur3 extends Partitioner {
  override val min = BigDecimal(-2).pow(63)
  override val max = BigDecimal(2).pow(63) - 1
}

case object Random extends Partitioner {
  override val min = BigDecimal(0)
  override val max = BigDecimal(2).pow(127) - 1
}
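With `BigDecimal`, `tokenSpaceSize / concurrency` in `calculateRanges` is in general not a whole number, while Murmur3 tokens are 64-bit integers. A possible variation, shown only as a sketch, does the split with `BigInt` and lets the last range absorb the remainder of the division. `TokenRanges.split` and its parameter names are made up for this example, and the ranges it returns are inclusive on both ends:

```scala
object TokenRanges {
  // Murmur3Partitioner token bounds: [-2^63, 2^63 - 1].
  val Murmur3Min: BigInt = -(BigInt(2).pow(63))
  val Murmur3Max: BigInt = BigInt(2).pow(63) - 1

  // Splits [min, max] into `parts` contiguous, inclusive (start, end) ranges.
  // The last range absorbs the remainder of the integer division.
  def split(min: BigInt, max: BigInt, parts: Int): IndexedSeq[(BigInt, BigInt)] = {
    require(parts > 0, "parts must be positive")
    val size = max - min + 1
    val step = size / parts
    require(step > 0, "more parts than tokens")
    (0 until parts).map { i =>
      val start = min + step * i
      val end = if (i == parts - 1) max else start + step - 1
      (start, end)
    }
  }
}

object TokenRangesDemo extends App {
  // Print the four sub-ranges of the full Murmur3 token space.
  TokenRanges.split(TokenRanges.Murmur3Min, TokenRanges.Murmur3Max, 4)
    .foreach { case (start, end) => println(s"$start .. $end") }
}
```

Because both bounds are inclusive here, the matching predicates would be `token(key) >= start and token(key) <= end` rather than the strict `<` used for the upper limit in `nextPageQuery`.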


On 02/11/2015 02:21 PM, Ja Sam wrote:
Your answer looks very promising

 How do you calculate start and stop?

On Wed, Feb 11, 2015 at 12:09 PM, Jiri Horky <horky@avast.com> wrote:
The fastest way I am aware of is to run the queries in parallel against
multiple Cassandra nodes and make sure that you only ask each node for keys
it is responsible for. Otherwise, the node has to forward your query to the
nodes that own the data, which is much slower and creates unnecessary
objects (and thus GC pressure).
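As a rough illustration of the parallel part only, the sketch below runs one fetch per token range on a fixed-size thread pool and collects the results. `fetchRange` is a hypothetical stand-in for the real per-range paging code, and `ParallelScan.scan` is a name invented for this example. Sending each range to a node that actually owns it is not shown here and would still have to come from the driver or from the cluster's token metadata.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelScan {
  // Runs fetchRange for every (start, end) range with at most `concurrency`
  // fetches in flight, and returns the per-range results in input order.
  def scan[T](ranges: Seq[(BigInt, BigInt)], concurrency: Int)
             (fetchRange: (BigInt, BigInt) => Seq[T]): Seq[Seq[T]] = {
    val pool = Executors.newFixedThreadPool(concurrency)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val futures = ranges.map { case (start, end) => Future(fetchRange(start, end)) }
      // The one-hour timeout is an arbitrary placeholder.
      Await.result(Future.sequence(futures), 1.hour)
    } finally pool.shutdown()
  }
}

object ParallelScanDemo extends App {
  val ranges = Seq((BigInt(0), BigInt(9)), (BigInt(10), BigInt(19)))
  // Stand-in fetch: pretend each range contains just its two bounds as rows.
  val result = ParallelScan.scan[BigInt](ranges, concurrency = 2)((s, e) => Seq(s, e))
  println(result.flatten.mkString(","))
}
```

The pool size plays the role of the concurrency setting, so it can be tuned independently of the number of ranges.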

You can take advantage of the token range information manually if the
driver does not take it into account for you. Then you can tune the
concurrency and the batch size of a single query against one node.
Basically, what you (or the driver) should do is transform the query into a
series of "SELECT * FROM table WHERE token(key) > start AND token(key) <= stop"
queries.
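Spelled out as CQL strings, the per-range queries described above could be generated along these lines. This is a sketch under assumptions: the table and partition key names (`ks.events`, `id`) are placeholders, `TokenQueries.forRanges` is a made-up helper, and the ranges are taken to be inclusive on both ends:

```scala
object TokenQueries {
  // Builds one CQL statement per inclusive (start, end) token range.
  def forRanges(table: String, partitionKey: String,
                ranges: Seq[(BigInt, BigInt)]): Seq[String] =
    ranges.map { case (start, end) =>
      s"SELECT * FROM $table WHERE token($partitionKey) >= $start AND token($partitionKey) <= $end"
    }
}

object TokenQueriesDemo extends App {
  // Placeholder table and key names; two small example ranges.
  val ranges = Seq((BigInt(-10), BigInt(-1)), (BigInt(0), BigInt(9)))
  TokenQueries.forRanges("ks.events", "id", ranges).foreach(println)
}
```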

I will need to look up the actual code, but the idea should be clear :)

Jirka H.


On 02/11/2015 11:26 AM, Ja Sam wrote:
> Is there a simple way (or even a complicated one) to speed up
> a SELECT * FROM [table] query?
> I need to get all rows from one table every day. I split the tables and
> create one for each day, but the query is still quite slow (200 million
> records).
>
> I was thinking about running this query in parallel, but I don't know if
> it is possible.


