I am trying to read and process data in Cassandra using Hadoop. I have a 4-node Cassandra cluster, and an 8-node Hadoop cluster:
- 1 Namenode/Jobtracker
- 7 Datanodes/Tasktrackers (4 of them are also hosting Cassandra)
I am using Cassandra 1.2 beta, Hadoop 0.20.2, java 1.6_u_34, 7 of my nodes are on SLES 10 (Linux kernel: 126.96.36.199-0.76.8-smp) and the last one is on SLES 11(Linux kernel: 188.8.131.52-0.7-default). They are all 24 cores with 33 GB ram, but for some reasons, the node running on SLES 11 is running Hadoop jobs significantly faster then the others (two to three times faster); any explanation for this is welcome as well.
In my Hadoop job, I am using ColumnFamilyInputFormat and ColumnFamilyOutputFormat.
Here is my mapper: Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, Text>,
and my reducer: Reducer<Text, Text, ByteBuffer, List<Mutation>>.
The input of my mapper is the values of the columns given in input. In output of my map, I write those values in the Text format separated by comas. I ran the task on about 400 million rows in my database so the map function is called one time for each row. When I run the job with 6 concurrent map tasks on each server and 7 Hadoop servers, the job takes about an hour and a half (the reduce step is done in about 5 seconds, so the problem is in map task), which is too long...
So I set some timers between each call to the map function, and here is what I get:
After mapping about 4150 - 4160 rows (each row has 8 columns and values are strings or long) in Cassandra in 60 ms approximately, there is a gap in time.
This gap is not the same for all the machines:
- it is 200 ms on the node Cassandra + Hadoop that is running on SLES 11 (Cassandra is using 400% cpu on this node)
- it is 4200 ms on the 3 nodes that are hadoop only
- it is 900 ms on two nodes that are Cassandra + Hadoop and running on SLES 10 (Cassandra is using 400% cpu on this node)
- it is 4200 ms on the last Cassandra + Hadoop node (Cassandra is using 2300% cpu on this node and I get a lot of Garbage collection messages in the cassandra logs of this node only)
When I run only 1 concurrent map task per node (instead of 6 above), I get the following results:
- it is 200 ms on the node Cassandra + Hadoop that is running on SLES 11 (Cassandra is using 150% cpu on this node)
- it is 600 ms on the 3 nodes that are hadoop only
- it is 600 ms on two nodes that are Cassandra + Hadoop and running on SLES 10 (Cassandra is using 150% cpu on this node)
- it is 600 ms on the last Cassandra + Hadoop node (Cassandra is using 400% cpu on this node and I don't get Garbage collection messages anymore in the cassandra logs)
I do not really know what is happening during this gap; my guess would be that Hadoop is reading data in Cassandra, streaming it to the Hadoop nodes and finally writing it to the Hadoop Distributed File System.
Does anyone understand how reads are done when using Hadoop and Cassandra? and what is exactly happening during this gap in time? and why there is such a difference in time between nodes running on SLES10 and the node running on SLES 11?
Why does it seem like this gap in time is smaller on nodes running Cassandra + Hadoop?
Finally, does anyone know why this gap in time occurs after approximately 4160 rows which represent about 32 KB in my case? Is there any parameter I am not aware of to change this?