cassandra-commits mailing list archives

From "Stefania (JIRA)" <>
Subject [jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
Date Fri, 06 May 2016 10:22:13 GMT


Stefania commented on CASSANDRA-11542:

I've fixed the benchmark to ensure all jobs have a similar number of Spark tasks. This was
accomplished by reducing the number of HDFS blocks and decreasing the Cassandra split size
in the connector. All jobs now have approximately 35 Spark tasks, of which 10 execute in parallel
(5 nodes with 2 executors each).
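As a rough sketch of the kind of tuning involved (the value below is illustrative, not the one used in this benchmark run), the connector's split size can be lowered so each Spark task covers a smaller slice of the token ring, yielding more tasks:

```
# spark-defaults.conf (illustrative value; smaller splits -> more Cassandra tasks)
spark.cassandra.input.split.size_in_mb  16
```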

Here are the results for schema 1 and 3:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||

As expected, the Cassandra numbers have improved significantly due to the increased parallelism.

I've also performed some client-side optimizations, identified by analyzing the JFR files
recorded during the last run. The results are as follows:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||

When both streaming and client optimizations are in place, the performance is considerably
better. The overall percentage improvements by test and schema are as follows:

||Percentage reduction in time after optimization||SCHEMA 1|| ||SCHEMA 3|| ||
|| ||RDD||DF||RDD||DF||
|Client optimizations|52.13%|32.75%|32.48%|15.27%|
|Streaming (after client optimizations)|40.17%|38.51%|19.10%|26.22%|

Raw result data is attached as [^].

The client improvements are available [here|];
this is a quick summary:

* The cache of type decoders in the driver is extremely slow; saving the decoders in an array
once per RDD computation is by far the most significant factor
* {{GettableData}} is creating a map, [{{_indexOf}}|],
for every single row. However, this map does not change across rows. In the optimizations I've
replaced {{CassandraRow}} with {{GettableByIndexData}} for case classes, but this needs further
work
* For the non-streaming case, the query may not necessarily reach a replica (token-aware
routing only works with partition keys, not with tokens, and only the first set of tasks ends
up at the preferred Spark location; subsequent tasks may not).
* The Spark metrics have been disabled, as they contributed somewhat to the total decoding
time; we need a way to calculate the binary row size without iterating over the row's column BBs again.
* Scala for loops are implemented as {{range.foreach}}, and as far as I understand the closure
passed to the foreach is not inlined, so I think we need to be careful with for loops in the
critical path. They definitely show up quite high in JFR, but I cannot say that replacing
a single for loop has a measurable impact on the total final time.
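To illustrate the {{_indexOf}} point above, here is a minimal sketch (the class and method names are mine, not the connector's actual API): the column-name-to-index map is built once per reader and shared across all rows, instead of being rebuilt for every row.

```scala
// Illustrative sketch only: build the name -> index map once per reader,
// not once per row, and look values up by the precomputed index.
final class CachedIndexRowReader(columnNames: IndexedSeq[String]) {
  // Computed once, reused for every row read through this reader.
  private val indexOf: Map[String, Int] = columnNames.zipWithIndex.toMap

  def get(rowValues: IndexedSeq[AnyRef], column: String): AnyRef =
    rowValues(indexOf(column))
}
```

The same idea applies whether the lookup table lives per partition or per RDD computation; the key is that it is not allocated per row.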
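On the for-loop point, a small sketch of the rewrite in question (purely illustrative; as noted above, whether it helps in any given hot path needs measuring):

```scala
// In Scala 2, `for (i <- 0 until n)` desugars to Range.foreach taking a
// closure; a manual while loop does the same work without the closure.
object LoopSketch {
  def sumFor(xs: Array[Int]): Long = {
    var s = 0L
    for (i <- 0 until xs.length) s += xs(i) // desugars to Range.foreach(closure)
    s
  }

  def sumWhile(xs: Array[Int]): Long = {
    var s = 0L
    var i = 0
    while (i < xs.length) { s += xs(i); i += 1 } // no closure involved
    s
  }
}
```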

Further client-side optimization is possible; I would suggest integrating the connector with
the driver at a very low level in order to convert {{ByteBuffers}} directly into Scala types.
In addition, the improvements I've already implemented will need further refinement. However,
in the JFR recordings taken after these optimizations, the Spark executors now spend 66% of
the time waiting and 33% of the time processing rows, whilst the NIO workers spend 95% of
their time waiting. Therefore I suggest moving on to server side optimizations, starting with
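To illustrate the low-level integration suggested above, a hypothetical sketch (plain JDK types only, no driver classes; the object and method names are mine) of decoding serialized values straight from a {{ByteBuffer}} into Scala values, without materializing intermediate row or cell objects:

```scala
import java.nio.ByteBuffer

// Hypothetical sketch: decode primitives directly from the wire buffer,
// skipping intermediate driver-side row/cell objects.
object DirectDecode {
  def readInt(bb: ByteBuffer): Int = bb.getInt // advances position by 4

  def readLong(bb: ByteBuffer): Long = bb.getLong // advances position by 8

  def readUtf8(bb: ByteBuffer, len: Int): String = {
    val bytes = new Array[Byte](len)
    bb.get(bytes) // copies len bytes, advances position
    new String(bytes, "UTF-8")
  }
}
```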

This [benchmark|] is ready for review, as it will
be used to measure any future improvement. [~rspitzer], would you be able to quickly review
it or find someone interested?

> Create a benchmark to compare HDFS and Cassandra bulk read times
> ----------------------------------------------------------------
>                 Key: CASSANDRA-11542
>                 URL:
>             Project: Cassandra
>          Issue Type: Sub-task
>          Components: Testing
>            Reporter: Stefania
>            Assignee: Stefania
>             Fix For: 3.x
>         Attachments:,,,
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading performance.
Simple Spark queries will be performed on data stored in HDFS or Cassandra, and the entire
duration will be measured. An example query would be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)

This message was sent by Atlassian JIRA
