cassandra-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yuki Morishita <mor.y...@gmail.com>
Subject Re: Streaming failures during bulkloading data using CqlBulkOutputFormat
Date Thu, 05 Mar 2015 17:25:22 GMT
Thanks!

On Thu, Mar 5, 2015 at 11:10 AM, Aby Kuruvilla
<aby.kuruvilla@envisagesystems.com> wrote:
> Thanks Yuki, have created a JIRA ticket
>
> https://issues.apache.org/jira/browse/CASSANDRA-8924
>
> On Thu, Mar 5, 2015 at 10:34 AM, Yuki Morishita <mor.yuki@gmail.com> wrote:
>>
>> Thanks.
>> It looks like a bug. Can you create a ticket on JIRA?
>>
>> https://issues.apache.org/jira/browse/CASSANDRA
>>
>> On Thu, Mar 5, 2015 at 7:56 AM, Aby Kuruvilla
>> <aby.kuruvilla@envisagesystems.com> wrote:
>> > Hi Yuki
>> >
>> > Thanks for the reply!
>> >
>> > Here is the log from Cassandra server for the stream failure
>> >
>> > INFO  [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816
>> > StreamResultFuture.java:109 - [Stream
>> > #98ba8730-c279-11e4-b8e9-55374d280508
>> > ID#0] Creating new streaming plan for Bulk Load
>> > INFO  [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816
>> > StreamResultFuture.java:116 - [Stream
>> > #98ba8730-c279-11e4-b8e9-55374d280508,
>> > ID#0] Received streaming plan for Bulk Load
>> > INFO  [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819
>> > StreamResultFuture.java:116 - [Stream
>> > #98ba8730-c279-11e4-b8e9-55374d280508,
>> > ID#0] Received streaming plan for Bulk Load
>> > INFO  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822
>> > StreamResultFuture.java:166 - [Stream
>> > #98ba8730-c279-11e4-b8e9-55374d280508
>> > ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0
>> > files(0
>> > bytes)
>> > WARN  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823
>> > StreamSession.java:597
>> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following
>> > error
>> > java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped
>> > during streaming
>> >         at
>> >
>> > org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71)
>> > ~[main/:na]
>> >         at
>> >
>> > org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48)
>> > [main/:na]
>> >         at
>> >
>> > org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38)
>> > [main/:na]
>> >         at
>> >
>> > org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55)
>> > [main/:na]
>> >         at
>> >
>> > org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
>> > [main/:na]
>> >         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>> > ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828
>> > StreamSession.java:477
>> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error
>> > occurred
>> > java.lang.IllegalArgumentException: Unknown type 0
>> >         at
>> >
>> > org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89)
>> > ~[main/:na]
>> >         at
>> >
>> > org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54)
>> > ~[main/:na]
>> >         at
>> >
>> > org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245)
>> > ~[main/:na]
>> >         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>> > INFO  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829
>> > StreamResultFuture.java:180 - [Stream
>> > #98ba8730-c279-11e4-b8e9-55374d280508]
>> > Session with /127.0.0.1 is complete
>> > WARN  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829
>> > StreamResultFuture.java:207 - [Stream
>> > #98ba8730-c279-11e4-b8e9-55374d280508]
>> > Stream failed
>> >
>> >
>> >
>> > On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mor.yuki@gmail.com>
>> > wrote:
>> >>
>> >> Do you have corresponding error in the other side of the stream
>> >> (/192.168.56.11)?
>> >>
>> >>
>> >> On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla
>> >> <aby.kuruvilla@envisagesystems.com> wrote:
>> >> > I am trying to use the CqlBulkOutputFormat in a Hadoop job to bulk
>> >> > load
>> >> > data
>> >> > into Cassandra.  Was not able to find any documentation of this new
>> >> > output
>> >> > format , but from looking through the code this uses CQLSSTableWriter
>> >> > to
>> >> > write SSTable files to disk , which are then streamed to Cassandra
>> >> > using
>> >> > SSTableLoader. On running the Hadoop job, I can see that the SSTable
>> >> > files
>> >> > do get generated but fails to stream the data out. I get the same
>> >> > exception
>> >> > when I try with Cassndra node on "localhost" as well as a remote
>> >> > Cassandra
>> >> > cluster. Also I get this exception on C* versions 2.1.1,  2.1.2 and
>> >> > 2.1.3.
>> >> >
>> >> > Relevant portion of logs and stack trace
>> >> >
>> >> > 09:20:23.207 [Thread-6] WARN  org.apache.cassandra.utils.CLibrary -
>> >> > JNA
>> >> > link
>> >> > failure, one or more native method will be unavailable.
>> >> > 09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary -
>> >> > JNA
>> >> > link
>> >> > failure details: Error looking up function 'posix_fadvise':
>> >> > dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
>> >> > 09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils
-
>> >> > Renaming
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db
>> >> > to
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
>> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils
-
>> >> > Renaming
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1
>> >> > to
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
>> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils
-
>> >> > Renaming
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db
>> >> > to
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
>> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils
-
>> >> > Renaming
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db
>> >> > to
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
>> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils
-
>> >> > Renaming
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db
>> >> > to
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
>> >> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils
-
>> >> > Renaming
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt
>> >> > to
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
>> >> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils
-
>> >> > Renaming
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db
>> >> > to
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
>> >> > 09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load
>> >> > metadata
>> >> > for
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
>> >> > 09:20:23.729 [Thread-2] INFO  o.a.c.io.sstable.SSTableReader -
>> >> > Opening
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
>> >> > (617874 bytes)
>> >> > 09:20:23.780 [Thread-2] INFO  o.a.c.streaming.StreamResultFuture -
>> >> > [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for
>> >> > Bulk
>> >> > Load
>> >> > 09:20:23.781 [StreamConnectionEstablisher:1] INFO
>> >> > o.a.c.streaming.StreamSession - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to
>> >> > /192.168.56.11
>> >> > 09:20:23.781 [StreamConnectionEstablisher:1] DEBUG
>> >> > o.a.c.streaming.ConnectionHandler - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for
>> >> > incoming
>> >> > stream
>> >> > 09:20:23.792 [StreamConnectionEstablisher:1] DEBUG
>> >> > o.a.c.streaming.ConnectionHandler - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for
>> >> > outgoing
>> >> > stream
>> >> > 09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG
>> >> > o.a.c.streaming.ConnectionHandler - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests,
>> >> > 1
>> >> > files}
>> >> > 09:20:23.795 [StreamConnectionEstablisher:1] INFO
>> >> > o.a.c.streaming.StreamResultFuture - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed.
>> >> > Receiving
>> >> > 0
>> >> > files(0 bytes), sending 1 files(617874 bytes)
>> >> > 09:20:23.799 [StreamConnectionEstablisher:1] INFO
>> >> > o.a.c.streaming.StreamCoordinator - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session
>> >> > with
>> >> > /192.168.56.11
>> >> > 09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG
>> >> > o.a.c.streaming.ConnectionHandler - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId:
>> >> > d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated
>> >> > keys:
>> >> > 3072,
>> >> > transfer size: 617874, compressed?: true, repairedAt: 0), file:
>> >> >
>> >> >
>> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
>> >> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG
>> >> > o.a.c.streaming.ConnectionHandler - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry
>> >> > (d6d35793-729c-3cab-bee0-84e971e48675, #0)
>> >> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG
>> >> > o.a.c.streaming.ConnectionHandler - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
>> >> > 09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG
>> >> > o.a.c.streaming.ConnectionHandler - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection
>> >> > handler
>> >> > on
>> >> > /192.168.56.11
>> >> > 09:20:23.811 [STREAM-IN-/192.168.56.11] INFO
>> >> > o.a.c.streaming.StreamResultFuture - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11
is
>> >> > complete
>> >> > 09:20:23.812 [STREAM-IN-/192.168.56.11] WARN
>> >> > o.a.c.streaming.StreamResultFuture - [Stream
>> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
>> >> > 09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR
>> >> > o.a.c.streaming.StreamSession
>> >> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error
>> >> > occurred
>> >> > java.io.IOException: Broken pipe
>> >> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>> >> > ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>> >> > ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>> >> > ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
>> >> > ~[na:1.7.0_51]
>> >> >     at
>> >> >
>> >> >
>> >> > sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473)
>> >> > ~[na:1.7.0_51]
>> >> >     at
>> >> > sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569)
>> >> > ~[na:1.7.0_51]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74)
>> >> > ~[cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56)
>> >> > ~[cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40)
>> >> > ~[cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45)
>> >> > ~[cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346)
>> >> > [cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318)
>> >> > [cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>> >> > 09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR
>> >> > o.a.c.streaming.StreamSession
>> >> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error
>> >> > occurred
>> >> > java.io.IOException: Broken pipe
>> >> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>> >> > ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>> >> > ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>> >> > ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
>> >> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
>> >> > ~[na:1.7.0_51]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48)
>> >> > ~[cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44)
>> >> > ~[cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346)
>> >> > [cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at
>> >> >
>> >> >
>> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326)
>> >> > [cassandra-all-2.1.2.jar:2.1.2]
>> >> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>> >> >
>> >> > ----------------------------------------------------
>> >> >
>> >> > Here is what I have tried
>> >> >
>> >> >
>> >> > Hadoop Driver
>> >> >
>> >> > public class CassandraBulkImporter extends Configured implements
>> >> > Tool{
>> >> >
>> >> > .....
>> >> >
>> >> >              public static void main(String[] args) throws Exception
>> >> > {
>> >> >                          int exitCode = ToolRunner.run(new
>> >> > CassandraBulkImporter(), args);
>> >> >                          System.exit(exitCode);
>> >> >              }
>> >> >
>> >> >              @Override
>> >> >               public int run(String[] arg0) throws Exception {
>> >> >                        .......
>> >> >                        Job job = new Job(conf);
>> >> >                        ......
>> >> >
>> >> > job.setOutputFormatClass(CqlBulkOutputFormat.class);
>> >> >
>> >> >
>> >> > ConfigHelper.setOutputInitialAddress(job.getConfiguration(),
>> >> > "192.168.56.11");
>> >> >
>> >> > ConfigHelper.setOutputPartitioner(job.getConfiguration(),
>> >> > "Murmur3Partitioner");
>> >> >
>> >> > ConfigHelper.setOutputRpcPort(job.getConfiguration(),
>> >> > "9160");
>> >> >
>> >> > ConfigHelper.setOutputKeyspace(job.getConfiguration(),
>> >> > CASSANDRA_KEYSPACE_NAME);
>> >> >                        ConfigHelper.setOutputColumnFamily(
>> >> >                                   job.getConfiguration(),
>> >> >                                  CASSANDRA_KEYSPACE_NAME,
>> >> >                                  CASSANDRA_TABLE_NAME
>> >> >                         );
>> >> >                      //Set the properties for CqlBulkOutputFormat
>> >> >                      MultipleOutputs.addNamedOutput(job,
>> >> > CASSANDRA_TABLE_NAME, CqlBulkOutputFormat.class, Object.class,
>> >> > List.class);
>> >> >
>> >> > CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(),
>> >> > CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
>> >> >
>> >> >
>> >> >
>> >> > CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(),
>> >> > CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) values
>> >> > (?,?,?,?,?) ");
>> >> >
>> >> >                     .....
>> >> >             }
>> >> >
>> >> > }
>> >> >
>> >> > Reducer Code
>> >> >
>> >> > public class ReducerToCassandra extends Reducer<Text, Text, Object,
>> >> > List<ByteBuffer>> {
>> >> >
>> >> >   private MultipleOutputs multipleOutputs;
>> >> >
>> >> >   @SuppressWarnings("unchecked")
>> >> >    protected void setup(Context context) throws IOException,
>> >> > InterruptedException {
>> >> >         multipleOutputs = new MultipleOutputs(context);
>> >> >    }
>> >> >
>> >> >   @Override
>> >> >    public void reduce(Text id, Iterable<Text> pInfo, Context
context)
>> >> > throws
>> >> > IOException, InterruptedException {
>> >> >        ....
>> >> >        List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
>> >> >
>> >> >       .....
>> >> >       multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables);
>> >> >
>> >> > }
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> Yuki Morishita
>> >>  t:yukim (http://twitter.com/yukim)
>> >
>> >
>>
>>
>>
>> --
>> Yuki Morishita
>>  t:yukim (http://twitter.com/yukim)
>
>



-- 
Yuki Morishita
 t:yukim (http://twitter.com/yukim)

Mime
View raw message