cassandra-commits mailing list archives

From Apache Wiki <>
Subject [Cassandra Wiki] Update of "Streaming2" by yukim
Date Thu, 29 Aug 2013 18:48:11 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.

The "Streaming2" page has been changed by yukim:

initial commit

New page:
(This page is still under construction)

= Streaming 2.0 =

The streaming protocol was redesigned from the ground up in Apache Cassandra 2.0.
This page gives an overview of the protocol design and implementation.

== Design goals ==

 * Better control
    * One API for all (bootstrap, move, bulkload, repair...)
    * Sending/receiving data in the same session
 * Better performance
    * Pipelined stream
    * Persistent connection per host
 * Better reporting
    * Better logging/tracing
    * Event notification
    * More metrics

== Highlights ==

=== Stream Plan ===

Unlike the previous version, which handled sending and receiving data separately from each
other and from the operation that triggered them, Streaming 2.0 groups related stream sessions
under a single "Stream Plan".

Stream Plan for repair, bootstrap, bulkload, etc.
 |- Stream session with Endpoint 1
      |- Stream receiving tasks
      |- Stream transfer tasks
 |- Stream session with Endpoint 2
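
The grouping above can be modeled as a small data structure. The following is a minimal sketch,
not Cassandra's implementation: `StreamPlanSketch`, its nested `Session` class, and the
`request`/`transfer` methods are hypothetical names used only to illustrate one plan owning one
session per endpoint, with each session holding both receiving and transfer tasks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the plan -> session -> task hierarchy (not Cassandra's classes).
class StreamPlanSketch
{
    /** One session per remote endpoint; it owns both directions of work. */
    static class Session
    {
        final String endpoint;
        final List<String> receiveTasks = new ArrayList<>();
        final List<String> transferTasks = new ArrayList<>();

        Session(String endpoint)
        {
            this.endpoint = endpoint;
        }
    }

    final String description;
    private final Map<String, Session> sessions = new LinkedHashMap<>();

    StreamPlanSketch(String description)
    {
        this.description = description;
    }

    // Reuse the session for an endpoint if the plan already has one.
    private Session sessionFor(String endpoint)
    {
        return sessions.computeIfAbsent(endpoint, Session::new);
    }

    /** Record something this node wants to receive from the endpoint. */
    StreamPlanSketch request(String endpoint, String what)
    {
        sessionFor(endpoint).receiveTasks.add(what);
        return this;
    }

    /** Record something this node wants to send to the endpoint. */
    StreamPlanSketch transfer(String endpoint, String what)
    {
        sessionFor(endpoint).transferTasks.add(what);
        return this;
    }

    int sessionCount()
    {
        return sessions.size();
    }

    Session session(String endpoint)
    {
        return sessions.get(endpoint);
    }
}
```

Adding both a request and a transfer for the same endpoint yields a single session with one task
in each direction, which is the point of grouping by plan.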

=== File transfer and messages ===

Streaming messages and file transfers are pipelined on the same persistent TCP connection.
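
To illustrate what pipelining on one connection means, here is a minimal sketch that writes
control messages and file payloads back to back on a single stream and reads them in order.
The framing (a type string followed by a length-prefixed payload) is an assumption made for
this example and is not the actual wire format; `PipelinedStreamSketch` and its methods are
hypothetical names, and a byte array stands in for the TCP connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical framing: type string + payload length + payload bytes.
class PipelinedStreamSketch
{
    static void writeMessage(DataOutputStream out, String type, byte[] payload) throws IOException
    {
        out.writeUTF(type);
        out.writeInt(payload.length);
        out.write(payload);
    }

    /** Reads every message on the stream and returns "type:payloadLength" per message. */
    static List<String> readAll(DataInputStream in) throws IOException
    {
        List<String> seen = new ArrayList<>();
        while (in.available() > 0)
        {
            String type = in.readUTF();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            seen.add(type + ":" + payload.length);
        }
        return seen;
    }

    static List<String> roundTrip()
    {
        try
        {
            // The byte array stands in for one persistent TCP connection.
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            writeMessage(out, "Prepare", "summary".getBytes(StandardCharsets.UTF_8));
            writeMessage(out, "File", new byte[] { 1, 2, 3, 4 });
            writeMessage(out, "File", new byte[] { 5, 6, 7 });
            writeMessage(out, "Complete", new byte[0]);
            out.flush();
            return readAll(new DataInputStream(new ByteArrayInputStream(wire.toByteArray())));
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sender never opens a new connection per file; messages and file contents simply follow one
another on the same stream.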

=== Stream event support ===

Event notification is finer grained. With JMX notification support, even external clients
can listen for events.

== API ==

=== Public APIs ===

 - `StreamPlan`
    * Builder for a streaming plan (what to transfer, what to request). Internally, it builds
`StreamSession`s to interact with the other nodes and associates them with a `StreamResultFuture`,
which asynchronously returns the final `StreamState`.

 - `StreamResultFuture`
    * Represents the future result of executing a `StreamPlan`. You can attach a `StreamEventHandler`
to track the progress of the streaming plan.

 - `StreamState`
    * State of a streaming execution. You can get a snapshot of in-progress streaming from `StreamResultFuture#getCurrentState`,
or the final state as the return value of `StreamResultFuture#get`.

 - `StreamManager`
    * Tracks the progress of all streaming operations
    * Provides various metrics through JMX, including notifications

 - `StreamEventHandler`
    * Listens on various stream events.

Basic API usage is as follows:

// Start building your streaming plan
StreamPlan bulkloadPlan = new StreamPlan("Bulkload");
// Add a file transfer task for each destination
for (InetAddress remote : remoteTargets)
    bulkloadPlan.transferFiles(remote, ranges, sstables);
// Execute your plan
StreamResultFuture result = bulkloadPlan.execute();
try
{
    // ... and wait for streaming to complete
    StreamState state = result.get();
    // all streams succeeded
}
catch (Exception e)
{
    // some stream failed
}

Alternatively, `StreamResultFuture` implements Guava's `ListenableFuture<StreamState>`,
so you can use a `FutureCallback<StreamState>`
to capture stream success and failure.

Futures.addCallback(result, new FutureCallback<StreamState>()
{
    public void onSuccess(StreamState state)
    {
        // Yes, we did it!
    }

    public void onFailure(Throwable t)
    {
        // O_o something went wrong
    }
});

You can add an event listener to `StreamResultFuture` to receive stream events:

StreamResultFuture result = bulkloadPlan.execute();
result.addEventListener(new StreamEventHandler()
{
    public void handleEvent(StreamEvent event)
    {
        // inspect the event, e.g. session prepared, file progress, or session complete
    }
});

=== Internal APIs ===

 - `StreamSession`
    * Group of stream tasks (INs and/or OUTs) per *destination*

 - `StreamTask`
    * Represents each IN/OUT stream task
    * Each task MUST belong to one Stream session
    * `StreamReceiveTask`
        - The execute method sends a stream request to the destination and waits for the reply.
    * `StreamTransferTask`

 - `ConnectionHandler`
    * Receives/sends streaming messages.

== Stream session ==

A stream session handles the streaming of one or more SSTables to and from a specific remote
node. Both this node and the remote one create a similar, symmetrical `StreamSession`. A streaming
session has the following life cycle:

1. Connections Initialization

   (a) A node (the initiator in the following) creates a new `StreamSession`, initializes it, and
then starts it. Starting creates a `ConnectionHandler`, which opens two connections to the remote
node (the follower in the following) with which to stream and sends a `StreamInit` message. The
first connection is the incoming connection and the second is the outgoing connection
for the initiator.

   (b) Upon receiving the `StreamInit` message, the follower creates and initializes its own
`StreamSession` if one does not already exist, and attaches the connecting socket to its `ConnectionHandler`.

   (c) When both the incoming and outgoing connections are established, the `StreamSession` starts
the streaming preparation phase.

2. Streaming preparation phase

   (a) The initiator sends a `Prepare` message that includes which files/sections it will stream to
the follower and what the follower needs to stream back. If the initiator has nothing to receive
from the follower, it goes directly to the streaming phase. Otherwise, it waits for the follower's
`Prepare` message.

   (b) Upon receiving the `Prepare` message, the follower records which files/sections
it will receive and sends back its own `Prepare` message with a summary of the files/sections
that will be sent to the initiator. After sending that message, the follower goes to the streaming
phase.

   (c) When the initiator receives the follower's `Prepare` message, it records which files/sections
it will receive and then goes to the streaming phase.

3. Streaming phase

   (a) The sender sends `File` messages sequentially. Each `File` message consists of a header
that indicates which file is coming, followed by the streamed content of that file.
When all files are sent, the transfer task is marked as complete.

   (b) On the receiving side, an SSTable is written for each incoming file. Once a
`File` message is fully received, the file is marked as complete and the receiver sends back a
`Received` message. Once all files are received, they are added to the `ColumnFamilyStore`,
secondary indexes are built, and the receive task is marked as complete.

   (c) If an I/O error occurs during streaming, the receiving node sends a `Retry` message for
the file (up to `max_streaming_retries` times, default 3). On receiving a `Retry` message, the sender
simply queues a new `File` message for that file.

   (d) When all transfer and receive tasks for the session are complete, the session moves to the
completion phase.

4. Completion phase

   (a) When a node has finished all of its transfer and receive tasks, it sends a `Complete` message.
The stream session is considered complete once the node has both sent a `Complete` message and
received a `Complete` message from the other side.
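
The life cycle above can be read as a small state machine. The following is a minimal sketch
under stated assumptions, not the actual `StreamSession` code: the class name, the state names,
and the method names are all hypothetical, and networking, retries, and error handling are left
out. It only encodes the transitions described above, including the shortcut in 2(a) and the
two-sided completion rule in 4(a).

```java
// Hypothetical state machine for one side of a stream session (not Cassandra's classes).
class StreamSessionLifecycleSketch
{
    enum State { INITIALIZING, PREPARING, STREAMING, WAIT_COMPLETE, COMPLETE }

    private State state = State.INITIALIZING;
    private final boolean expectsIncomingFiles;
    private boolean completeSent;
    private boolean completeReceived;

    StreamSessionLifecycleSketch(boolean expectsIncomingFiles)
    {
        this.expectsIncomingFiles = expectsIncomingFiles;
    }

    State state()
    {
        return state;
    }

    /** Phase 1(c): both connections are up, so this side sends its Prepare message. */
    void connectionsEstablished()
    {
        require(State.INITIALIZING);
        // Phase 2(a): with nothing to receive there is no reply to wait for.
        state = expectsIncomingFiles ? State.PREPARING : State.STREAMING;
    }

    /** Phase 2(c): the peer's Prepare message arrived. */
    void prepareReceived()
    {
        require(State.PREPARING);
        state = State.STREAMING;
    }

    /** Phase 3(d) and 4(a): all local tasks are done, so this side sends Complete. */
    void allTasksFinished()
    {
        require(State.STREAMING);
        completeSent = true;
        state = State.WAIT_COMPLETE;
        maybeComplete();
    }

    /** Phase 4(a): the peer's Complete message arrived (possibly before we finished). */
    void completeReceivedFromPeer()
    {
        completeReceived = true;
        maybeComplete();
    }

    // The session is complete only when Complete has been both sent and received.
    private void maybeComplete()
    {
        if (completeSent && completeReceived)
            state = State.COMPLETE;
    }

    private void require(State expected)
    {
        if (state != expected)
            throw new IllegalStateException("expected " + expected + " but was " + state);
    }
}
```

Note that a peer's `Complete` arriving early does not end the session in this sketch; the state
only becomes `COMPLETE` after the local side has finished its own tasks as well.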

== Events ==

`StreamResultFuture` emits a `StreamEvent` in the following cases:

  * Stream session prepared (`SESSION_PREPARED`)

    Fired when a stream session has finished preparing to receive/send files. It tells the event
handler the number of files and the total bytes to be received/sent.

  * Stream session complete (`SESSION_COMPLETE`)

    Fired when a session completes.

  * Stream progress (`FILE_PROGRESS`)

    Fired when progress is made on receiving/sending a file.

To listen for `StreamEvent`s, implement `StreamEventHandler` and register the handler with `StreamResultFuture`.
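
As a rough illustration of the handler pattern, the sketch below dispatches on the three event
types listed above. It is self-contained and does not use Cassandra's classes: `StreamEventSketch`
and its nested `Type`, `Event`, `Handler`, `ResultFuture`, and `ProgressLog` are hypothetical
stand-ins, and the `detail` string is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the event, handler, and future (not Cassandra's classes).
class StreamEventSketch
{
    enum Type { SESSION_PREPARED, SESSION_COMPLETE, FILE_PROGRESS }

    static class Event
    {
        final Type type;
        final String detail;

        Event(Type type, String detail)
        {
            this.type = type;
            this.detail = detail;
        }
    }

    interface Handler
    {
        void handleEvent(Event event);
    }

    /** Minimal event source: keeps registered handlers and notifies each of them in order. */
    static class ResultFuture
    {
        private final List<Handler> handlers = new ArrayList<>();

        void addEventListener(Handler handler)
        {
            handlers.add(handler);
        }

        void fire(Event event)
        {
            for (Handler handler : handlers)
                handler.handleEvent(event);
        }
    }

    /** Example handler that records one line per event, keyed by event type. */
    static class ProgressLog implements Handler
    {
        final List<String> lines = new ArrayList<>();

        public void handleEvent(Event event)
        {
            switch (event.type)
            {
                case SESSION_PREPARED:
                    lines.add("prepared: " + event.detail);
                    break;
                case FILE_PROGRESS:
                    lines.add("progress: " + event.detail);
                    break;
                case SESSION_COMPLETE:
                    lines.add("complete: " + event.detail);
                    break;
            }
        }
    }
}
```

A handler written this way sees every event for the plan, so it can report totals when a session
is prepared and per-file progress afterwards.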

== JMX support ==

JMX support is provided through the `StreamManager` MBean. You can get the list of streaming states
of all currently running stream plans. It also provides JMX notification support, so you
can subscribe to the stream events above through the JMX interface.
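
Subscribing to JMX notifications uses the standard `javax.management` API. The sketch below is
self-contained and makes several assumptions: it registers a hypothetical `DemoStreamManager`
MBean on the local platform MBean server in place of the real one, and the object name
`example.streaming:type=DemoStreamManager` and the notification type strings are invented for
illustration. Only the `addNotificationListener` call pattern carries over to a real client,
which would use the actual MBean's object name.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

class JmxStreamListenerSketch
{
    // Hypothetical MBean interface; standard MBeans need a public "<Class>MBean" interface.
    public interface DemoStreamManagerMBean
    {
        int getActiveStreamCount();
    }

    // Hypothetical stand-in for the MBean that emits stream notifications.
    public static class DemoStreamManager extends NotificationBroadcasterSupport
                                          implements DemoStreamManagerMBean
    {
        private long sequence = 0;

        public int getActiveStreamCount()
        {
            return 0;
        }

        void fire(String type, String message)
        {
            sendNotification(new Notification(type, this, ++sequence, message));
        }
    }

    /** Registers the demo MBean, subscribes a listener, and returns what the listener saw. */
    static List<String> subscribeAndCollect()
    {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        DemoStreamManager manager = new DemoStreamManager();
        List<String> seen = new ArrayList<>();
        NotificationListener listener =
            (notification, handback) -> seen.add(notification.getType() + ": " + notification.getMessage());
        try
        {
            // Hypothetical object name, chosen for this example only.
            ObjectName name = new ObjectName("example.streaming:type=DemoStreamManager");
            server.registerMBean(manager, name);
            try
            {
                // No filter and no handback: receive every notification from this MBean.
                server.addNotificationListener(name, listener, null, null);
                manager.fire("SESSION_PREPARED", "2 files, 4096 bytes");
                manager.fire("SESSION_COMPLETE", "done");
            }
            finally
            {
                server.unregisterMBean(name);
            }
        }
        catch (JMException e)
        {
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

An external client would obtain an `MBeanServerConnection` from a JMX connector instead of the
platform server; the listener registration call has the same shape there.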
