Hi all,
We want to share the experiences we gained during our evaluation of Cassandra together with Hadoop Map/Reduce.
Our question was whether Cassandra is suitable for massive, distributed data writes driven by Hadoop Map/Reduce.
Our setup is described in the attached file 'cassandra_stress_setup.txt'.
The stress test uses 800 map tasks to generate data and store it in Cassandra.
Each map task writes 500,000 items (i.e. rows), for a total of 400,000,000 items.
At most 8 map tasks run in parallel on each node. Besides the key, an item contains two
long and two double values, so items are a few hundred bytes in size. This leads to a total
data size of approximately 120GB.
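The sizing above can be checked with a few lines of arithmetic. This is a minimal sketch: the average stored size of 300 bytes per item is an assumption standing in for "a few hundred bytes", and the class name `StressSizing` is hypothetical.

```java
// Back-of-the-envelope sizing for the stress test described above.
// ASSUMPTION: ~300 bytes per stored item, standing in for "a few hundred bytes".
class StressSizing {
    static final long MAP_TASKS = 800;
    static final long ITEMS_PER_TASK = 500_000;
    static final long ASSUMED_BYTES_PER_ITEM = 300; // assumption, not measured

    // Raw payload of one item: two longs plus two doubles, 8 bytes each.
    static final int RAW_VALUE_BYTES = 2 * Long.BYTES + 2 * Double.BYTES;

    // Total rows written across all map tasks.
    static long totalItems(long tasks, long itemsPerTask) {
        return tasks * itemsPerTask;
    }

    // Total stored size in bytes for a given average item size.
    static long totalBytes(long items, long bytesPerItem) {
        return items * bytesPerItem;
    }

    public static void main(String[] args) {
        long items = totalItems(MAP_TASKS, ITEMS_PER_TASK);
        long bytes = totalBytes(items, ASSUMED_BYTES_PER_ITEM);
        System.out.println("items      = " + items);
        System.out.println("raw values = " + RAW_VALUE_BYTES + " bytes per item");
        // Decimal gigabytes (10^9 bytes).
        System.out.println("total size = " + bytes / 1_000_000_000L + " GB");
    }
}
```

With 300 bytes per item this prints 400000000 items and 120 GB. The raw values account for only 32 bytes per item, so most of each item's size would come from the row key, the column names and per-column metadata such as timestamps.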
The map tasks use the Hector API. Hector is "fed" with all three data nodes. Data is
written in chunks of 1,000 items.
The ConsistencyLevel is set to ONE.
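The chunked writing done by the map tasks (the `_flush` call visible in the stack trace) can be sketched as a small buffer that sends one batch per 1,000 items. This is a minimal sketch, not the actual mapper: `BatchSink` and `ChunkedWriter` are hypothetical names, `BatchSink` merely stands in for the client library's batch call (with Hector, a `Mutator` whose `execute()` issues a `batch_mutate`), and the consistency level is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL: BatchSink stands in for the client library's batch API
// (e.g. Hector's Mutator); it is not a real Hector or Cassandra interface.
interface BatchSink<T> {
    void writeBatch(List<T> batch); // one batch_mutate round trip per call
}

// Buffers items and hands them to the sink in fixed-size chunks.
class ChunkedWriter<T> {
    private final BatchSink<T> sink;
    private final int chunkSize;
    private final List<T> buffer = new ArrayList<>();

    ChunkedWriter(BatchSink<T> sink, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be > 0");
        }
        this.sink = sink;
        this.chunkSize = chunkSize;
    }

    // Buffer one item; send a batch as soon as chunkSize items are buffered.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= chunkSize) {
            flush();
        }
    }

    // Send whatever is buffered; call once more at the end of the task.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.writeBatch(new ArrayList<>(buffer)); // defensive copy
        buffer.clear();
    }
}
```

At 1,000 items per chunk, a task writing 500,000 items issues 500 batches; the final `flush()` only matters when the item count is not a multiple of the chunk size.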
We ran the stress test several times with different configuration settings (for example,
I started with Cassandra's default configuration, and I used the Pelops client for another test).
Our observations:
1) Cassandra is really fast: we are really impressed by the huge write throughput. A map
task writing 500,000 items (approx. 200MB) usually finishes in under 5 minutes.
2) However, unfortunately all tests failed in the end.
At first there are no problems: the first 100 map tasks (in some tests, the first 300!) look
fine. But then the trouble starts.
Hadoop's sample output after ~15 minutes:
Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
map     14.99%      800        680      24       96        0       0 / 0
reduce  3.99%       1          0        1        0         0       0 / 0
Some stats:
>top
  PID USER   PR  NI  VIRT  RES  SHR S %CPU %MEM     TIME+ COMMAND
31159 xxxx   20   0 2569m 2.2g 9.8m S  450 18.6  61:44.73 java
>vmstat 1 5
procs -----------memory---------- ---swap-- -----io---- -system-- ----cpu----
 r  b   swpd   free   buff   cache   si   so    bi    bo    in     cs us sy id wa
 2  1  36832 353688 242820 6837520    0    0    15    73     3      2  3  0 96  0
11  1  36832 350992 242856 6852136    0    0  1024 20900  4508  11738 19  1 74  6
 8  0  36832 339728 242876 6859828    0    0     0  1068 45809 107008 69 10 20  0
 1  0  36832 330212 242884 6868520    0    0     0    80 42112  92930 71  8 21  0
 2  0  36832 311888 242908 6887708    0    0  1024     0 20277  46669 46  7 47  0
>cassandra/bin/nodetool -h tirdata1 -p 28080 ring
Address         Status State   Load     Owns    Token
                                                113427455640312821154458202477256070484
192.168.11.198  Up     Normal  6.72 GB  33.33%  0
192.168.11.199  Up     Normal  6.72 GB  33.33%  56713727820156410577229101238628035242
192.168.11.202  Up     Normal  6.68 GB  33.33%  113427455640312821154458202477256070484
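The ring output shows evenly spaced tokens. Assuming the RandomPartitioner, whose tokens lie in the range 0 to 2^127, they can be reproduced as i × floor(2^127 / N) for N nodes. A minimal sketch; the class name `RingTokens` is hypothetical.

```java
import java.math.BigInteger;

// Evenly spaced initial tokens for an N-node ring.
// ASSUMPTION: RandomPartitioner token range [0, 2^127).
class RingTokens {
    static BigInteger[] initialTokens(int nodes) {
        // Integer division, i.e. floor(2^127 / nodes).
        BigInteger step = BigInteger.valueOf(2).pow(127)
                .divide(BigInteger.valueOf(nodes));
        BigInteger[] tokens = new BigInteger[nodes];
        for (int i = 0; i < nodes; i++) {
            tokens[i] = step.multiply(BigInteger.valueOf(i));
        }
        return tokens;
    }

    public static void main(String[] args) {
        for (BigInteger token : initialTokens(3)) {
            System.out.println(token);
        }
    }
}
```

For N = 3 this prints 0, 56713727820156410577229101238628035242 and 113427455640312821154458202477256070484, the three tokens in the ring output, which is why each node owns 33.33% of the ring.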
Hadoop's sample output after ~20 minutes:
Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
map     15.49%      800        673      24       103       0       6 / 0
reduce  4.16%       1          0        1        0         0       0 / 0
What goes wrong? It is always the same: the clients can no longer reach the nodes, and the map tasks fail with:
java.lang.RuntimeException: work failed
at com.zfabrik.hadoop.impl.HadoopProcessRunner.work(HadoopProcessRunner.java:109)
at com.zfabrik.hadoop.impl.DelegatingMapper.run(DelegatingMapper.java:40)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:625)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at com.zfabrik.hadoop.impl.HadoopProcessRunner.work(HadoopProcessRunner.java:107)
... 4 more
Caused by: java.lang.RuntimeException: me.prettyprint.hector.api.exceptions.HUnavailableException: : May not be enough replicas present to handle consistency level.
at com.zfabrik.hadoop.impl.DelegatingMapper$1.run(DelegatingMapper.java:47)
at com.zfabrik.work.WorkUnit.work(WorkUnit.java:342)
at com.zfabrik.impl.launch.ProcessRunnerImpl.work(ProcessRunnerImpl.java:189)
... 9 more
Caused by: me.prettyprint.hector.api.exceptions.HUnavailableException: : May not be enough replicas present to handle consistency level.
at me.prettyprint.cassandra.service.ExceptionsTranslatorImpl.translate(ExceptionsTranslatorImpl.java:52)
at me.prettyprint.cassandra.service.KeyspaceServiceImpl$1.execute(KeyspaceServiceImpl.java:95)
at me.prettyprint.cassandra.service.KeyspaceServiceImpl$1.execute(KeyspaceServiceImpl.java:88)
at me.prettyprint.cassandra.service.Operation.executeAndSetResult(Operation.java:101)
at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:221)
at me.prettyprint.cassandra.service.KeyspaceServiceImpl.operateWithFailover(KeyspaceServiceImpl.java:129)
at me.prettyprint.cassandra.service.KeyspaceServiceImpl.batchMutate(KeyspaceServiceImpl.java:100)
at me.prettyprint.cassandra.service.KeyspaceServiceImpl.batchMutate(KeyspaceServiceImpl.java:106)
at me.prettyprint.cassandra.model.MutatorImpl$2.doInKeyspace(MutatorImpl.java:203)
at me.prettyprint.cassandra.model.MutatorImpl$2.doInKeyspace(MutatorImpl.java:200)
at me.prettyprint.cassandra.model.KeyspaceOperationCallback.doInKeyspaceAndMeasure(KeyspaceOperationCallback.java:20)
at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecute(ExecutingKeyspace.java:85)
at me.prettyprint.cassandra.model.MutatorImpl.execute(MutatorImpl.java:200)
at sample.cassandra.itemrepo.mapreduce.HectorBasedMassItemGenMapper._flush(HectorBasedMassItemGenMapper.java:122)
at sample.cassandra.itemrepo.mapreduce.HectorBasedMassItemGenMapper.map(HectorBasedMassItemGenMapper.java:103)
at sample.cassandra.itemrepo.mapreduce.HectorBasedMassItemGenMapper.map(HectorBasedMassItemGenMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at com.zfabrik.hadoop.impl.DelegatingMapper$1.run(DelegatingMapper.java:45)
... 11 more
Caused by: UnavailableException()
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:16485)
at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:916)
at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:890)
at me.prettyprint.cassandra.service.KeyspaceServiceImpl$1.execute(KeyspaceServiceImpl.java:93)
... 27 more
-------
Task attempt_201104291345_0001_m_000028_0 failed to report status for 602 seconds. Killing!
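The root cause at the bottom of the trace is Cassandra's `UnavailableException`, which the coordinator node raises before attempting a write when, by its own view of the cluster, fewer replicas are alive than the consistency level requires. The sketch below is a simplified model of that check, not Cassandra's code: `AvailabilityCheck` is a hypothetical name, only three levels are modeled, and the replication factor is a free parameter because it is not stated here.

```java
// Simplified model of the coordinator-side availability check that raises
// UnavailableException. NOT Cassandra's code; it only illustrates how the
// consistency level and the number of live replicas interact.
class AvailabilityCheck {
    enum Level { ONE, QUORUM, ALL }

    // Number of replicas that must be alive for a write at this level.
    static int required(Level level, int replicationFactor) {
        switch (level) {
            case ONE:
                return 1;
            case QUORUM:
                return replicationFactor / 2 + 1;
            case ALL:
                return replicationFactor;
            default:
                throw new IllegalArgumentException("unknown level " + level);
        }
    }

    // True if the write would be attempted; false models UnavailableException.
    static boolean canWrite(Level level, int replicationFactor, int liveReplicas) {
        return liveReplicas >= required(level, replicationFactor);
    }
}
```

With ConsistencyLevel ONE, the check can only fail if the coordinator considers every replica of the key to be down.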
I also observed that when I connected with cassandra-cli during the stress test, it was
not possible to list the items written so far:
[default@unknown] use ItemRepo;
Authenticated to keyspace: ItemRepo
[default@ItemRepo] list Items;
Using default limit of 100
Internal error processing get_range_slices
It seems to me that from the moment I performed the read operation in the CLI, the node
somehow gets confused.
JConsole shows that up to this point the heap behaves well: it grows and GC clears it
again. From this point on, however, GC no longer really helps (see attached screenshot).
This also affects the other nodes, as the second screenshot shows. The CPU usage goes
down, as does the heap memory usage.
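JConsole reads its heap and GC figures from the JVM's platform MXBeans. The sketch below samples the same values once per second; `HeapProbe` is a hypothetical helper that reads the JVM it runs in, whereas watching a Cassandra node would require a remote JMX connection to that node. Collection counts and times that keep rising while used heap stays close to the maximum would match the "GC no longer really helps" symptom.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Samples heap usage and cumulative GC statistics of the current JVM.
class HeapProbe {
    // Heap bytes currently in use.
    static long heapUsedBytes() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int sample = 0; sample < 5; sample++) {
            MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
            // getMax() may be -1 if the maximum is undefined.
            StringBuilder line = new StringBuilder(String.format(
                    "heap used=%d MB max=%d MB", heap.getUsed() >> 20, heap.getMax() >> 20));
            for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
                // Cumulative collection count and time (ms) per collector.
                line.append(String.format("  %s: %d runs, %d ms",
                        gc.getName(), gc.getCollectionCount(), gc.getCollectionTime()));
            }
            System.out.println(line);
            Thread.sleep(1000); // one sample per second, like `vmstat 1`
        }
    }
}
```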
I'll run another stress test over the weekend with:
* MAX_HEAP_SIZE="4G"
* HEAP_NEWSIZE="400M"
Best Regards
Udo