Date: Wed, 20 Apr 2016 03:47:25 +0000 (UTC)
From: "Stefania (JIRA)"
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Subject: [jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

    [ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249213#comment-15249213 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

I've run [the benchmark|https://github.com/stef1927/spark-load-perf/tree/master] described above on a 5-node GCE {{n1-standard-8}} cluster (30 GB RAM and 8 virtual cores per node, HDDs). The following schemas were tested:

* {{CREATE TABLE ks.schema1 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 INT, PRIMARY KEY (id, timestamp))}}
* {{CREATE TABLE ks.schema2 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 INT, PRIMARY KEY ((id, timestamp)))}}
* {{CREATE TABLE ks.schema3 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY (id, timestamp))}}
* {{CREATE TABLE ks.schema4 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY ((id, timestamp)))}}

The first two schemas are identical except that the second uses a composite partition key whilst the first uses a clustering key; the same is true for the third and fourth schemas. The difference between the first two schemas and the last two is that the 10 integer values are encoded into a single string in the last two. This was done to measure the impact of reading multiple values from Cassandra, whilst the impact of clustering rows can be determined by comparing schemas one and two, or three and four.

15 million rows of random data were generated and stored in the following sources:

* Cassandra
* A CSV file stored in HDFS
* A Parquet file stored in HDFS

After generating the data, the Cassandra tables were flushed and compacted. The OS page cache was also flushed after generating the data, and after every test run, via {{sync && echo 3 | sudo tee /proc/sys/vm/drop_caches}}. The HDFS files were divided into 1000 partitions due to how the data was generated.
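For reference, the generation step looks roughly like the sketch below. This is only an illustrative outline, not the actual generator (which lives in the linked repository); the keyspace, table, HDFS paths and partition-key cardinality are assumptions:

{code:scala}
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._

// Row shape matching schema1/schema2 above.
case class Record(id: String, timestamp: Long,
                  val1: Int, val2: Int, val3: Int, val4: Int, val5: Int,
                  val6: Int, val7: Int, val8: Int, val9: Int, val10: Int)

object GenerateDataSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("generate-data-sketch")
      .set("spark.cassandra.connection.host", "127.0.0.1") // illustrative host
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val numRows = 15000000    // 15 million rows, as above
    val numPartitions = 1000  // matches the 1000 HDFS partitions mentioned above

    val rows = sc.parallelize(0 until numRows, numPartitions).map { i =>
      val r = new Random(i)
      Record(r.nextInt(100000).toString, i.toLong, // id cardinality is an assumption
             r.nextInt(), r.nextInt(), r.nextInt(), r.nextInt(), r.nextInt(),
             r.nextInt(), r.nextInt(), r.nextInt(), r.nextInt(), r.nextInt())
    }

    // The same rows are written to all three sources so that the read paths are comparable.
    rows.saveToCassandra("ks", "schema1")
    rows.map(_.productIterator.mkString(",")).saveAsTextFile("hdfs:///data/schema1-csv")
    sqlContext.createDataFrame(rows).write.parquet("hdfs:///data/schema1-parquet")
  }
}
{code}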
The benchmark retrieves either a Spark RDD (Resilient Distributed Dataset) or a DF (DataFrame). The difference between the two is that the RDD contains the entire table or file data, whilst the data frame only contains the two columns that are used to produce the final result. The following tests were performed in random order:

* *Cassandra RDD:* the entire Cassandra table is loaded into an RDD via {{sc.cassandraTable}};
* *CSV RDD:* the CSV data is loaded into an RDD via {{sc.textFile}};
* *Parquet RDD:* the Parquet data is loaded into an RDD via {{sqlContext.read.parquet}};
* *Cassandra DF:* a SELECT statement is pushed to the server via {{CassandraSQLContext}} to retrieve two columns, which are saved into a data frame;
* *CSV DF:* the CSV data is loaded via the Spark SQL context using {{com.databricks.spark.csv}} as the format, and two columns are saved into a data frame;
* *Parquet DF:* a SELECT statement is used via {{SQLContext}} to retrieve two columns, which are saved into a data frame.

The RDD or DF is then iterated and the result is calculated as the global maximum of the per-row maximum of two columns. The time taken to create the RDD or DF and to iterate it is then measured. A minimal sketch of these load paths is shown after the results tables below.

h3. RDD Results

*Schema1*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.494837|5.478472|43.423967|8|12|
|Run 2|2.845326|5.167405|47.170665|9|17|
|Run 3|2.613721|4.904634|48.451015|10|19|
|Average|2.98|5.18|46.35|9|16|
|Std. Dev.|0.46|0.29|2.61| | |

*Schema2*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.486563|5.635907|46.00437|8|13|
|Run 2|2.68518|5.13979|46.108184|9|17|
|Run 3|2.673291|5.035654|46.076284|9|17|
|Average|2.95|5.27|46.06|9|16|
|Std. Dev.|0.47|0.32|0.05| | |

*Schema3*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.122885|6.79348|29.643609|4|5|
|Run 2|5.826286|6.563861|32.900336|5|6|
|Run 3|5.751427|6.41375|33.176358|5|6|
|Average|5.90|6.59|31.91|5|5|
|Std. Dev.|0.20|0.19|1.96| | |

*Schema4*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.137645|7.511649|29.518883|4|5|
|Run 2|5.984526|6.569239|30.723268|5|5|
|Run 3|5.763102|6.590789|30.789137|5|5|
|Average|5.96|6.89|30.34|4|5|
|Std. Dev.|0.19|0.54|0.72| | |

h3. DF Results

*Schema1*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|2.843182|15.651141|37.997299|2|13|
|Run 2|2.357436|11.582413|30.836383|3|13|
|Run 3|2.386732|11.75583|30.061433|3|13|
|Average|2.53|13.00|32.97|3|13|
|Std. Dev.|0.27|2.30|4.38| | |

*Schema2*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.016107|12.484605|95.12199|8|32|
|Run 2|2.455694|12.13422|37.583736|3|15|
|Run 3|2.329835|12.007215|34.966389|3|15|
|Average|2.60|12.21|55.89|5|21|
|Std. Dev.|0.37|0.25|34.00| | |

*Schema3*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|5.544086|14.641745|22.944184|2|4|
|Run 2|4.306315|13.056165|22.287305|2|5|
|Run 3|4.265064|12.736621|23.91004|2|6|
|Average|4.71|13.48|23.05|2|5|
|Std. Dev.|0.73|1.02|0.82| | |

*Schema4*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|5.057719|13.460405|26.966471|2|5|
|Run 2|4.440278|13.090147|25.700175|2|6|
|Run 3|3.956446|12.57829|24.537605|2|6|
|Average|4.48|13.04|25.73|2|6|
|Std. Dev.|0.55|0.44|1.21| | |

I've attached a zip file with the raw data, [^spark-load-perf-results-001.zip].
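As referenced above, this is roughly what the six load paths and the aggregation look like: a minimal sketch using Spark 1.x APIs, the Cassandra connector and the {{com.databricks.spark.csv}} package. The table, path and CSV column names are assumptions; the real code is in the linked repository.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.cassandra.CassandraSQLContext
import com.datastax.spark.connector._

object LoadPathsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("load-paths-sketch")
      .set("spark.cassandra.connection.host", "127.0.0.1") // illustrative host
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val csql = new CassandraSQLContext(sc)

    // RDD variants: the entire table / file is loaded.
    val cassandraRdd = sc.cassandraTable("ks", "schema1")
    val csvRdd       = sc.textFile("hdfs:///data/schema1-csv")
    val parquetRdd   = sqlContext.read.parquet("hdfs:///data/schema1-parquet").rdd

    // DF variants: only the two columns needed for the result.
    val cassandraDf = csql.sql("SELECT val1, val2 FROM ks.schema1") // pushed down to the server
    val csvDf = sqlContext.read.format("com.databricks.spark.csv")
      .option("inferSchema", "true")
      .load("hdfs:///data/schema1-csv")
      .select("C2", "C3")                                           // assumed positional column names
    val parquetDf = sqlContext.read
      .parquet("hdfs:///data/schema1-parquet")
      .select("val1", "val2")

    // Result: global maximum of the per-row maximum of two columns
    // (the CSV and Parquet variants are aggregated the same way once the two columns are extracted).
    val rddResult = cassandraRdd
      .map(r => math.max(r.getInt("val1"), r.getInt("val2")))
      .reduce((a, b) => math.max(a, b))
    val dfResult = cassandraDf.rdd
      .map(r => math.max(r.getInt(0), r.getInt(1)))
      .reduce((a, b) => math.max(a, b))
    println(s"RDD result: $rddResult, DF result: $dfResult")
  }
}
{code}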
More details on the benchmark are available in the [README|https://github.com/stef1927/spark-load-perf/tree/master].

h3. Conclusions

In the RDD tests, Cassandra is 9 times slower than CSV for the first two schemas and 4-5 times slower for the last two schemas. When compared to Parquet, it is 16 times slower for the first two schemas and 5 times slower for the last two schemas.

In the DF tests, Cassandra is 3 times slower than CSV for the first two schemas and about 2 times slower for the third and fourth schemas. When compared to Parquet, it is 13-15 times slower for the first two schemas and 5-6 times slower for the last two schemas. I've excluded an outlying value, 95.12, from the DF tests of schema 2. Extremely large values for Cassandra were observed 3 times; the other 2 values were excluded from the final data due to other problems with those benchmark runs.

Storing the data in a text string, with the client decoding the values, is faster by approximately a factor of 1.5. Using clustering columns rather than composite partition keys seems to have only a small impact on performance.

h3. Next steps

I intend to modify the Spark Connector to support the streaming proof of concept delivered in the parent ticket, CASSANDRA-9259. If the impact of streaming is as significant as hoped (a factor of 3 or 4 improvement), then I intend to implement the [approach|https://issues.apache.org/jira/browse/CASSANDRA-11521?focusedCommentId=15232701&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15232701] suggested in CASSANDRA-11521 and compare the two. I would also like to spend some time understanding the difference in performance between the first two schemas and the last two, specifically how much of it is due to encoding CQL values vs. reading cells from disk. Further, if large outlying values continue to be observed, we need to understand the reason for them.


> Create a benchmark to compare HDFS and Cassandra bulk read times
> -----------------------------------------------------------------
>
>                 Key: CASSANDRA-11542
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11542
>             Project: Cassandra
>          Issue Type: Sub-task
>          Components: Testing
>            Reporter: Stefania
>            Assignee: Stefania
>             Fix For: 3.x
>
>         Attachments: spark-load-perf-results-001.zip
>
>
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading performance. Simple Spark queries will be performed on data stored in HDFS or Cassandra, and the entire duration will be measured. An example query would be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)