incubator-cassandra-user mailing list archives

From Alexandre Linares <lina...@ymail.com>
Subject Re: Ingesting from Hadoop to Cassandra
Date Mon, 25 May 2009 19:55:17 GMT
Following up on this discussion.

Before going the BMT (BinaryMemtable) way, I decided to go ahead with the current APIs as a first full pass.

Here's my setup:
- Cassandra cluster (cassandra-0.3-rc): 3 nodes, vanilla setup except for <MemtableSizeInMB>512</MemtableSizeInMB>
  (b/c I was planning on having only one CF and wanted as much of it in memory as possible)
- Hadoop jobs use cassandra-0.3.0-dev.jar

My table setup:
<Table Name="ClusterF">
    <ColumnFamily ColumnType="Super" ColumnSort="Time" Name="Composite"/>
</Table>

I'm pushing data from a small Hadoop cluster for experimentation (to make debugging easier, a
single reduce task is doing all the batch inserts).
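
For reference, here's roughly what the push looks like on the reduce side (simplified from my
CassandraImport reducer, imports omitted; the 0.3 Thrift struct and field names are from memory,
so treat this as a sketch rather than exact code):

// open a Thrift connection to one of the Cassandra nodes (9160 = the Thrift port)
TSocket socket = new TSocket("cassandra-node-1", 9160);
Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(socket));
socket.open();

// one batch mutation per row key: CF name -> that row's supercolumns
batch_mutation_super_t batch = new batch_mutation_super_t();
batch.table = "ClusterF";
batch.key = rowKey;                         // the row being reduced
batch.cfmap = new HashMap<String, List<superColumn_t>>();
batch.cfmap.put("Composite", superColumns); // this row's superColumn_t list
client.send_batch_insert_superColumn(batch);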

I'm having a reproducible issue -- some kind of contention/race condition (I think) when calling
send_batch_insert_superColumn.  I'm attempting to push ~90k rows into Cassandra, but after
ingesting ~3000 rows with Cassandra.Client, the client stops ingesting and the Cassandra logs
stop showing activity (no exceptions/errors).  

Because of this, the Hadoop task times out.

Here's what I get from jstack on the Hadoop task using Cassandra.Client (after 5 minutes of
no activity):

[
Full thread dump Java HotSpot(TM) Client VM (1.6.0-b105 mixed mode):

"Attach Listener" daemon prio=10 tid=0x08078400 nid=0x4e2b waiting on condition [0x00000000..0x00000000]
   java.lang.Thread.State: RUNNABLE

"Thread-33" daemon prio=10 tid=0x0807e000 nid=0x4d1e in Object.wait() [0x8f2c7000..0x8f2c7eb0]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x92ac0d08> (a java.util.LinkedList)
    at org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
    - locked <0x92ac0d08> (a java.util.LinkedList)

"Comm thread for attempt_200905242143_0004_r_000000_1" daemon prio=10 tid=0x081c2c00 nid=0x4c52
waiting on condition [0x8f4fe000..0x8f4fee30]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.hadoop.mapred.Task$1.run(Task.java:301)
    at java.lang.Thread.run(Thread.java:619)

"org.apache.hadoop.dfs.DFSClient$LeaseChecker@13f7281" daemon prio=10 tid=0x081c0800 nid=0x4c51
waiting on condition [0x8f65c000..0x8f65cfb0]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.hadoop.dfs.DFSClient$LeaseChecker.run(DFSClient.java:791)
    at java.lang.Thread.run(Thread.java:619)

"IPC Client (47) connection to /127.0.0.1:47906 from an unknown user" daemon prio=10 tid=0x0819c800
nid=0x4c4f in Object.wait() [0x8f6fe000..0x8f6ff0b0]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
    at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:398)
    - locked <0x92a615d8> (a org.apache.hadoop.ipc.Client$Connection)
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:441)

"Low Memory Detector" daemon prio=10 tid=0x8fc13400 nid=0x4c4d runnable [0x00000000..0x00000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x8fc11c00 nid=0x4c4c waiting on condition [0x00000000..0x8f9febc8]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x8fc10800 nid=0x4c4b runnable [0x00000000..0x8fda8b90]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x8fc00800 nid=0x4c4a in Object.wait() [0x8fdf9000..0x8fdf9e30]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:116)
    - locked <0x92a26da0> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:132)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)

"Reference Handler" daemon prio=10 tid=0x080a9800 nid=0x4c49 in Object.wait() [0x8fe4a000..0x8fe4afb0]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x92a26e30> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:485)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
    - locked <0x92a26e30> (a java.lang.ref.Reference$Lock)

"main" prio=10 tid=0x0805a800 nid=0x4c47 runnable [0xb7fea000..0xb7feb288]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    - locked <0x92ac9578> (a java.io.BufferedOutputStream)
    at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:139)
    at org.apache.thrift.protocol.TBinaryProtocol.writeBinary(TBinaryProtocol.java:184)
    at org.apache.cassandra.service.column_t.write(column_t.java:321)
    at org.apache.cassandra.service.superColumn_t.write(superColumn_t.java:291)
    at org.apache.cassandra.service.batch_mutation_super_t.write(batch_mutation_super_t.java:365)
    at org.apache.cassandra.service.Cassandra$batch_insert_superColumn_args.write(Cassandra.java:9776)
    at org.apache.cassandra.service.Cassandra$Client.send_batch_insert_superColumn(Cassandra.java:546)
    at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.pushDocuments(CassandraImport.java:168)
    at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.sendOut(CassandraImport.java:146)
    at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:127)
    at com.yahoo.carmot.client.mapred.CassandraImport$PushReduce.reduce(CassandraImport.java:1)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:318)
    at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2198)
 
]

It looks from the trace like the main thread is blocked in socketWrite0 inside
send_batch_insert_superColumn -- the client is stuck writing to the socket, as if Cassandra has
stopped reading from the connection, and the call never completes. Any ideas?  I had seen similar
hangs in the Cassandra code prior to the 0.3 release candidate, b/c of a race condition in
SelectorManager, but that looks like it was taken care of in 0.3-rc, so I'm not sure what's
going on here.
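
In the meantime, to surface the failure sooner than the MapReduce task timeout, I may open the
connection with a client-side timeout -- if I remember correctly, Thrift's TSocket has a
constructor that takes a timeout in milliseconds (it maps to SO_TIMEOUT, so it only bounds reads;
it may not unstick the blocked write above, but recv-side stalls would at least throw):

// hypothetical tweak: bound reads at 30s so a stalled recv throws instead of blocking forever
TSocket socket = new TSocket("cassandra-node-1", 9160, 30000);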

Thanks,
-Alex



________________________________
From: Jonathan Ellis <jbellis@gmail.com>
To: cassandra-user@incubator.apache.org
Sent: Thursday, May 21, 2009 9:42:29 AM
Subject: Re: Ingesting from Hadoop to Cassandra

No, batch APIs are per CF, not per row.

Several people have asked Avinash for sample code using BinaryMemtable
but to my knowledge nothing ever came of that.

At a high level, the BMT takes serialized CFs as values instead of raw columns, so it can just
sort on key and write directly to disk.  So you would do something like this:

Table table = Table.open(mytablename);
ColumnFamilyStore store = table.getColumnFamilyStore(mycfname);
for (ColumnFamily cf : mydata) {
    store.applyBinary(cf.key, toByteArray(cf));
}

There's no provision for doing this over the network that I know of,
you have to put the right keys on the right nodes manually.
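
Sketching that placement step (ownerOf(key) below is a hypothetical stand-in for whatever
token/range lookup maps a key to the node that owns it -- there's no public API for this):

// hypothetical pre-pass: bucket rows by owning node, then run the
// applyBinary loop above locally on each node with its own bucket
Map<String, List<ColumnFamily>> byNode = new HashMap<String, List<ColumnFamily>>();
for (ColumnFamily cf : mydata) {
    String node = ownerOf(cf.key);  // hypothetical lookup
    if (!byNode.containsKey(node))
        byNode.put(node, new ArrayList<ColumnFamily>());
    byNode.get(node).add(cf);
}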

-Jonathan

On Thu, May 21, 2009 at 11:27 AM, Alexandre Linares <linares@ymail.com> wrote:
> Jonathan,
>
> Thanks for your thoughts.
>
> I've done some simple benchmarks with the batch insert apis and was looking
> for something slightly more performant.  Is there a batch row insert that I
> missed?
>
> Any pointers (at all) to anything related to FB's bulk loading or the
> binarymemtable?  I've attempted to do this by writing a custom IVerbHandler
> for ingestion and interfacing with the MessagingService internally but it's
> not that clean.
>
> Thanks again,
> -Alex
>
> ________________________________
> From: Jonathan Ellis <jbellis@gmail.com>
> To: cassandra-user@incubator.apache.org
> Sent: Thursday, May 21, 2009 7:44:59 AM
> Subject: Re: Ingesting from Hadoop to Cassandra
>
> Have you benchmarked the batch insert apis?  If that is "fast enough"
> then it's by far the simplest way to go.
>
> Otherwise you'll have to use the binarymemtable stuff which is
> undocumented and not exposed as a client api (you basically write a
> custom "loader" version of cassandra to use it, I think).  FB used
> this for their own bulk loading so it works at some level, but clearly
> there is some assembly required.
>
> -Jonathan
>
> On Thu, May 21, 2009 at 2:28 AM, Alexandre Linares <linares@ymail.com>
> wrote:
>> Hi all,
>>
>> I'm trying to find the most optimal way to ingest my content from Hadoop
>> to
>> Cassandra.  Assuming I have figured out the table representation for this
>> content, what is the best way to go about pushing from my cluster?
>> What
>> Cassandra client batch APIs do you suggest I use to push to Cassandra? I'm
>> sure this is a common pattern, I'm curious to see how it has been
>> implemented.  Assume millions of rows and 1000s of columns.
>>
>> Thanks in advance,
>> -Alex
>>
>>
>
>



      