cassandra-commits mailing list archives

From "Jason Brown (JIRA)" <>
Subject [jira] [Commented] (CASSANDRA-12229) Move streaming to non-blocking IO and netty (streaming 2.1)
Date Mon, 22 May 2017 13:10:05 GMT


Jason Brown commented on CASSANDRA-12229:

Given [~aweisberg]'s comments on PR2, I've gone back and clarified/simplified the use of nio
channels and input/output streams. To that end I'm also reusing the ByteBuffer-based Data*Plus
classes, especially {{RebufferingInputStream}}. Making these changes not only clarified this
set of changes, but also simplified the existing streaming code's habit of wrapping channels
in input streams, and vice versa. Further, by switching to the {{DataInput}} model rather
than {{InputStream}}, I'm able to read primitives more efficiently in {{StreamReader.StreamDeserializer}}.
These changes also necessitated switching StreamMessage.Serializer.deserialize to take a {{DataInputPlus}}
rather than a {{ReadableByteChannel}}, but that's small beans.
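
As a rough illustration of why a buffer-backed {{DataInput}}-style reader helps with primitives, here is a minimal sketch using only the JDK. The class and method names ({{PrimitiveReadSketch}}, {{readIntFromStream}}, {{readIntFromBuffer}}) are hypothetical and are not taken from the Cassandra code base; the sketch only contrasts assembling an int byte by byte from an {{InputStream}} with a single {{ByteBuffer#getInt}} call.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical illustration only; not the Cassandra implementation.
final class PrimitiveReadSketch {

    // InputStream style: four separate read() calls, assembled big-endian.
    static int readIntFromStream(InputStream in) throws IOException {
        int value = 0;
        for (int i = 0; i < Integer.BYTES; i++) {
            int b = in.read();
            if (b < 0) {
                throw new EOFException("stream ended inside an int");
            }
            value = (value << 8) | b;
        }
        return value;
    }

    // Buffer style: one bulk read. ByteBuffer is big-endian by default,
    // so this decodes the same bytes to the same value as the loop above.
    static int readIntFromBuffer(ByteBuffer buffer) {
        return buffer.getInt();
    }
}
```

Both methods decode the same four bytes to the same value; the buffer-based variant simply avoids the per-byte calls when enough bytes are already buffered.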

The net result of these changes is that a first, naive perf run shows the compressed
sstable streaming is about 10% faster than trunk, and streamed compressed sstables are about
on-par with trunk. Clearly, this is better than the last commit.

All these changes were pushed as a single commit to the same branch. That updated the existing
PR, which I'm not sure I really wanted to have happen, but all the comments have been preserved.

The basic use of streams looks like this now:

- {{NettyStreamingMessageSender.FileStreamTask}} creates {{ByteBufDataOutputStreamPlus}},
and passes that to {{StreamMessage#serialize}}. {{ByteBufDataOutputStreamPlus}} extends {{BufferedDataOutputStreamPlus}}
and writes to the netty channel on flush. It still has the {{#waitUntilWritable()}} checks
for testing when it's safe to write to the channel, and I still need to reevaluate that design
based on Ariel's concerns.
- {{StreamWriter}} will wrap the {{ByteBufDataOutputStreamPlus}} with {{ByteBufCompressionDataOutputStreamPlus}},
which is responsible for compressing any outbound file data on the fly.
- {{CompressedStreamWriter}} doesn't wrap, and just writes directly to {{ByteBufDataOutputStreamPlus}}.
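
The "buffer locally, write to the channel on flush" behaviour described for the outbound side can be sketched with plain JDK types. This is an assumption-laden illustration, not the patch itself: {{FlushingBufferedOutput}} is a hypothetical name, a {{WritableByteChannel}} stands in for the netty channel, and the {{#waitUntilWritable()}} check is left out entirely.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical stand-in for a buffered output that writes to a channel on flush.
final class FlushingBufferedOutput {
    private final WritableByteChannel channel;
    private final ByteBuffer buffer;

    FlushingBufferedOutput(WritableByteChannel channel, int capacity) {
        if (capacity < Long.BYTES) {
            throw new IllegalArgumentException("capacity must hold at least one primitive");
        }
        this.channel = channel;
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Primitives go into the buffer in one call; flush first if there is no room.
    void writeInt(int value) throws IOException {
        if (buffer.remaining() < Integer.BYTES) {
            flush();
        }
        buffer.putInt(value);
    }

    // Bulk writes fill the buffer and flush whenever it becomes full.
    void write(byte[] src) throws IOException {
        int offset = 0;
        while (offset < src.length) {
            if (!buffer.hasRemaining()) {
                flush();
            }
            int n = Math.min(buffer.remaining(), src.length - offset);
            buffer.put(src, offset, n);
            offset += n;
        }
    }

    // Nothing reaches the channel until the buffer is flushed.
    void flush() throws IOException {
        buffer.flip();
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        buffer.clear();
    }
}
```

Wrapping this with a compressing layer (as {{StreamWriter}} does in the description above) would then be a matter of compressing into the buffer before the flush; that part is not shown here.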

- {{StreamingInboundHandler}} moves all incoming {{ByteBuf}} s into {{RebufferingByteBufDataInputPlus}}.
{{RebufferingByteBufDataInputPlus}} extends {{RebufferingInputStream}}, and handles adjusting
the netty channel's auto read like I had in previous commits. It also implements {{ReadableByteChannel}}
to enable writing (copying) to a direct {{ByteBuffer}} rather than always copying to an on-heap
byte array.
- {{StreamReader}} will wrap the {{RebufferingByteBufDataInputPlus}} with {{StreamCompressionInputStream}},
which decompresses incoming data. {{StreamCompressionInputStream}} is wrapped with {{TrackedDataInputPlus}}
rather than {{TrackedInputStream}} to optimize reading primitives from the stream.
- {{CompressedStreamReader}} still uses {{CompressedInputStream}}, but that class now extends
{{RebufferingInputStream}}, as well, to a) efficiently read primitives vs the existing {{InputStream}}-derived
implementation, and b) use {{ByteBuffer}} for direct buffers rather than using on-heap byte
arrays.
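
The inbound side described above (queue incoming buffers, swap in the next one when the current one is drained, and expose {{ReadableByteChannel}} so data can be copied into a direct {{ByteBuffer}}) can be sketched roughly as follows. {{QueuedBufferInput}} and its methods are hypothetical names; plain {{ByteBuffer}}s stand in for netty {{ByteBuf}}s, and unlike a real implementation this sketch never blocks waiting for data and does not touch the channel's auto read setting.

```java
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded illustration of a "rebuffering" input.
final class QueuedBufferInput implements ReadableByteChannel {
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private ByteBuffer current = ByteBuffer.allocate(0);
    private boolean open = true;

    // Would be called by the inbound handler for each received chunk.
    void append(ByteBuffer chunk) {
        pending.add(chunk);
    }

    // Swap in the next queued chunk once the current one is drained.
    private boolean rebuffer() {
        while (!current.hasRemaining()) {
            ByteBuffer next = pending.poll();
            if (next == null) {
                return false;
            }
            current = next;
        }
        return true;
    }

    int readInt() {
        // Fast path: the whole int is available in the current chunk.
        if (rebuffer() && current.remaining() >= Integer.BYTES) {
            return current.getInt();
        }
        // Slow path: the int straddles chunk boundaries.
        int value = 0;
        for (int i = 0; i < Integer.BYTES; i++) {
            if (!rebuffer()) {
                throw new IllegalStateException("not enough data queued");
            }
            value = (value << 8) | (current.get() & 0xFF);
        }
        return value;
    }

    // Copies buffer-to-buffer, so dst may be a direct ByteBuffer
    // and no intermediate on-heap byte[] is needed.
    @Override
    public int read(ByteBuffer dst) {
        int copied = 0;
        while (dst.hasRemaining() && rebuffer()) {
            int n = Math.min(dst.remaining(), current.remaining());
            ByteBuffer slice = current.duplicate();
            slice.limit(slice.position() + n);
            dst.put(slice);
            current.position(current.position() + n);
            copied += n;
        }
        return copied;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    @Override
    public void close() {
        open = false;
    }
}
```

A decompressing reader could then sit on top of this and pull compressed chunks through {{read(ByteBuffer)}}, which is roughly the layering the {{StreamReader}} bullet describes.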

There are a few minor wrinkles to iron out, but they shouldn't hold up review. For example, {{CompressedStreamWriter#write}}
casts the {{DataInputPlus}} to {{ByteBufDataOutputStreamPlus}} to make sure we can write a
{{ByteBuf}} to it (as {{ByteBuffer}} will cause a copy to the backing buffer to occur, which
I'd like to avoid). There are also some comments from PR2 that I haven't addressed yet, such
as checksums and timing problems with {{waitUntilWritable()}}, but I wanted to get the streams
situation under better control.

> Move streaming to non-blocking IO and netty (streaming 2.1)
> -----------------------------------------------------------
>                 Key: CASSANDRA-12229
>                 URL:
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Streaming and Messaging
>            Reporter: Jason Brown
>            Assignee: Jason Brown
>             Fix For: 4.0
> As followup work to CASSANDRA-8457, we need to move streaming to use netty.
> Streaming 2.0 (CASSANDRA-5286) brought many good improvements to how files are transferred
between nodes in a cluster. However, the low-level details of the current streaming implementation
do not line up nicely with a non-blocking model, so I think this is a good time to review
some of those details and add in additional goodness. The current implementation assumes a
sequential or "single threaded" approach to the sending of stream messages as well as the
transfer of files. In short, after several iterative prototypes, I propose the following:
> 1) use a single bi-directional connection (instead of requiring two sockets &
two threads)
> 2) send the "non-file" {{StreamMessage}} s (basically anything not {{OutboundFileMessage}})
via the normal internode messaging. This will require a slight bit more management of the
session (the ability to look up a {{StreamSession}} from a static function on {{StreamManager}}),
but we have most of the pieces we need for this already.
> 3) switch to a non-blocking IO model (facilitated via netty)
> 4) Allow files to be streamed in parallel (CASSANDRA-4663) - this should just be a thing
> 5) If the entire sstable is to be streamed, in addition to the DATA component, transfer
all the components of the sstable (primary index, bloom filter, stats, and so on). This way
we can avoid the CPU and GC pressure from deserializing the stream into objects. File streaming
then amounts to a block-level transfer.
> Note: The progress/results of CASSANDRA-11303 will need to be reflected here, as well.
