cassandra-user mailing list archives

From: Alexandre Linares <lina...@ymail.com>
Subject: Re: Ingesting from Hadoop to Cassandra
Date: Mon, 25 May 2009 23:48:15 GMT
I hooked up JConsole as advised and tried tuning the attributes as suggested; CPU/Mem look
fine, nothing that would signal GC thrashing during the ingestion.
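
(For anyone cross-checking outside JConsole: something like

    jstat -gcutil <cassandra-pid> 1000

samples heap-generation and GC utilization once a second; old-gen staying
steady during the load would also rule out thrashing.)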

I changed <MemtableSizeInMB> back to the default of 32 and saw exactly the same behavior.

Let me be clearer on one part of my previous description.  When I said "the Cassandra logs
stop showing activity (no exceptions/errors)", I meant there was no more ingestion activity.
Once the client blocks after ingesting ~3000 rows (~27MB of data), I still see Cassandra
performing its major compaction tasks.  It otherwise seems to behave normally.
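
In the meantime I'll probably wrap the client setup with a socket timeout so
the task fails fast instead of hanging until Hadoop kills it.  A minimal
sketch -- host/port and the 30s value are placeholders for my setup:

    TSocket socket = new TSocket("cass-node1", 9160);  // org.apache.thrift.transport.TSocket
    socket.setTimeout(30000);  // a stalled read/write surfaces as a TTransportException
    socket.open();
    Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(socket));

That won't explain why the server stops draining the socket, but at least the
reduce task would get an exception it can retry on instead of timing out
silently.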

Thanks,
-Alex



________________________________
From: Jonathan Ellis <jbellis@gmail.com>
To: cassandra-user@incubator.apache.org
Sent: Monday, May 25, 2009 2:46:20 PM
Subject: Re: Ingesting from Hadoop to Cassandra

First let's rule out the possibility that you are exhausting your
memory and having your VM basically DOSed by GC thrashing.  Eric goes
into some details here:
http://wiki.apache.org/cassandra/MemtableThresholds
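
The memtable knobs live in storage-conf.xml; for a first test I'd drop it
back down to something small, e.g.

    <MemtableSizeInMB>32</MemtableSizeInMB>

and watch the heap graph in JConsole while the load runs.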

On Mon, May 25, 2009 at 1:55 PM, Alexandre Linares <linares@ymail.com> wrote:
> Following up on this discussion.
>
> Before going the BMT way, I decided to go ahead with the current APIs as a
> first full pass.
>
> Here's my setup:
>  - Cassandra cluster (cassandra-0.3-rc) : 3 nodes , vanilla setup except for
> <MemtableSizeInMB>512</MemtableSizeInMB>
> (b/c I was planning on only having one CF and I wanted as much of it in
> memory as possible)
> - Hadoop jobs use cassandra-0.3.0-dev.jar
>
> My table setup:
>  <Table Name="ClusterF">
>             <ColumnFamily ColumnType="Super" ColumnSort="Time"
> Name="Composite"/>
> </Table>
>
> I'm pushing data from a small Hadoop cluster for experimentation
> (for debugging, a single reduce task is doing the batch inserts).
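>
> (For context, the reducer shape is roughly the following -- toSuperColumns
> and FLUSH_COUNT are stand-ins for my actual helpers:
>
>     public void reduce(Text key, Iterator<Text> values,
>                        OutputCollector<Text, Text> out, Reporter reporter) {
>         batch.add(toSuperColumns(key, values));
>         if (batch.size() >= FLUSH_COUNT)
>             sendOut();   // pushDocuments() -> send_batch_insert_superColumn
>     }
> )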
>
> I'm having a reproducible issue -- some kind of contention/race condition (I
> think) when calling send_batch_insert_superColumn.  I'm attempting to push
> ~90k rows into Cassandra, but after ingesting ~3000 rows with
> Cassandra.Client, the client stops ingesting and the Cassandra logs stop
> showing activity (no exceptions/errors).
>
> Because of this, the Hadoop task times out.
>
> Here's what I get from a jstack on the Hadoop task making use of the
> Cassandra.Client (after 5 mins of no activity):
>
> [
> Full thread dump Java HotSpot(TM) Client VM (1.6.0-b105 mixed mode):
>
> "Attach Listener" daemon prio=10 tid=0x08078400 nid=0x4e2b waiting on
> condition [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "Thread-33" daemon prio=10 tid=0x0807e000 nid=0x4d1e in Object.wait()
> [0x8f2c7000..0x8f2c7eb0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92ac0d08> (a java.util.LinkedList)
>     at
> org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
>     - locked <0x92ac0d08> (a java.util.LinkedList)
>
> "Comm thread for attempt_200905242143_0004_r_000000_1" daemon prio=10
> tid=0x081c2c00 nid=0x4c52 waiting on condition [0x8f4fe000..0x8f4fee30]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.mapred.Task$1.run(Task.java:301)
>     at java.lang.Thread.run(Thread.java:619)
>
> "org.apache.hadoop.dfs.DFSClient$LeaseChecker@13f7281" daemon prio=10
> tid=0x081c0800 nid=0x4c51 waiting on condition [0x8f65c000..0x8f65cfb0]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>     at java.lang.Thread.sleep(Native Method)
>     at org.apache.hadoop.dfs.DFSClient$LeaseChecker.run(DFSClient.java:791)
>     at java.lang.Thread.run(Thread.java:619)
>
> "IPC Client (47) connection to /127.0.0.1:47906 from an unknown user" daemon
> prio=10 tid=0x0819c800 nid=0x4c4f in Object.wait() [0x8f6fe000..0x8f6ff0b0]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:398)
>     - locked <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
>     at org.apache.hadoop.ipc.Client$Connection.run(Client.java:441)
>
> "Low Memory Detector" daemon prio=10 tid=0x8fc13400 nid=0x4c4d runnable
> [0x00000000..0x00000000]
>    java.lang.Thread.State: RUNNABLE
>
> "CompilerThread0" daemon prio=10 tid=0x8fc11c00 nid=0x4c4c waiting on
> condition [0x00000000..0x8f9febc8]
>    java.lang.Thread.State: RUNNABLE
>
> "Signal Dispatcher" daemon prio=10 tid=0x8fc10800 nid=0x4c4b runnable
> [0x00000000..0x8fda8b90]
>    java.lang.Thread.State: RUNNABLE
>
> "Finalizer" daemon prio=10 tid=0x8fc00800 nid=0x4c4a in Object.wait()
> [0x8fdf9000..0x8fdf9e30]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:116)
>     - locked <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
>     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:132)
>     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)
>
> "Reference Handler" daemon prio=10 tid=0x080a9800 nid=0x4c49 in
> Object.wait() [0x8fe4a000..0x8fe4afb0]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x92a26e30> (a java.lang.ref.Reference$Lock)
>     at java.lang.Object.wait(Object.java:485)
>     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
>     - locked <0x92a26e30> (a java.lang.ref.Reference$Lock)
>
> "main" prio=10 tid=0x0805a800 nid=0x4c47 runnable [0xb7fea000..0xb7feb288]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
>     - locked <0x92ac9578> (a java.io.BufferedOutputStream)
>     at
> org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:139)
>     at
> org.apache.thrift.protocol.TBinaryProtocol.writeBinary(TBinaryProtocol.java:184)
>     at org.apache.cassandra.service.column_t.write(column_t.java:321)
>     at
> org.apache.cassandra.service.superColumn_t.write(superColumn_t.java:291)
>     at
> org.apache.cassandra.service.batch_mutation_super_t.write(batch_mutation_super_t.java:365)
>     at
> org.apache.cassandra.service.Cassandra$batch_insert_superColumn_args.write(Cassandra.java:9776)
>     at
> org.apache.cassandra.service.Cassandra$Client.send_batch_insert_superColumn(Cassandra.java:546)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.pushDocuments(CassandraImport.java:168)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.sendOut(CassandraImport.java:146)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:127)
>     at
> com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:1)
>     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:318)
>     at
> org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198)
>
> ]
>
> It looks like the client is blocked writing to Cassandra and never gets
> unblocked -- main is stuck in socketWrite0, which suggests the server has
> stopped reading from the socket.  Any ideas?  I had seen similar behavior in
> the Cassandra code prior to the 0.3 release candidate, b/c of a race
> condition in SelectorManager.  It looks like that was taken care of in
> 0.3-rc, so I'm not sure what's going on here.
>
> Thanks,
> -Alex
>
> ________________________________
> From: Jonathan Ellis <jbellis@gmail.com>
> To: cassandra-user@incubator.apache.org
> Sent: Thursday, May 21, 2009 9:42:29 AM
> Subject: Re: Ingesting from Hadoop to Cassandra
>
> No, batch APIs are per CF, not per row.
>
> Several people have asked Avinash for sample code using BinaryMemtable
> but to my knowledge nothing ever came of that.
>
> The high-level description of the BMT is that you give it serialized
> CFs as values instead of raw columns, so it can just sort on key and
> write directly to disk.  You would then do something like this:
>
> Table table = Table.open(mytablename);
> ColumnFamilyStore store = table.getColumnFamilyStore(mycfname);
> for (ColumnFamily cf : mydata)
>     store.applyBinary(cf.key, toByteArray(cf));
>
> There's no provision for doing this over the network that I know of; you
> have to put the right keys on the right nodes manually (i.e., partition
> your data the same way the cluster does and load each node with only the
> keys it owns).
>
> -Jonathan
>
> On Thu, May 21, 2009 at 11:27 AM, Alexandre Linares <linares@ymail.com>
> wrote:
>> Jonathan,
>>
>> Thanks for your thoughts.
>>
>> I've done some simple benchmarks with the batch insert apis and was
>> looking
>> for something slightly more performant.  Is there a batch row insert that
>> I
>> missed?
>>
>> Any pointers (at all) to anything related to FB's bulk loading or the
>> binarymemtable?  I've attempted to do this by writing a custom
>> IVerbHandler
>> for ingestion and interfacing with the MessagingService internally but
>> it's
>> not that clean.
>>
>> Thanks again,
>> -Alex
>>
>> ________________________________
>> From: Jonathan Ellis <jbellis@gmail.com>
>> To: cassandra-user@incubator.apache.org
>> Sent: Thursday, May 21, 2009 7:44:59 AM
>> Subject: Re: Ingesting from Hadoop to Cassandra
>>
>> Have you benchmarked the batch insert apis?  If that is "fast enough"
>> then it's by far the simplest way to go.
>>
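>> Roughly, the client-side shape with the generated thrift classes looks
>> like this -- I'm writing the field names from memory, so treat it as a
>> sketch and check the interface definition for the real constructors:
>>
>>     column_t col = new column_t(colName, valueBytes, timestamp);
>>     superColumn_t sc = new superColumn_t(scName, Arrays.asList(col));
>>     // table name, row key, and a CF -> supercolumn-list map
>>     batch_mutation_super_t batch = new batch_mutation_super_t(
>>         tableName, rowKey, Collections.singletonMap(cfName, Arrays.asList(sc)));
>>     client.batch_insert_superColumn(batch);
>>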
>> Otherwise you'll have to use the binarymemtable stuff which is
>> undocumented and not exposed as a client api (you basically write a
>> custom "loader" version of cassandra to use it, I think).  FB used
>> this for their own bulk loading so it works at some level, but clearly
>> there is some assembly required.
>>
>> -Jonathan
>>
>> On Thu, May 21, 2009 at 2:28 AM, Alexandre Linares <linares@ymail.com>
>> wrote:
>>> Hi all,
>>>
>>> I'm trying to find the optimal way to ingest my content from Hadoop to
>>> Cassandra.  Assuming I have figured out the table representation for this
>>> content, what is the best way to go about pushing it from my cluster?  What
>>> Cassandra client batch APIs do you suggest I use to push to Cassandra?  I'm
>>> sure this is a common pattern, so I'm curious how it has been
>>> implemented.  Assume millions of rows and thousands of columns.
>>>
>>> Thanks in advance,
>>> -Alex
>>>
>>>
>>
>>
>
>



      