From: Aby Kuruvilla
Date: Thu, 5 Mar 2015 08:56:38 -0500
Subject: Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
To: user@cassandra.apache.org

Hi Yuki

Thanks for the reply!
Here is the log from the Cassandra server for the stream failure:
INFO  [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:109 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Creating new streaming plan for Bulk Load
INFO  [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
INFO  [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
INFO  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0 bytes)
WARN  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following error
java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming
        at org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) ~[main/:na]
        at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) [main/:na]
        at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) [main/:na]
        at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) [main/:na]
        at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) [main/:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
java.lang.IllegalArgumentException: Unknown type 0
        at org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) ~[main/:na]
        at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) ~[main/:na]
        at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) ~[main/:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
INFO  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /127.0.0.1 is complete
WARN  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
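
One detail in the server log that may be worth checking: the cfId in the "was dropped during streaming" error (d6d35793-729c-3cab-bee0-84e971e48675) is a version-3, name-based UUID, while the stream plan ID is an ordinary version-1 time UUID. A name-based ID can be computed from names alone, so it may have been generated on the loader side rather than read from the cluster, which could explain why the node does not recognize it. This is only a guess on my part. The sketch below uses only java.util.UUID; the class name StreamIdCheck and the nameBasedId helper (including its choice of hashing the keyspace name followed by the table name) are my own assumptions for illustration, not Cassandra's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class StreamIdCheck {

    // RFC 4122 version of a UUID string: 1 = time-based, 3 = name-based (MD5), 4 = random.
    static int uuidVersion(String id) {
        return UUID.fromString(id).version();
    }

    // ASSUMPTION (not confirmed by the logs): a deterministic ID derived from the
    // keyspace name followed by the table name. It only illustrates that such an ID
    // is always version 3 and comes out the same on every machine.
    static UUID nameBasedId(String keyspace, String table) {
        byte[] name = (keyspace + table).getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(name);
    }

    public static void main(String[] args) {
        // IDs copied from the logs in this thread.
        System.out.println("cfId version:        " + uuidVersion("d6d35793-729c-3cab-bee0-84e971e48675"));
        System.out.println("stream plan version: " + uuidVersion("98ba8730-c279-11e4-b8e9-55374d280508"));
        // Hypothetical derivation for dev.participant, to compare against the cfId above.
        System.out.println("name-based ID:       " + nameBasedId("dev", "participant"));
    }
}
```

Comparing the cfId from the log with the ID the cluster itself holds for dev.participant would confirm or rule this out.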



On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mor.yuki@gmail.com> wrote:
Do you have corresponding error in the other side of the stream
(/192.168.56.11)?


On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla
<aby.kuruvilla@envisagesystems.com> wrote:
> I am trying to use the CqlBulkOutputFormat in a Hadoop job to bulk load data
> into Cassandra. I was not able to find any documentation for this new output
> format, but from looking through the code, it uses CQLSSTableWriter to
> write SSTable files to disk, which are then streamed to Cassandra using
> SSTableLoader. On running the Hadoop job, I can see that the SSTable files
> do get generated, but the job fails to stream the data out. I get the same
> exception when I try with a Cassandra node on "localhost" as well as with a
> remote Cassandra cluster. I also get this exception on C* versions 2.1.1,
> 2.1.2 and 2.1.3.
>
> Relevant portion of logs and stack trace
>
> 09:20:23.207 [Thread-6] WARN  org.apache.cassandra.utils.CLibrary - JNA link failure, one or more native method will be unavailable.
> 09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - JNA link failure details: Error looking up function 'posix_fadvise': dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
> 09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db
>   to
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1
>   to
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db
>   to
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db
>   to
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
> 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db
>   to
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
> 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt
>   to
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
> 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db
>   to
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
> 09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load metadata for
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
> 09:20:23.729 [Thread-2] INFO  o.a.c.io.sstable.SSTableReader - Opening
>   /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
>   (617874 bytes)
> 09:20:23.780 [Thread-2] INFO  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for Bulk Load
> 09:20:23.781 [StreamConnectionEstablisher:1] INFO  o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to /192.168.56.11
> 09:20:23.781 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for incoming stream
> 09:20:23.792 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for outgoing stream
> 09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests,  1 files}
> 09:20:23.795 [StreamConnectionEstablisher:1] INFO  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 0 files(0 bytes), sending 1 files(617874 bytes)
> 09:20:23.799 [StreamConnectionEstablisher:1] INFO  o.a.c.streaming.StreamCoordinator - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session with /192.168.56.11
> 09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId: d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated keys: 3072, transfer size: 617874, compressed?: true, repairedAt: 0), file: /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
> 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry (d6d35793-729c-3cab-bee0-84e971e48675, #0)
> 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
> 09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection handler on /192.168.56.11
> 09:20:23.811 [STREAM-IN-/192.168.56.11] INFO  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is complete
> 09:20:23.812 [STREAM-IN-/192.168.56.11] WARN  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
> 09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> java.io.IOException: Broken pipe
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
>     at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473) ~[na:1.7.0_51]
>     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569) ~[na:1.7.0_51]
>     at org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318) [cassandra-all-2.1.2.jar:2.1.2]
>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
> 09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> java.io.IOException: Broken pipe
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
>     at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
>     at org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44) ~[cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
>     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326) [cassandra-all-2.1.2.jar:2.1.2]
>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>
> ----------------------------------------------------
>
> Here is what I have tried
>
>
> Hadoop Driver
>
> public class CassandraBulkImporter extends Configured implements Tool {
>
>     .....
>
>     public static void main(String[] args) throws Exception {
>         int exitCode = ToolRunner.run(new CassandraBulkImporter(), args);
>         System.exit(exitCode);
>     }
>
>     @Override
>     public int run(String[] arg0) throws Exception {
>         .......
>         Job job = new Job(conf);
>         ......
>         job.setOutputFormatClass(CqlBulkOutputFormat.class);
>
>         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.56.11");
>         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
>         ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
>         ConfigHelper.setOutputKeyspace(job.getConfiguration(), CASSANDRA_KEYSPACE_NAME);
>         ConfigHelper.setOutputColumnFamily(
>                 job.getConfiguration(),
>                 CASSANDRA_KEYSPACE_NAME,
>                 CASSANDRA_TABLE_NAME
>         );
>
>         // Set the properties for CqlBulkOutputFormat
>         MultipleOutputs.addNamedOutput(job, CASSANDRA_TABLE_NAME,
>                 CqlBulkOutputFormat.class, Object.class, List.class);
>         CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(),
>                 CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
>         CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(),
>                 CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) values (?,?,?,?,?) ");
>
>         .....
>     }
>
> }
>
> Reducer Code
>
> public class ReducerToCassandra extends Reducer<Text, Text, Object, List<ByteBuffer>> {
>
>     private MultipleOutputs multipleOutputs;
>
>     @SuppressWarnings("unchecked")
>     protected void setup(Context context) throws IOException, InterruptedException {
>         multipleOutputs = new MultipleOutputs(context);
>     }
>
>     @Override
>     public void reduce(Text id, Iterable<Text> pInfo, Context context)
>             throws IOException, InterruptedException {
>         ....
>         List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
>
>         .....
>         multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables);
>     }
> }
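
Since the reducer above elides how bVariables is filled, here is a rough sketch of what I understand the output format to expect: one ByteBuffer per bind marker in the insert statement, each holding the serialized column value. It assumes a hypothetical two-column row (a text id and an int age), with text encoded as UTF-8 and int as 4 big-endian bytes. The class and method names are made up for illustration, and the real table has other columns.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BoundValues {

    // Text value serialized as its UTF-8 bytes.
    static ByteBuffer text(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // int value serialized as 4 big-endian bytes; flip() leaves the buffer ready to read.
    static ByteBuffer int32(int v) {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(v);
        b.flip();
        return b;
    }

    // Hypothetical row (id text, age int): one buffer per "?" in the insert statement,
    // in the same order as the bind markers.
    static List<ByteBuffer> row(String id, int age) {
        List<ByteBuffer> values = new ArrayList<ByteBuffer>();
        values.add(text(id));
        values.add(int32(age));
        return values;
    }

    public static void main(String[] args) {
        List<ByteBuffer> values = row("p-1", 42);
        System.out.println(values.size() + " values, "
                + values.get(0).remaining() + " + " + values.get(1).remaining() + " bytes");
    }
}
```

The number of buffers has to match the number of bind markers, so an insert with five "?" placeholders needs a list of five values.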
>
>
>



--
Yuki Morishita
t:yukim (http://twitter.com/yukim)
