incubator-cassandra-user mailing list archives

From Jonathan Ellis <jbel...@gmail.com>
Subject Re: Ingesting from Hadoop to Cassandra
Date Tue, 26 May 2009 17:21:16 GMT
It looks like your hadoop client is mid-write, blocked waiting for
Cassandra to drain some of the TCP buffer.  So it is waiting for a
response, yes, but at the TCP layer, not the app layer.
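
A quick way to confirm that from the outside: run netstat -tn on the
client box and find the connection to Cassandra's thrift port (9160 by
default).  If its Send-Q stays large while the client sits in
socketWrite0, the server side has stopped reading.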

How many nodes are in your cassandra cluster?  If you can reproduce on
just one node, can you attach a thread dump from that?
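
(jstack <pid> will grab one on 1.6, or kill -3 <pid> will dump the
threads to the node's stdout log.)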

Jun's suggestion to do your inserts in blocking mode is also a good one.
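
For reference, a minimal sketch of the blocking flavor against the
0.3-era Thrift client -- the method name and signature here are from
memory, so verify them against the cassandra.thrift that shipped with
your build:

    // Blocking variant: returns only after the write is acknowledged,
    // so a stalled server surfaces as an exception/timeout instead of
    // a silent hang in socketWrite0.
    boolean ok = client.batch_insert_superColumn_blocking(batchMutationSuper);
    if (!ok) {
        // write not acknowledged -- back off and retry this batch
    }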

-Jonathan

On Mon, May 25, 2009 at 2:55 PM, Alexandre Linares <linares@ymail.com> wrote:
> Following up on this discussion.
>
> Before going the BMT way, I decided to go ahead with the current APIs as a
> first full pass.
>
> Here's my setup:
>  - Cassandra cluster (cassandra-0.3-rc): 3 nodes, vanilla setup except for
> <MemtableSizeInMB>512</MemtableSizeInMB>
> (b/c I was planning on only having one CF and I wanted as much of it in
> memory as possible)
> - Hadoop jobs use cassandra-0.3.0-dev.jar
>
> My table setup:
>  <Table Name="ClusterF">
>             <ColumnFamily ColumnType="Super" ColumnSort="Time"
> Name="Composite"/>
> </Table>
>
> I'm pushing data from a small Hadoop cluster for experimentation
> (for debugging purposes, a single reduce task is doing the batch inserts).
>
> I'm having a reproducible issue -- some kind of contention/race condition (I
> think) when calling send_batch_insert_superColumn.  I'm attempting to push
> ~90k rows into Cassandra, but after ingesting ~3000 rows with
> Cassandra.Client, the client stops ingesting and the Cassandra logs stop
> showing activity (no exceptions/errors).
>
> Due to this, the hadoop task times out.
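>
> The timeout itself is just Hadoop killing a task that reports no
> progress for mapred.task.timeout ms.  Calling Reporter.progress()
> between batches keeps the task alive, though it only masks the stall.
> A rough sketch against the old mapred API -- pushBatches stands in
> for my CassandraImport code:
>
>     // (imports: java.util.List, org.apache.hadoop.mapred.Reporter,
>     //  org.apache.thrift.TException)
>     void pushBatches(List<batch_mutation_super_t> batches,
>                      Cassandra.Client client, Reporter reporter)
>             throws TException {
>         for (batch_mutation_super_t b : batches) {
>             client.send_batch_insert_superColumn(b);
>             reporter.progress();  // reset the task-timeout clock
>         }
>     }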
>
> Here's what I get from a jstack on the Hadoop task making use of the
> Cassandra.Client (after 5 mins of no activity):
>
> [
> Full thread dump Java HotSpot(TM) Client VM (1.6.0-b105 mixed mode):
>
> "Attach Listener" daemon prio=10 tid=0x08078400 nid=0x4e2b waiting on
> condition [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Thread-33" daemon prio=10 tid=0x0807e000 nid=0x4d1e in Object.wait()
> [0x8f2c7000..0x8f2c7eb0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92ac0d08> (a java.util.LinkedList)
>     at org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
>     - locked <0x92ac0d08> (a java.util.LinkedList)
>
> "Comm thread for attempt_200905242143_0004_r_000000_1" daemon prio=10
> tid=0x081c2c00 nid=0x4c52 waiting on condition [0x8f4fe000..0x8f4fee30]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.mapred.Task$1.run(Task.java:301)
>     at java.lang.Thread.run(Thread.java:619)
>
> "org.apache.hadoop.dfs.DFSClient$LeaseChecker@13f7281" daemon prio=10
> tid=0x081c0800 nid=0x4c51 waiting on condition [0x8f65c000..0x8f65cfb0]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.dfs.DFSClient$LeaseChecker.run(DFSClient.java:791)
>     at java.lang.Thread.run(Thread.java:619)
>
> "IPC Client (47) connection to /127.0.0.1:47906 from an unknown user" daemon
> prio=10 tid=0x0819c800 nid=0x4c4f in Object.wait() [0x8f6fe000..0x8f6ff0b0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:398)
>     - locked <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.run(Client.java:441)
>
> "Low Memory Detector" daemon prio=10 tid=0x8fc13400 nid=0x4c4d runnable
> [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread0" daemon prio=10 tid=0x8fc11c00 nid=0x4c4c waiting on
> condition [0x00000000..0x8f9febc8]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" daemon prio=10 tid=0x8fc10800 nid=0x4c4b runnable
> [0x00000000..0x8fda8b90]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" daemon prio=10 tid=0x8fc00800 nid=0x4c4a in Object.wait()
> [0x8fdf9000..0x8fdf9e30]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:116)
>     - locked <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:132)
>     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>
> "Reference Handler" daemon prio=10 tid=0x080a9800 nid=0x4c49 in
> Object.wait() [0x8fe4a000..0x8fe4afb0]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92a26e30> (a java.lang.ref.Reference$Lock)
>     at java.lang.Object.wait(Object.java:485)
>     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>     - locked <0x92a26e30> (a java.lang.ref.Reference$Lock)
>
> "main" prio=10 tid=0x0805a800 nid=0x4c47 runnable [0xb7fea000..0xb7feb288]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
>     - locked <0x92ac9578> (a java.io.BufferedOutputStream)
>     at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:139)
>     at org.apache.thrift.protocol.TBinaryProtocol.writeBinary(TBinaryProtocol.java:184)
>     at org.apache.cassandra.service.column_t.write(column_t.java:321)
>     at org.apache.cassandra.service.superColumn_t.write(superColumn_t.java:291)
>     at org.apache.cassandra.service.batch_mutation_super_t.write(batch_mutation_super_t.java:365)
>     at org.apache.cassandra.service.Cassandra$batch_insert_superColumn_args.write(Cassandra.java:9776)
>     at org.apache.cassandra.service.Cassandra$Client.send_batch_insert_superColumn(Cassandra.java:546)
>     at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.pushDocuments(CassandraImport.java:168)
>     at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.sendOut(CassandraImport.java:146)
>     at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:127)
>     at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:1)
>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:318)
>     at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198)
>
> ]
>
> It looks like the client is waiting on a response from Cassandra but never
> gets it. Any ideas?  I had seen similar behavior in the Cassandra code prior
> to the 0.3 release candidate, b/c of a race condition in SelectorManager.
> It looks like this was taken care of in 0.3-rc, so I'm not sure what's going
> on here.
>
> Thanks,
> -Alex
>
> ________________________________
> From: Jonathan Ellis <jbellis@gmail.com>
> To: cassandra-user@incubator.apache.org
> Sent: Thursday, May 21, 2009 9:42:29 AM
> Subject: Re: Ingesting from Hadoop to Cassandra
>
> No, batch APIs are per CF, not per row.
>
> Several people have asked Avinash for sample code using BinaryMemtable
> but to my knowledge nothing ever came of that.
>
> At a high level, the BMT works like this: you give it serialized CFs
> as values instead of raw columns, so it can just sort on key and
> write directly to disk.  So then you would do something like this:
>
> Table table = Table.open(mytablename);
> ColumnFamilyStore store = table.getColumnFamilyStore(mycfname);
> for (ColumnFamily cf : mydata) {
>     store.applyBinary(cf.key, toByteArray(cf));
> }
>
> There's no provision for doing this over the network that I know of;
> you have to put the right keys on the right nodes manually (rough
> sketch of that below).
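>
> Purely illustrative -- nodeForKey is a hypothetical function you'd
> have to write yourself from your ring/token layout, since I don't
> know of anything in 0.3 that exposes it:
>
>     // (imports: java.util.*)
>     Map<String, List<ColumnFamily>> perNode =
>         new HashMap<String, List<ColumnFamily>>();
>     for (ColumnFamily cf : mydata) {
>         String node = nodeForKey(cf.key);  // hypothetical
>         List<ColumnFamily> bucket = perNode.get(node);
>         if (bucket == null) {
>             bucket = new ArrayList<ColumnFamily>();
>             perNode.put(node, bucket);
>         }
>         bucket.add(cf);
>     }
>     // then run the applyBinary loop above on each node with its bucket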
>
> -Jonathan
>
> On Thu, May 21, 2009 at 11:27 AM, Alexandre Linares <linares@ymail.com>
> wrote:
>> Jonathan,
>>
>> Thanks for your thoughts.
>>
>> I've done some simple benchmarks with the batch insert APIs and was
>> looking for something slightly more performant.  Is there a batch
>> row insert that I missed?
>>
>> Any pointers (at all) to anything related to FB's bulk loading or
>> the BinaryMemtable?  I've attempted to do this by writing a custom
>> IVerbHandler for ingestion and interfacing with the MessagingService
>> internally, but it's not that clean.
>>
>> Thanks again,
>> -Alex
>>
>> ________________________________
>> From: Jonathan Ellis <jbellis@gmail.com>
>> To: cassandra-user@incubator.apache.org
>> Sent: Thursday, May 21, 2009 7:44:59 AM
>> Subject: Re: Ingesting from Hadoop to Cassandra
>>
>> Have you benchmarked the batch insert apis?  If that is "fast enough"
>> then it's by far the simplest way to go.
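>>
>> Something as blunt as this gives you a batches/sec number to compare
>> against (makeBatch is a hypothetical helper that builds one
>> batch_mutation_super_t):
>>
>>     int n = 1000;
>>     long t0 = System.nanoTime();
>>     for (int i = 0; i < n; i++) {
>>         client.batch_insert_superColumn(makeBatch(i));
>>     }
>>     double secs = (System.nanoTime() - t0) / 1e9;
>>     System.out.printf("%d batches in %.2fs (%.0f/s)%n", n, secs, n / secs);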
>>
>> Otherwise you'll have to use the BinaryMemtable stuff, which is
>> undocumented and not exposed as a client api (you basically write a
>> custom "loader" version of cassandra to use it, I think).  FB used
>> this for their own bulk loading, so it works at some level, but
>> clearly there is some assembly required.
>>
>> -Jonathan
>>
>> On Thu, May 21, 2009 at 2:28 AM, Alexandre Linares <linares@ymail.com>
>> wrote:
>>> Hi all,
>>>
>>> I'm trying to find the most efficient way to ingest my content from
>>> Hadoop to Cassandra.  Assuming I have figured out the table
>>> representation for this content, what is the best way to go about
>>> pushing from my cluster?  What Cassandra client batch APIs do you
>>> suggest I use to push to Cassandra?  I'm sure this is a common
>>> pattern; I'm curious to see how it has been implemented.  Assume
>>> millions of rows and 1000s of columns.
>>>
>>> Thanks in advance,
>>> -Alex
>>>
>>>
>>
>>
>
>
