cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From " Brian Hess (JIRA)" <>
Subject [jira] [Created] (CASSANDRA-9259) Bulk Reading from Cassandra
Date Tue, 28 Apr 2015 22:31:06 GMT
 Brian Hess created CASSANDRA-9259:

             Summary: Bulk Reading from Cassandra
                 Key: CASSANDRA-9259
             Project: Cassandra
          Issue Type: Improvement
          Components: Core
            Reporter:  Brian Hess

This ticket is following on from the 2015 NGCC.  This ticket is designed to be a place for
discussing and designing an approach to bulk reading.

The goal is to have a bulk reading path for Cassandra.  That is, a path optimized to grab
a large portion of the data for a table (potentially all of it).  This is a core element in
the Spark integration with Cassandra, and the speed at which Cassandra can deliver bulk data
to Spark is limiting the performance of Spark-plus-Cassandra operations.  This is especially
of importance as Cassandra will (likely) leverage Spark for internal operations (for example

The core CQL to consider is the following:
SELECT a, b, c FROM myKs.myTable WHERE Token(partitionKey) > X AND Token(partitionKey)
<= Y

Here, we choose X and Y to be contained within one token range (perhaps considering the primary
range of a node without vnodes, for example).  This query pushes 50K-100K rows/sec, which
is not very fast if we are doing bulk operations via Spark (or other processing frameworks
- ETL, etc).  There are a few causes (e.g., inefficient paging).

There are a few approaches that could be considered.  First, we consider a new "Streaming
Compaction" approach.  The key observation here is that a bulk read from Cassandra is a lot
like a major compaction, though instead of outputting a new SSTable we would output CQL rows
to a stream/socket/etc.  This would be similar to a CompactionTask, but would strip out some
unnecessary things in there (e.g., some of the indexing, etc). Predicates and projections
could also be encapsulated in this new "StreamingCompactionTask", for example.

Another approach would be an alternate storage format.  For example, we might employ Parquet
(just as an example) to store the same data as in the primary Cassandra storage (aka SSTables).
 This is akin to Global Indexes (an alternate storage of the same data optimized for a particular
query).  Then, Cassandra can choose to leverage this alternate storage for particular CQL
queries (e.g., range scans).

These are just 2 suggestions to get the conversation going.

One thing to note is that it will be useful to have this storage segregated by token range
so that when you extract via these mechanisms you do not get replications-factor numbers of
copies of the data.  That will certainly be an issue for some Spark operations (e.g., counting).
 Thus, we will want per-token-range storage (even for single disks), so this will likely leverage
CASSANDRA-6696 (though, we'll want to also consider the single disk case).

It is also worth discussing what the success criteria is here.  It is unlikely to be as fast
as EDW or HDFS performance (though, that is still a good goal), but being within some percentage
of that performance should be set as success.  For example, 2x as long as doing bulk operations
on HDFS with similar node count/size/etc.

This message was sent by Atlassian JIRA

View raw message