Date: Thu, 5 Mar 2015 09:34:29 -0600
Subject: Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
From: Yuki Morishita
To: "user@cassandra.apache.org"

Thanks. It looks like a bug. Can you create a ticket on JIRA?
https://issues.apache.org/jira/browse/CASSANDRA

On Thu, Mar 5, 2015 at 7:56 AM, Aby Kuruvilla wrote:
> Hi Yuki,
>
> Thanks for the reply!
>
> Here is the log from the Cassandra server for the stream failure:
>
> INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:109 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Creating new streaming plan for Bulk Load
> INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
> INFO [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
> INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0 bytes)
> WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following error
> java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming
>     at org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) ~[main/:na]
>     at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) [main/:na]
>     at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) [main/:na]
>     at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) [main/:na]
>     at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) [main/:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> java.lang.IllegalArgumentException: Unknown type 0
>     at org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) ~[main/:na]
>     at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) ~[main/:na]
>     at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) ~[main/:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /127.0.0.1 is complete
> WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
>
> On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita wrote:
>>
>> Do you have a corresponding error on the other side of the stream
>> (/192.168.56.11)?
>>
>> On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla wrote:
>> > I am trying to use CqlBulkOutputFormat in a Hadoop job to bulk load data
>> > into Cassandra. I was not able to find any documentation for this new
>> > output format, but from looking through the code, it uses CQLSSTableWriter
>> > to write SSTable files to disk, which are then streamed to Cassandra using
>> > SSTableLoader. When I run the Hadoop job, I can see that the SSTable files
>> > do get generated, but streaming the data out fails. I get the same
>> > exception with a Cassandra node on "localhost" as well as with a remote
>> > Cassandra cluster. I also get this exception on C* versions 2.1.1, 2.1.2
>> > and 2.1.3.
>> >
>> > Relevant portion of logs and stack trace:
>> >
>> > 09:20:23.207 [Thread-6] WARN org.apache.cassandra.utils.CLibrary - JNA link failure, one or more native method will be unavailable.
>> > 09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - JNA link failure details: Error looking up function 'posix_fadvise': dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
>> > 09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
>> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1 to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
>> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
>> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
>> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
>> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
>> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
>> > 09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load metadata for /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
>> > 09:20:23.729 [Thread-2] INFO o.a.c.io.sstable.SSTableReader - Opening /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1 (617874 bytes)
>> > 09:20:23.780 [Thread-2] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for Bulk Load
>> > 09:20:23.781 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to /192.168.56.11
>> > 09:20:23.781 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for incoming stream
>> > 09:20:23.792 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for outgoing stream
>> > 09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests, 1 files}
>> > 09:20:23.795 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 0 files(0 bytes), sending 1 files(617874 bytes)
>> > 09:20:23.799 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamCoordinator - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session with /192.168.56.11
>> > 09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId: d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated keys: 3072, transfer size: 617874, compressed?: true, repairedAt: 0), file: /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
>> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry (d6d35793-729c-3cab-bee0-84e971e48675, #0)
>> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
>> > 09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection handler on /192.168.56.11
>> > 09:20:23.811 [STREAM-IN-/192.168.56.11] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is complete
>> > 09:20:23.812 [STREAM-IN-/192.168.56.11] WARN o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
>> > 09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
>> > java.io.IOException: Broken pipe
>> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
>> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
>> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
>> >     at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
>> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
>> >     at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473) ~[na:1.7.0_51]
>> >     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569) ~[na:1.7.0_51]
>> >     at org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74) ~[cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56) ~[cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40) ~[cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45) ~[cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318) [cassandra-all-2.1.2.jar:2.1.2]
>> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>> > 09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
>> > java.io.IOException: Broken pipe
>> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
>> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
>> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
>> >     at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
>> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
>> >     at org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48) ~[cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44) ~[cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
>> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326) [cassandra-all-2.1.2.jar:2.1.2]
>> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>> >
>> > ----------------------------------------------------
>> >
>> > Here is what I have tried.
>> >
>> > Hadoop driver:
>> >
>> > public class CassandraBulkImporter extends Configured implements Tool {
>> >
>> >     .....
>> >
>> >     public static void main(String[] args) throws Exception {
>> >         int exitCode = ToolRunner.run(new CassandraBulkImporter(), args);
>> >         System.exit(exitCode);
>> >     }
>> >
>> >     @Override
>> >     public int run(String[] arg0) throws Exception {
>> >         .......
>> >         Job job = new Job(conf);
>> >         ......
>> >
>> >         job.setOutputFormatClass(CqlBulkOutputFormat.class);
>> >
>> >         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.56.11");
>> >         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>> >         ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
>> >         ConfigHelper.setOutputKeyspace(job.getConfiguration(), CASSANDRA_KEYSPACE_NAME);
>> >         ConfigHelper.setOutputColumnFamily(
>> >                 job.getConfiguration(),
>> >                 CASSANDRA_KEYSPACE_NAME,
>> >                 CASSANDRA_TABLE_NAME
>> >         );
>> >         // Set the properties for CqlBulkOutputFormat
>> >         MultipleOutputs.addNamedOutput(job, CASSANDRA_TABLE_NAME,
>> >                 CqlBulkOutputFormat.class, Object.class, List.class);
>> >
>> >         CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(),
>> >                 CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
>> >
>> >         CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(),
>> >                 CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) values (?,?,?,?,?) ");
>> >
>> >         .....
>> >     }
>> >
>> > }
>> >
>> > Reducer code:
>> >
>> > public class ReducerToCassandra extends Reducer<Text, ..., Object, List<ByteBuffer>> {
>> >
>> >     private MultipleOutputs multipleOutputs;
>> >
>> >     @SuppressWarnings("unchecked")
>> >     protected void setup(Context context) throws IOException, InterruptedException {
>> >         multipleOutputs = new MultipleOutputs(context);
>> >     }
>> >
>> >     @Override
>> >     public void reduce(Text id, Iterable<...> pInfo, Context context)
>> >             throws IOException, InterruptedException {
>> >         ....
>> >         List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
>> >
>> >         .....
>> >         multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables);
>> >     }
>> > }
>>
>> --
>> Yuki Morishita
>> t:yukim (http://twitter.com/yukim)
>
--
Yuki Morishita
t:yukim (http://twitter.com/yukim)
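The reducer in this thread fills `bVariables` with one value per `?` marker of the INSERT statement. As far as the 2.1 sources suggest, CqlBulkOutputFormat hands each `List<ByteBuffer>` to CQLSSTableWriter's `rawAddRow`, which expects already-serialized values in bind-marker order. The sketch below shows one way to build such a list with plain JDK calls. The five columns (`id text, name text, age int, created_at bigint, active boolean`) and all helper names are hypothetical, since the real `dev.participant` schema is elided above; the encodings are the standard CQL ones (UTF-8 for text, big-endian 4 and 8 bytes for int and bigint, a single byte for boolean).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BindValuesSketch {

    // CQL text: the UTF-8 bytes of the string.
    public static ByteBuffer text(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // CQL int: 4 bytes, big-endian (ByteBuffer's default byte order).
    public static ByteBuffer cqlInt(int v) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(0, v); // absolute put leaves position at 0, so all 4 bytes stay readable
        return b;
    }

    // CQL bigint: 8 bytes, big-endian.
    public static ByteBuffer bigint(long v) {
        ByteBuffer b = ByteBuffer.allocate(8);
        b.putLong(0, v);
        return b;
    }

    // CQL boolean: a single byte, 1 for true and 0 for false.
    public static ByteBuffer bool(boolean v) {
        return ByteBuffer.wrap(new byte[] { (byte) (v ? 1 : 0) });
    }

    // HYPOTHETICAL schema: INSERT INTO dev.participant (id, name, age, created_at, active)
    // VALUES (?,?,?,?,?). Values are added in the same order as the ? markers.
    public static List<ByteBuffer> bindValues(String id, String name, int age,
                                              long createdAt, boolean active) {
        List<ByteBuffer> values = new ArrayList<ByteBuffer>(5);
        values.add(text(id));
        values.add(text(name));
        values.add(cqlInt(age));
        values.add(bigint(createdAt));
        values.add(bool(active));
        return values;
    }

    public static void main(String[] args) {
        List<ByteBuffer> values = bindValues("p-001", "Ada", 36, 1425569669000L, true);
        System.out.println(values.size() + " values; age encoded in "
                + values.get(2).remaining() + " bytes");
    }
}
```

In a real reducer the resulting list would be passed to `multipleOutputs.write(...)`. Hadoop's `MultipleOutputs` documentation also calls `close()` from the task's `cleanup()` method, since that is what closes the record writers it opened.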
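On the receiving node the failure starts with `CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming`, which suggests the server did not recognize the table id carried in the stream header (the same id appears in the client's `Sending File (Header (cfId: ...` line). When writing up the JIRA ticket, it may help to characterize that id. The sketch below is a hypothetical diagnostic, not part of Cassandra: it prints the UUID version of the logged id and compares it with an MD5 name-based UUID built from the keyspace and table names. That comparison rests on an assumption not confirmed in this thread, namely that an offline writer might derive a name-based id when it does not know the live table's id. The live table's own id would still have to be read from the cluster (in 2.1, for example, from the `cf_id` column of `system.schema_columnfamilies`, also an assumption here).

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class CfIdCheck {

    // Names the UUID version of a table id (the high nibble of the third group).
    public static String describe(UUID id) {
        switch (id.version()) {
            case 1:
                return "time-based (version 1)";
            case 3:
                return "name-based MD5 (version 3)";
            case 4:
                return "random (version 4)";
            default:
                return "version " + id.version();
        }
    }

    // ASSUMPTION, for comparison only: an MD5 name-based UUID over the
    // keyspace name bytes followed by the table name bytes.
    public static UUID nameBasedId(String keyspace, String table) {
        byte[] ks = keyspace.getBytes(StandardCharsets.UTF_8);
        byte[] cf = table.getBytes(StandardCharsets.UTF_8);
        byte[] both = new byte[ks.length + cf.length];
        System.arraycopy(ks, 0, both, 0, ks.length);
        System.arraycopy(cf, 0, both, ks.length, cf.length);
        return UUID.nameUUIDFromBytes(both);
    }

    public static void main(String[] args) {
        // Id copied from the client and server log lines in this thread.
        UUID logged = UUID.fromString("d6d35793-729c-3cab-bee0-84e971e48675");
        System.out.println("cfId in stream header: " + describe(logged));
        // Keyspace "dev" and table "participant" come from the CREATE TABLE statement above.
        UUID candidate = nameBasedId("dev", "participant");
        System.out.println("equals name-based id for dev.participant: " + logged.equals(candidate));
    }
}
```

If the logged id turns out to be name-based while the live table's id is time-based, that difference would be worth including in the ticket; the sketch by itself does not show where the id is computed.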