cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jason Brown (JIRA)" <>
Subject [jira] [Commented] (CASSANDRA-13630) support large internode messages with netty
Date Thu, 10 Aug 2017 15:07:01 GMT


Jason Brown commented on CASSANDRA-13630:

The core idea here is that if the outgoing message is large/huge, we don't want to naively
allocate a huge buffer just for serialization. For example, if it's a large mutation (say
16MB), we don't want to allocate 16MB * n number of replica buffers on the coordinator. A
safer approach is to allocate standard sized buffers (currently 64k), serialize into them
via {{DataOutputPlus}} interface, write each buffer to the netty channel when the buffer is
full, and allocate another buffer for further serialization.

The outbound side which splits up serialization into multiple buffers is implemented in {{MessageOutHandler.ByteBufDataOutputStreamPlus}}.
At the same time, I've made it so that all messages are written into a shared buffer (via
{{MessageOutHandler.ByteBufDataOutputStreamPlus}}), whether it's a large message being chunked
across multiple buffers, or multiple small messages being aggregated into one buffer (think
mutations ACKs). This upside here is that we don't need to go to the netty allocator for each
individual small message, and thus just send the single, 'aggregation' buffer downstream in
the channel when we need to flush.

As I implemented this behavior, I discovered that the 'aggregating buffer' could be a problem
wrt {{MessageOutHandler#channelWritabilityChanged}} as that method, when it gets the signal
the channel is writable, attempts to drain any backlog from {{OutboundMessagingConnection}}
(via the {{MessageOutHandler#backlogSupplier}}). If i had retained the current code it is
quite likely that I would start to serialize a backlogged message while in the middle of a
message already being serialized (from {{MessageOutHandler#write}}), which happened to fill
the buffer and write it to the channel.

Further, I noticed I needed to forward-port more of CASSANDRA-13265 in order to handle expiring
messages from the backlog. (FTR, {{MessageOutHandler#userEventTriggered}} handles closing
the channel when we make no progress, but there's no other purging or removing items from
the backlog queue. Closing the channel will fail any messages in the channel, but not from
the backlog). Thus, I added the backlog-expiring behavior to {{OutboundMessagingConnection#expireMessages}},
and now drain messages from the backlog in {{MessageOutHandler#write}}. By trying to send
the backlogged messages before the incoming message on the channel, it gives us a better shot
at ordering the sending of the messages wrt the order in which they came into the {{OutboundMessagingConnection}}.

I updated jctools to 2.0.2. Instead of using a {{LinkedBlockingQueue}} in {{OutboundMessagingConnection}}
for the backlog, I decided to use something without locks from jctools. Even though the queue
still needs to be an unbounded multi-producer/multi-consumer (at least, to replicate existing
behaviors), the jctools queue should be a bit more efficient than an LBQ.

Fixing the outbound size is only half of the problem, as we don't want to naively allocate
a huge buffer on the receiving node, either. This is a bit trickier due to the blocking IO
style of our deserializers. Thus, similar to what I've done in CASSANDRA-12229, I need to
add incoming {{ByteBuf}}s to a {{RebufferingByteBufDataInputPlus}} and spin up a background
thread for performing the deserialization. Since we only need to spin up the the thread when
we have large message payloads, this will only happen in a minority of use cases:

- we are actually transmitting a message larger than {{OutboundMessagingPool#LARGE_MESSAGE_THRESHOLD}},
which defaults to 64k. At that point we're sending all of those over the outbound large message
queue anyway, so all messages on that channel/socket will be over the threshold and require
the background deserialization. So this won't apply to the small messages channel, where we
can still handle all those messages in-line on the inbound netty event loop.
- If you are operating a huge sized cluster (I'm guessing at least 500 nodes in size, haven't
done the math, tbh), large gossip messages might trigger the receiving gossip channel to switch
to the background deserialization mode, especially ACK/ACK2 messages after a bounce as they
will contain all the {{ApplicationState}}s for all the peers in the cluster. I do not think
this will be a problem in practice.

I want to add more comments/documentation before committing, but that should not hold up a
review. Also, this code is based on the current CASSANDRA-12229. Currently failing tests for
this branch seem to be race conditions only in the streaming code, so I'll fix on the CASSANDRA-12229

> support large internode messages with netty
> -------------------------------------------
>                 Key: CASSANDRA-13630
>                 URL:
>             Project: Cassandra
>          Issue Type: Task
>          Components: Streaming and Messaging
>            Reporter: Jason Brown
>            Assignee: Jason Brown
>             Fix For: 4.0
> As part of CASSANDRA-8457, we decided to punt on large mesages to reduce the scope of
that ticket. However, we still need that functionality to ship a correctly operating internode
messaging subsystem.

This message was sent by Atlassian JIRA

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message