incubator-cassandra-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jonathan Ellis <jbel...@gmail.com>
Subject Re: Ingesting from Hadoop to Cassandra
Date Wed, 27 May 2009 16:06:09 GMT
That shows both cassandra and hadoop in socket.write, which is
strange.  It's like thrift got de-synced somehow.

Can you reproduce with a non-hadoop client program that you can share here?

-Jonathan

On Tue, May 26, 2009 at 9:40 PM, Alexandre Linares <linares@ymail.com> wrote:
> Jun, Jonathan,
>
> Thanks for the tips.  I've been experimenting with both blocking and
> non-blocking, both resulting in the same behavior.
>
> I reproduced the problem on a 1 node cassandra cluster; here's the thread
> dump:
>
> Full thread dump Java HotSpot(TM) Server VM (11.3-b02 mixed mode):
>
> "Attach Listener" daemon prio=10 tid=0x08dd1000 nid=0x6653 waiting on
> condition [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "pool-1-thread-2" prio=10 tid=0x6c03fc00 nid=0x6603 runnable
> [0x6bcfe000..0x6bcfeeb0]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>     at
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
>     - locked <0x747f6c38> (a java.io.BufferedOutputStream)
>     at
> org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:153)
>     at
> org.apache.cassandra.service.Cassandra$Processor$batch_insert_superColumn.process(Cassandra.java:1159)
>     at
> org.apache.cassandra.service.Cassandra$Processor.process(Cassandra.java:805)
>     at
> org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:252)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:619)
>
> "pool-1-thread-1" prio=10 tid=0x6c826c00 nid=0x652e waiting on condition
> [0x6c483000..0x6c483f30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d6e630> (a
> java.util.concurrent.SynchronousQueue$TransferStack)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:422)
>     at
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
>     at java.util.concurrent.SynchronousQueue.take(SynchronousQueue.java:857)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "GMFD:1" prio=10 tid=0x6c823400 nid=0x64f9 waiting on condition
> [0x6c4d4000..0x6c4d4db0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d33058> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "Timer-1" prio=10 tid=0x6c822400 nid=0x64f8 in Object.wait()
> [0x6c525000..0x6c525e30]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x72d35e18> (a java.util.TaskQueue)
>     at java.util.TimerThread.mainLoop(Timer.java:509)
>     - locked <0x72d35e18> (a java.util.TaskQueue)
>     at java.util.TimerThread.run(Timer.java:462)
>
> "Timer thread for monitoring AnalyticsContext" daemon prio=10 tid=0x6c821400
> nid=0x64f7 in Object.wait() [0x6c576000..0x6c5770b0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x72d3f998> (a java.util.TaskQueue)
>     at java.util.TimerThread.mainLoop(Timer.java:509)
>     - locked <0x72d3f998> (a java.util.TaskQueue)
>     at java.util.TimerThread.run(Timer.java:462)
>
> "UDP Selector Manager" prio=10 tid=0x6c820400 nid=0x64f6 runnable
> [0x6c5c7000..0x6c5c8130]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:215)
>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>     - locked <0x72d4dac0> (a sun.nio.ch.Util$1)
>     - locked <0x72d4dab0> (a java.util.Collections$UnmodifiableSet)
>     - locked <0x72bcad58> (a sun.nio.ch.EPollSelectorImpl)
>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>     at org.apache.cassandra.net.SelectorManager.run(SelectorManager.java:97)
>
> "TCP Selector Manager" prio=10 tid=0x6c81f400 nid=0x64f5 runnable
> [0x6c618000..0x6c618fb0]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:215)
>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>     - locked <0x72d56578> (a sun.nio.ch.Util$1)
>     - locked <0x72d56568> (a java.util.Collections$UnmodifiableSet)
>     - locked <0x72bcad98> (a sun.nio.ch.EPollSelectorImpl)
>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>     at org.apache.cassandra.net.SelectorManager.run(SelectorManager.java:97)
>
> "HINTED-HANDOFF-POOL:1" prio=10 tid=0x6c80d800 nid=0x64f4 waiting on
> condition [0x6c669000..0x6c66a030]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d3ff70> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1963)
>     at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MINOR-COMPACTION-POOL:1" prio=10 tid=0x6c809c00 nid=0x64f3 waiting on
> condition [0x6c6ba000..0x6c6baeb0]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7c5f8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1963)
>     at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:583)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:576)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MEMTABLE-FLUSHER-POOL:1" prio=10 tid=0x6c806800 nid=0x64f2 waiting on
> condition [0x6c70b000..0x6c70bf30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7c658> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "HTTP-REQUEST:1" prio=10 tid=0x6c804400 nid=0x64f1 waiting on condition
> [0x6c75c000..0x6c75cdb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7c778> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MAP-REDUCE-STAGE:4" prio=10 tid=0x6c802c00 nid=0x64f0 waiting on condition
> [0x6c7ad000..0x6c7ade30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7c898> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MAP-REDUCE-STAGE:3" prio=10 tid=0x6c801400 nid=0x64ef waiting on condition
> [0x6c7fe000..0x6c7ff0b0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7c898> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MAP-REDUCE-STAGE:2" prio=10 tid=0x6c800400 nid=0x64ee waiting on condition
> [0x6c991000..0x6c992130]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7c898> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MAP-REDUCE-STAGE:1" prio=10 tid=0x6d8fe400 nid=0x64ed waiting on condition
> [0x6c9e2000..0x6c9e2fb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7c898> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:8" prio=10 tid=0x6d8fd000 nid=0x64ec waiting on condition
> [0x6ca33000..0x6ca34030]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:7" prio=10 tid=0x6d8fb800 nid=0x64eb waiting on condition
> [0x6ca84000..0x6ca84eb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:6" prio=10 tid=0x6d8fa000 nid=0x64ea waiting on condition
> [0x6cad5000..0x6cad5f30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:5" prio=10 tid=0x6d8f8800 nid=0x64e9 waiting on condition
> [0x6cb26000..0x6cb26db0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:4" prio=10 tid=0x6d8f7400 nid=0x64e8 waiting on condition
> [0x6cb77000..0x6cb77e30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:3" prio=10 tid=0x6d8f5c00 nid=0x64e7 waiting on condition
> [0x6cbc8000..0x6cbc90b0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:2" prio=10 tid=0x6d8f4400 nid=0x64e6 waiting on condition
> [0x6cc19000..0x6cc1a130]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-READ-STAGE:1" prio=10 tid=0x6d8f2c00 nid=0x64e5 waiting on condition
> [0x6cc6a000..0x6cc6afb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7f828> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-MUTATION-STAGE:4" prio=10 tid=0x6d8f1800 nid=0x64e4 waiting on
> condition [0x6ccbb000..0x6ccbc030]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7cc70> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-MUTATION-STAGE:3" prio=10 tid=0x6d8f0000 nid=0x64e3 waiting on
> condition [0x6cd0c000..0x6cd0ceb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7cc70> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-MUTATION-STAGE:2" prio=10 tid=0x6d8ee800 nid=0x64e2 waiting on
> condition [0x6cd5d000..0x6cd5df30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7cc70> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "ROW-MUTATION-STAGE:1" prio=10 tid=0x6d8ed000 nid=0x64e1 waiting on
> condition [0x6cdae000..0x6cdaedb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d7cc70> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "CONSISTENCY-MANAGER:4" prio=10 tid=0x6d8ebc00 nid=0x64e0 waiting on
> condition [0x6cdff000..0x6cdffe30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d76fd0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "CONSISTENCY-MANAGER:3" prio=10 tid=0x6d8ea400 nid=0x64df waiting on
> condition [0x6ce50000..0x6ce510b0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d76fd0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "CONSISTENCY-MANAGER:2" prio=10 tid=0x6d8e8c00 nid=0x64de waiting on
> condition [0x6cea1000..0x6cea2130]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d76fd0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "CONSISTENCY-MANAGER:1" prio=10 tid=0x6d8e6000 nid=0x64dd waiting on
> condition [0x6cef2000..0x6cef2fb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d76fd0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "RESPONSE-STAGE:4" prio=10 tid=0x6d8e4000 nid=0x64dc waiting on condition
> [0x6cf43000..0x6cf44030]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d78200> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "RESPONSE-STAGE:3" prio=10 tid=0x6d8e2c00 nid=0x64db waiting on condition
> [0x6cf94000..0x6cf94eb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d78200> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "RESPONSE-STAGE:2" prio=10 tid=0x6d8e1400 nid=0x64da waiting on condition
> [0x6cfe5000..0x6cfe5f30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d78200> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "RESPONSE-STAGE:1" prio=10 tid=0x6d8dfc00 nid=0x64d9 waiting on condition
> [0x6d036000..0x6d036db0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d78200> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-STREAMING-POOL:1" prio=10 tid=0x6d8de400 nid=0x64d8 waiting on
> condition [0x6d087000..0x6d087e30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d57e70> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-DESERIALIZER-POOL:4" prio=10 tid=0x6d8dcc00 nid=0x64d7 waiting on
> condition [0x6d0d8000..0x6d0d90b0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d57f60> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-DESERIALIZER-POOL:3" prio=10 tid=0x6d8db400 nid=0x64d6 waiting on
> condition [0x6d129000..0x6d12a130]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d57f60> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-DESERIALIZER-POOL:2" prio=10 tid=0x6d8da000 nid=0x64d5 waiting on
> condition [0x6d17a000..0x6d17afb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d57f60> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-DESERIALIZER-POOL:1" prio=10 tid=0x6d8d8800 nid=0x64d4 waiting on
> condition [0x6d1cb000..0x6d1cc030]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d57f60> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-SERIALIZER-POOL:4" prio=10 tid=0x6d8d7000 nid=0x64d3 waiting on
> condition [0x6d21c000..0x6d21ceb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d580e0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-SERIALIZER-POOL:3" prio=10 tid=0x6d8d5800 nid=0x64d2 waiting on
> condition [0x6d26d000..0x6d26df30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d580e0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-SERIALIZER-POOL:2" prio=10 tid=0x6d8d4400 nid=0x64d1 waiting on
> condition [0x6d2be000..0x6d2bedb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d580e0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGE-SERIALIZER-POOL:1" prio=10 tid=0x6d8d2c00 nid=0x64d0 waiting on
> condition [0x6d30f000..0x6d30fe30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d580e0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGING-SERVICE-POOL:4" prio=10 tid=0x6d8d1400 nid=0x64cf waiting on
> condition [0x6d360000..0x6d3610b0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d66cc8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGING-SERVICE-POOL:3" prio=10 tid=0x6d8cfc00 nid=0x64ce waiting on
> condition [0x6d3b1000..0x6d3b2130]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d66cc8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGING-SERVICE-POOL:2" prio=10 tid=0x6d8ce800 nid=0x64cd waiting on
> condition [0x6d402000..0x6d402fb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d66cc8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "MESSAGING-SERVICE-POOL:1" prio=10 tid=0x6d8cd000 nid=0x64cc waiting on
> condition [0x6d453000..0x6d454030]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d66cc8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "CACHETABLE-TIMER-2" daemon prio=10 tid=0x6d8cb800 nid=0x64cb in
> Object.wait() [0x6d4a4000..0x6d4a4eb0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x72d66640> (a java.util.TaskQueue)
>     at java.util.TimerThread.mainLoop(Timer.java:509)
>     - locked <0x72d66640> (a java.util.TaskQueue)
>     at java.util.TimerThread.run(Timer.java:462)
>
> "CACHETABLE-TIMER-1" daemon prio=10 tid=0x6d8ca000 nid=0x64ca in
> Object.wait() [0x6d4f5000..0x6d4f5f30]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x72d66670> (a java.util.TaskQueue)
>     at java.util.TimerThread.mainLoop(Timer.java:509)
>     - locked <0x72d66670> (a java.util.TaskQueue)
>     at java.util.TimerThread.run(Timer.java:462)
>
> "LOAD-BALANCER-STAGE:1" prio=10 tid=0x6d8c8c00 nid=0x64c9 waiting on
> condition [0x6d546000..0x6d546db0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d66f08> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "LB-TARGET:1" prio=10 tid=0x6d8c7c00 nid=0x64c8 waiting on condition
> [0x6d597000..0x6d597e30]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d4dd50> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "BOOT-STRAPPER:1" prio=10 tid=0x6d8c7000 nid=0x64c7 waiting on condition
> [0x6d5e8000..0x6d5e90b0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72d33328> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "Timer-0" prio=10 tid=0x6d8c4400 nid=0x64c6 in Object.wait()
> [0x6d639000..0x6d63a130]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x72d35fd8> (a java.util.TaskQueue)
>     at java.util.TimerThread.mainLoop(Timer.java:509)
>     - locked <0x72d35fd8> (a java.util.TaskQueue)
>     at java.util.TimerThread.run(Timer.java:462)
>
> "FILEUTILS-DELETE-POOL:1" prio=10 tid=0x6d8bc800 nid=0x64c5 waiting on
> condition [0x6d68a000..0x6d68afb0]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x72b71e58> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
>     at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:358)
>     at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> "RMI TCP Accept-0" daemon prio=10 tid=0x6d841800 nid=0x64c3 runnable
> [0x6d75c000..0x6d75ceb0]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.PlainSocketImpl.socketAccept(Native Method)
>     at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)
>     - locked <0x72b15f48> (a java.net.SocksSocketImpl)
>     at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>     at java.net.ServerSocket.accept(ServerSocket.java:421)
>     at
> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:34)
>     at
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>     at
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>     at java.lang.Thread.run(Thread.java:619)
>
> "RMI TCP Accept-8080" daemon prio=10 tid=0x6d83e800 nid=0x64c2 runnable
> [0x6d7ad000..0x6d7adf30]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.PlainSocketImpl.socketAccept(Native Method)
>     at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)
>     - locked <0x72b16038> (a java.net.SocksSocketImpl)
>     at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>     at java.net.ServerSocket.accept(ServerSocket.java:421)
>     at
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>     at
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>     at java.lang.Thread.run(Thread.java:619)
>
> "RMI TCP Accept-0" daemon prio=10 tid=0x6d834400 nid=0x64c1 runnable
> [0x6d7fe000..0x6d7fedb0]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.PlainSocketImpl.socketAccept(Native Method)
>     at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)
>     - locked <0x72b16128> (a java.net.SocksSocketImpl)
>     at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>     at java.net.ServerSocket.accept(ServerSocket.java:421)
>     at
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>     at
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>     at java.lang.Thread.run(Thread.java:619)
>
> "Low Memory Detector" daemon prio=10 tid=0x08cfb800 nid=0x64c0 runnable
> [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread1" daemon prio=10 tid=0x08cf9c00 nid=0x64bf waiting on
> condition [0x00000000..0x6daa2568]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread0" daemon prio=10 tid=0x08cf7000 nid=0x64be waiting on
> condition [0x00000000..0x6db235e8]
>    java.lang.Thread.State: RUNNABLE
>
> "JDWP Event Helper Thread" daemon prio=10 tid=0x08ceb800 nid=0x64bd runnable
> [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "JDWP Transport Listener: dt_socket" daemon prio=10 tid=0x08ce9000
> nid=0x64bc runnable [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" daemon prio=10 tid=0x08ce0000 nid=0x64bb runnable
> [0x00000000..0x6dc16b90]
>    java.lang.Thread.State: RUNNABLE
>
> "Surrogate Locker Thread (CMS)" daemon prio=10 tid=0x08cde800 nid=0x64ba
> waiting on condition [0x00000000..0x6e07711c]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" daemon prio=10 tid=0x08ccb800 nid=0x64b9 in Object.wait()
> [0x6dc67000..0x6dc67db0]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x72b727f8> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:116)
>     - locked <0x72b727f8> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:132)
>     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>
> "Reference Handler" daemon prio=10 tid=0x08cca400 nid=0x64b8 in
> Object.wait() [0x6dcb8000..0x6dcb8e30]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x72b65570> (a java.lang.ref.Reference$Lock)
>     at java.lang.Object.wait(Object.java:485)
>     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>     - locked <0x72b65570> (a java.lang.ref.Reference$Lock)
>
> "main" prio=10 tid=0x08c02400 nid=0x64b3 runnable [0xb7f33000..0xb7f341f8]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.PlainSocketImpl.socketAccept(Native Method)
>     at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)
>     - locked <0x72d6ce40> (a java.net.SocksSocketImpl)
>     at java.net.ServerSocket.implAccept(ServerSocket.java:453)
>     at java.net.ServerSocket.accept(ServerSocket.java:421)
>     at
> org.apache.thrift.transport.TServerSocket.acceptImpl(TServerSocket.java:118)
>     at
> org.apache.thrift.transport.TServerSocket.acceptImpl(TServerSocket.java:34)
>     at
> org.apache.thrift.transport.TServerTransport.accept(TServerTransport.java:31)
>     at
> org.apache.thrift.server.TThreadPoolServer.serve(TThreadPoolServer.java:183)
>     at
> org.apache.cassandra.service.CassandraDaemon.start(CassandraDaemon.java:97)
>     at
> org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:133)
>
> "VM Thread" prio=10 tid=0x08cc6c00 nid=0x64b7 runnable
>
> "Gang worker#0 (Parallel GC Threads)" prio=10 tid=0x08c06800 nid=0x64b4
> runnable
>
> "Gang worker#1 (Parallel GC Threads)" prio=10 tid=0x08c08000 nid=0x64b5
> runnable
>
> "Concurrent Mark-Sweep GC Thread" prio=10 tid=0x08c75c00 nid=0x64b6 runnable
> "VM Periodic Task Thread" prio=10 tid=0x6d843800 nid=0x64c4 waiting on
> condition
>
> JNI global references: 2915
>
> ##########################################################
>
> And here's the thread dump on the hadoop side (scheduled 1 reduce task):
>
> Full thread dump Java HotSpot(TM) Server VM (10.0-b22 mixed mode):
>
> "Attach Listener" daemon prio=10 tid=0x09f27000 nid=0x3864 waiting on
> condition [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "IPC Client (47) connection to <hostname>/<ip>:9000 from linares" daemon
> prio=10 tid=0x0a1b3800 nid=0x3857 in Object.wait() [0x8fc18000..0x8fc18db0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0xb165ee40> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:398)
>     - locked <0xb165ee40> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.run(Client.java:441)
>
> "Thread-31" daemon prio=10 tid=0x900f3800 nid=0x3826 in Object.wait()
> [0x8ff5c000..0x8ff5ce30]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x94f47020> (a java.util.LinkedList)
>     at
> org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
>     - locked <0x94f47020> (a java.util.LinkedList)
>
> "Comm thread for attempt_200905261357_0003_r_000000_1" daemon prio=10
> tid=0x900ce800 nid=0x37da waiting on condition [0x8ffad000..0x8ffaddb0]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.mapred.Task$1.run(Task.java:301)
>     at java.lang.Thread.run(Thread.java:619)
>
> "org.apache.hadoop.dfs.DFSClient$LeaseChecker@11671b2" daemon prio=10
> tid=0x900ce400 nid=0x37d9 waiting on condition [0x8fffe000..0x8fffee30]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.dfs.DFSClient$LeaseChecker.run(DFSClient.java:791)
>     at java.lang.Thread.run(Thread.java:619)
>
> "IPC Client (47) connection to /127.0.0.1:57136 from an unknown user" daemon
> prio=10 tid=0x900cdc00 nid=0x37d7 in Object.wait() [0x901d5000..0x901d6130]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x94e504b8> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:398)
>     - locked <0x94e504b8> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.run(Client.java:441)
>
> "Low Memory Detector" daemon prio=10 tid=0x09e6bc00 nid=0x37d5 runnable
> [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread1" daemon prio=10 tid=0x09e6a000 nid=0x37d4 waiting on
> condition [0x00000000..0x90535378]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread0" daemon prio=10 tid=0x09e67800 nid=0x37d3 waiting on
> condition [0x00000000..0x905b63f8]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" daemon prio=10 tid=0x09e66800 nid=0x37d2 runnable
> [0x00000000..0x90607a90]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" daemon prio=10 tid=0x09e4b400 nid=0x37d1 in Object.wait()
> [0x90858000..0x90858e30]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x94e197d8> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:116)
>     - locked <0x94e197d8> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:132)
>     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>
> "Reference Handler" daemon prio=10 tid=0x09e4a400 nid=0x37d0 in
> Object.wait() [0x908a9000..0x908aa0b0]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x94e86b08> (a java.lang.ref.Reference$Lock)
>     at java.lang.Object.wait(Object.java:485)
>     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>     - locked <0x94e86b08> (a java.lang.ref.Reference$Lock)
>
> "main" prio=10 tid=0x09dc7000 nid=0x37cc runnable [0xb7f70000..0xb7f71208]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
>     - locked <0x94f8c870> (a java.io.BufferedOutputStream)
>     at
> org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:139)
>     at
> org.apache.thrift.protocol.TBinaryProtocol.writeBinary(TBinaryProtocol.java:184)
>     at org.apache.cassandra.service.column_t.write(column_t.java:321)
>     at
> org.apache.cassandra.service.superColumn_t.write(superColumn_t.java:291)
>     at
> org.apache.cassandra.service.batch_mutation_super_t.write(batch_mutation_super_t.java:365)
>     at
> org.apache.cassandra.service.Cassandra$batch_insert_superColumn_args.write(Cassandra.java:9776)
>     at
> org.apache.cassandra.service.Cassandra$Client.send_batch_insert_superColumn(Cassandra.java:546)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.pushDocuments(CassandraImport.java:168)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.sendOut(CassandraImport.java:146)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:127)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:1)
>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:318)
>     at
> org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198)
>
> "VM Thread" prio=10 tid=0x09e47000 nid=0x37cf runnable
>
> "GC task thread#0 (ParallelGC)" prio=10 tid=0x09dce000 nid=0x37cd runnable
>
> "GC task thread#1 (ParallelGC)" prio=10 tid=0x09dcf000 nid=0x37ce runnable
>
> "VM Periodic Task Thread" prio=10 tid=0x09e6d400 nid=0x37d6 waiting on
> condition
>
> JNI global references: 728
>
>
> Thanks,
> -Alex
>
> ________________________________
> From: Jonathan Ellis <jbellis@gmail.com>
> To: cassandra-user@incubator.apache.org
> Sent: Tuesday, May 26, 2009 10:21:16 AM
> Subject: Re: Ingesting from Hadoop to Cassandra
>
> It looks like your hadoop client is mid-write, blocking while waiting
> for Cassandra to empty the buffer some.  So waiting for a response yes
> but at the TCP layer not the app layer.
>
> How many nodes are in your cassandra cluster?  If you can reproduce on
> just one node, can you attach a thread dump from that?
>
> Jun's suggestion to move to do your inserts in blocking mode is also a good
> one.
>
> -Jonathan
>
> On Mon, May 25, 2009 at 2:55 PM, Alexandre Linares <linares@ymail.com>
> wrote:
>> Following up on this discussion.
>>
>> Before going the BMT way, I decided to go ahead with the current APIs as a
>> first full pass.
>>
>> Here's my setup:
>>  - Cassandra cluster (cassandra-0.3-rc) : 3 nodes , vanilla setup except
>> for
>> <MemtableSizeInMB>512</MemtableSizeInMB>
>> (b/c I was planning on only having one CF and I wanted as much of it in
>> memory as possible)
>> - Hadoop jobs use cassandra-0.3.0-dev.jar
>>
>> My table setup:
>>  <Table Name="ClusterF">
>>             <ColumnFamily ColumnType="Super" ColumnSort="Time"
>> Name="Composite"/>
>> </Table>
>>
>> I'm pushing data from a small Hadoop cluster for experimentation purposes
>> (for debugging purposes, one reduce task is doing the batch inserts)
>>
>> I'm having a reproducible issue -- some kind of contention/race condition
>> (I
>> think) when calling send_batch_insert_superColumn.  I'm attempting to push
>> ~90k rows into Cassandra, but after ingesting ~3000 rows with
>> Cassandra.Client, the client stops ingesting and the Cassandra logs stop
>> showing activity (no exceptions/errors).
>>
>> Due to this, the hadoop task times out.
>>
>> Here's what I get from a jstack on the Hadoop task making use of the
>> Cassandra.Client (after 5 mins of no activity):
>>
>> [
>> Full thread dump Java HotSpot(TM) Client VM (1.6.0-b105 mixed mode):
>>
>> "Attach Listener" daemon prio=10 tid=0x08078400 nid=0x4e2b waiting on
>> condition [0x00000000..0x00000000]
>>    java.lang.Thread.State: RUNNABLE
>>
>> "Thread-33" daemon prio=10 tid=0x0807e000 nid=0x4d1e in Object.wait()
>> [0x8f2c7000..0x8f2c7eb0]
>>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>     at java.lang.Object.wait(Native Method)
>>     - waiting on <0x92ac0d08> (a java.util.LinkedList)
>>     at
>>
>> org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
>>     - locked <0x92ac0d08> (a java.util.LinkedList)
>>
>> "Comm thread for attempt_200905242143_0004_r_000000_1" daemon prio=10
>> tid=0x081c2c00 nid=0x4c52 waiting on condition [0x8f4fe000..0x8f4fee30]
>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>     at java.lang.Thread.sleep(Native Method)
>>     at org.apache.hadoop.mapred.Task$1.run(Task.java:301)
>>     at java.lang.Thread.run(Thread.java:619)
>>
>> "org.apache.hadoop.dfs.DFSClient$LeaseChecker@13f7281" daemon prio=10
>> tid=0x081c0800 nid=0x4c51 waiting on condition [0x8f65c000..0x8f65cfb0]
>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>     at java.lang.Thread.sleep(Native Method)
>>     at
>> org.apache.hadoop.dfs.DFSClient$LeaseChecker.run(DFSClient.java:791)
>>     at java.lang.Thread.run(Thread.java:619)
>>
>> "IPC Client (47) connection to /127.0.0.1:47906 from an unknown user"
>> daemon
>> prio=10 tid=0x0819c800 nid=0x4c4f in Object.wait()
>> [0x8f6fe000..0x8f6ff0b0]
>>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>     at java.lang.Object.wait(Native Method)
>>     - waiting on <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>>     at
>> org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:398)
>>     - locked <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>>     at org.apache.hadoop.ipc.Client$Connection.run(Client.java:441)
>>
>> "Low Memory Detector" daemon prio=10 tid=0x8fc13400 nid=0x4c4d runnable
>> [0x00000000..0x00000000]
>>    java.lang.Thread.State: RUNNABLE
>>
>> "CompilerThread0" daemon prio=10 tid=0x8fc11c00 nid=0x4c4c waiting on
>> condition [0x00000000..0x8f9febc8]
>>    java.lang.Thread.State: RUNNABLE
>>
>> "Signal Dispatcher" daemon prio=10 tid=0x8fc10800 nid=0x4c4b runnable
>> [0x00000000..0x8fda8b90]
>>    java.lang.Thread.State: RUNNABLE
>>
>> "Finalizer" daemon prio=10 tid=0x8fc00800 nid=0x4c4a in Object.wait()
>> [0x8fdf9000..0x8fdf9e30]
>>    java.lang.Thread.State: WAITING (on object monitor)
>>     at java.lang.Object.wait(Native Method)
>>     - waiting on <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:116)
>>     - locked <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:132)
>>     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>>
>> "Reference Handler" daemon prio=10 tid=0x080a9800 nid=0x4c49 in
>> Object.wait() [0x8fe4a000..0x8fe4afb0]
>>    java.lang.Thread.State: WAITING (on object monitor)
>>     at java.lang.Object.wait(Native Method)
>>     - waiting on <0x92a26e30> (a java.lang.ref.Reference$Lock)
>>     at java.lang.Object.wait(Object.java:485)
>>     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>>     - locked <0x92a26e30> (a java.lang.ref.Reference$Lock)
>>
>> "main" prio=10 tid=0x0805a800 nid=0x4c47 runnable [0xb7fea000..0xb7feb288]
>>    java.lang.Thread.State: RUNNABLE
>>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>>     at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
>>     - locked <0x92ac9578> (a java.io.BufferedOutputStream)
>>     at
>>
>> org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:139)
>>     at
>>
>> org.apache.thrift.protocol.TBinaryProtocol.writeBinary(TBinaryProtocol.java:184)
>>     at org.apache.cassandra.service.column_t.write(column_t.java:321)
>>     at
>> org.apache.cassandra.service.superColumn_t.write(superColumn_t.java:291)
>>     at
>>
>> org.apache.cassandra.service.batch_mutation_super_t.write(batch_mutation_super_t.java:365)
>>     at
>>
>> org.apache.cassandra.service.Cassandra$batch_insert_superColumn_args.write(Cassandra.java:9776)
>>     at
>>
>> org.apache.cassandra.service.Cassandra$Client.send_batch_insert_superColumn(Cassandra.java:546)
>>     at
>>
>> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.pushDocuments(CassandraImport.java:168)
>>     at
>>
>> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.sendOut(CassandraImport.java:146)
>>     at
>>
>> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:127)
>>     at
>>
>> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:1)
>>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:318)
>>     at
>> org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198)
>>
>> ]
>>
>> It looks like the client is waiting on a response from Cassandra but never
>> gets it. Any ideas?  I had seen similar behavior in the Cassandra code
>> prior
>> to the 0.3 release candidate, b/c of a race condition in SelectorManager.
>> It looks like this was taken care of in 0.3-rc, so I'm not sure what's
>> going
>> on here.
>>
>> Thanks,
>> -Alex
>>
>> ________________________________
>> From: Jonathan Ellis <jbellis@gmail.com>
>> To: cassandra-user@incubator.apache.org
>> Sent: Thursday, May 21, 2009 9:42:29 AM
>> Subject: Re: Ingesting from Hadoop to Cassandra
>>
>> No, batch APIs are per CF, not per row.
>>
>> Several people have asked Avinash for sample code using BinaryMemtable
>> but to my knowledge nothing ever came of that.
>>
>> The high level description of the BMT is that you give it serialized
>> CFs as values instead of raw columns so it can just sort on key and
>> write directly to disk.  So then you would do something like this:
>>
>> Table table = Table.open(mytablename);
>> ColumnFamilyStore store = table.getColumnFamilyStore(mycfname);
>> for cf : mydata
>>   store.applyBinary(cf.key, toByteArray(cf))
>>
>> There's no provision for doing this over the network that I know of,
>> you have to put the right keys on the right nodes manually.
>>
>> -Jonathan
>>
>> On Thu, May 21, 2009 at 11:27 AM, Alexandre Linares <linares@ymail.com>
>> wrote:
>>> Jonathan,
>>>
>>> Thanks for your thoughts.
>>>
>>> I've done some simple benchmarks with the batch insert apis and was
>>> looking
>>> for something slightly more performant.  Is there a batch row insert that
>>> I
>>> missed?
>>>
>>> Any pointers (at all) to anything related to FB's bulk loading or the
>>> binarymemtable?  I've attempted to do this by writing a custom
>>> IVerbHandler
>>> for ingestion and interfacing with the MessagingService internally but
>>> it's
>>> not that clean.
>>>
>>> Thanks again,
>>> -Alex
>>>
>>> ________________________________
>>> From: Jonathan Ellis <jbellis@gmail.com>
>>> To: cassandra-user@incubator.apache.org
>>> Sent: Thursday, May 21, 2009 7:44:59 AM
>>> Subject: Re: Ingesting from Hadoop to Cassandra
>>>
>>> Have you benchmarked the batch insert apis?  If that is "fast enough"
>>> then it's by far the simplest way to go.
>>>
>>> Otherwise you'll have to use the binarymemtable stuff which is
>>> undocumented and not exposed as a client api (you basically write a
>>> custom "loader" version of cassandra to use it, I think).  FB used
>>> this for their own bulk loading so it works at some level, but clearly
>>> there is some assembly required.
>>>
>>> -Jonathan
>>>
>>> On Thu, May 21, 2009 at 2:28 AM, Alexandre Linares <linares@ymail.com>
>>> wrote:
>>>> Hi all,
>>>>
>>>> I'm trying to find the most optimal way to ingest my content from Hadoop
>>>> to
>>>> Cassandra.  Assuming I have figured out the table representation for
>>>> this
>>>> content, what is the best way to do go about pushing from my cluster?
>>>> What
>>>> Cassandra client batch APIs do you suggest I use to push to Cassandra?
>>>> I'm
>>>> sure this is a common pattern, I'm curious to see how it has been
>>>> implemented.  Assume millions of of rows and 1000s of columns.
>>>>
>>>> Thanks in advance,
>>>> -Alex
>>>>
>>>>
>>>
>>>
>>
>>
>
>

Mime
View raw message