cassandra-commits mailing list archives

From "David Karnok (JIRA)" <>
Subject [jira] [Commented] (CASSANDRA-10528) Proposal: Integrate RxJava
Date Mon, 11 Apr 2016 19:00:27 GMT


David Karnok commented on CASSANDRA-10528:

I'm glad you want to switch to the reactive world. However, depending on the urgency, I'd
design for a Reactive-Streams based API. (Reactive-Streams is an initiative to have a standard
set of interfaces and a protocol for reactive dataflows on the JVM that allows interoperation
between compatible libraries.)
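
To make the "standard set of interfaces and a protocol" concrete, here is a minimal sketch. It assumes Java 9 or later, where java.util.concurrent.Flow mirrors the Reactive-Streams interfaces and SubmissionPublisher is a ready-made publisher. The names FlowSketch, OneAtATimeSubscriber and collect are made up for illustration and come from no library. The subscriber signals demand with request(1), which is the backpressure part of the protocol.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {

    // Hypothetical subscriber: it requests one item at a time, so the
    // publisher may never deliver more than the subscriber asked for.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Pushes the items through a JDK publisher and returns what the subscriber saw.
    static List<Integer> collect(List<Integer> items) throws InterruptedException {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (Integer item : items) {
                publisher.submit(item); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once the buffered items have been delivered
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("subscriber did not complete in time");
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(List.of(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

Because the subscriber depends only on the Flow interfaces, any compliant publisher could stand in for SubmissionPublisher here, which is the interoperation the initiative aims for.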

The RxJava 1.x library is indeed mature but has reached the limit of its performance potential
due to its architecture. Version 2, which is fully Reactive-Streams, has generally better performance.
Unfortunately, Netflix takes its sweet time with it and since Ben left the project, there
is no one to push it forward. It could take a year to have a stable API.

Alternatively, Project Reactor (also fully Reactive-Streams) seems to be closest to a
stable release with its version 2.5. It is kind of an RxJava lite but also has some features
that do not overlap with RxJava. It is the most advanced and most performant RS library currently
available. The unfortunate thing is that its API is still in flux (pun intended) as bad old habits
get ironed out, so expect its snapshot to change significantly from time to time. Version
2.5 should be ready within 6 months, I presume.

An honorable mention is Akka-Streams, which has a framework attached to it: Akka. That means
you would be at the mercy of the actor system most of the time; moreover, its architecture
has a ton of mandatory async boundaries that lower performance considerably.

I wouldn't recommend writing your own RS library. Writing correct, backpressure-enabled operators
is 1-2 orders of magnitude harder than climbing RxJava's steep learning curve.

In conclusion, if you can wait a few months before work starts on this, you can use Reactor
for the internal implementation and expose the functionality as standard RS interface(s).

If you have questions about reactive topics (I don't know or use Cassandra btw), let me know.

> Proposal: Integrate RxJava
> --------------------------
>                 Key: CASSANDRA-10528
>                 URL:
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: T Jake Luciani
>            Assignee: T Jake Luciani
>             Fix For: 3.x
>         Attachments: rxjava-stress.png
> The purpose of this ticket is to discuss the merits of integrating the [RxJava|]
framework into C*.  Enabling us to incrementally make the internals of C* async and move away
from SEDA to a more modern thread per core architecture. 
> Related tickets:
>    * CASSANDRA-8520
>    * CASSANDRA-8457
>    * CASSANDRA-5239
>    * CASSANDRA-7040
>    * CASSANDRA-5863
>    * CASSANDRA-6696
>    * CASSANDRA-7392
> My *primary* goals in raising this issue are to provide a way of:
>     *  *Incrementally* making the backend async
>     *  Avoiding code complexity/readability issues
>     *  Avoiding NIH where possible
>     *  Building on an extendable library
> My *non*-goals in raising this issue are:
>    * Rewrite the entire database in one big bang
>    * Write our own async api/framework
> -------------------------------------------------------------------------------------
> I've attempted to integrate RxJava a while back and found it not ready mainly due to
our lack of lambda support.  Now with Java 8 I've found it very enjoyable and have not hit
any performance issues. A gentle introduction to RxJava is [here|]
as well as their [wiki|].  The
primary concept of RX is the [Observable|]
which is essentially a stream of stuff you can subscribe to and act on, chain, etc. This is
quite similar to [Java 8 streams api|]
(or I should say streams api is similar to it).  The difference is java 8 streams can't be
used for asynchronous events while RxJava can.
> Another improvement since I last tried integrating RxJava is the completion of CASSANDRA-8099
which provides a very iterable/incremental approach to our storage engine.  *Iterators
and Observables are well paired conceptually so morphing our current Storage engine to be
async is much simpler now.*
> In an effort to show how one can incrementally change our backend I've done a quick POC
with RxJava and replaced our non-paging read requests to become non-blocking.
> As you can probably see the code is straight-forward and sometimes quite nice!
> *Old*
> {code}
> private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
>     {
>         int cmdCount = commands.size();
>         SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
>         for (int i = 0; i < cmdCount; i++)
>             reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
>         for (int i = 0; i < cmdCount; i++)
>             reads[i].doInitialQueries();
>         for (int i = 0; i < cmdCount; i++)
>             reads[i].maybeTryAdditionalReplicas();
>         for (int i = 0; i < cmdCount; i++)
>             reads[i].awaitResultsAndRetryOnDigestMismatch();
>         for (int i = 0; i < cmdCount; i++)
>             if (!reads[i].isDone())
>                 reads[i].maybeAwaitFullDataRead();
>         List<PartitionIterator> results = new ArrayList<>(cmdCount);
>         for (int i = 0; i < cmdCount; i++)
>         {
>             assert reads[i].isDone();
>             results.add(reads[i].getResult());
>         }
>         return PartitionIterators.concat(results);
>     }
> {code}
>  *New*
> {code}
> private static Observable<PartitionIterator> fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
>     {
>         return Observable.from(commands)
>                          .map(command -> new SinglePartitionReadLifecycle(command, consistencyLevel))
>                          .flatMap(read -> read.getPartitionIterator())
>                          .toList()
>                          .map(results -> PartitionIterators.concat(results));
>     }
> {code}
> Since the read call is now non blocking (no more future.get()) we can remove one thread
pool hop from the native netty request pool which yields a non-trivial improvement to read
> !rxjava-stress.png|width=800px!
> At the same time the current Iterator based api still works by calling {{.toBlocking()}}
on the observable. So for example the existing thrift read call requires little modification
> On the async side we get the added benefits of RxJava:
>   * Customizable backpressure strategies (for dealing with streams that can't be processed
quickly enough)
>   * Cancelling of work due to timeouts is a 1 line change
>   * When a Subscriber disconnects from the stream the Observable stops as well
>   * Batching/windowing of work can be added in one line
>   * Observers and Subscribers can do work across any thread at any stage of the pipeline
>   * Observables can be [debugged|] and [tested|]
> Another plus is the community surrounding RxJava, specifically our good friends at Netflix,
who have authored and used it extensively. Docs and examples are good.
> In order to get the most out of this we will need to take this api further into the code.
MessagingService, Disk Access/Page, Cache, Thread per core... but again I want to hammer home
this will be able to be achieved incrementally. 
> On the bad side this is:
>   *  Locking into a "framework"  
>   *  Will inevitably hit bugs / performance issues we need fixed upstream
>   * Some of the more advanced API uses look pretty mentally taxing/hard to grasp
> Which brings us to the Alternatives, primarily being to just use CompletableFutures.
> We certainly could but if you look at the code changes I had to make to make the SP calls
asynchronous I think you will realize you would need to pass
> all kinds of state around to get the read command callback to start the netty write.
 Vs observables which make that pipeline declarative. Also more advanced things like backpressure
and message passing between N:M producers and consumers becomes complex.  This isn't to say
we can't [use both|]
if Observables are overkill.
> I hope this ticket sparks some good discussion!
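
For comparison, the CompletableFuture alternative mentioned in the ticket description might look roughly like the sketch below for the same fan-out-and-collect shape as fetchRows. This is an illustration only: readPartition is a hypothetical stand-in for a single-partition read, plain strings replace the Cassandra types, and FutureFanOutSketch is a made-up name. It uses only java.util.concurrent from the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FutureFanOutSketch {

    // Hypothetical stand-in for one asynchronous single-partition read; not a Cassandra API.
    static CompletableFuture<String> readPartition(String key) {
        return CompletableFuture.supplyAsync(() -> "row:" + key);
    }

    // Starts every read up front, then combines the results once all have completed.
    static CompletableFuture<List<String>> fetchRows(List<String> keys) {
        List<CompletableFuture<String>> reads = keys.stream()
                .map(FutureFanOutSketch::readPartition)
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(reads.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> reads.stream()
                        .map(CompletableFuture::join) // does not block: every read is already complete
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(fetchRows(List.of("a", "b")).join()); // prints [row:a, row:b]
    }
}
```

The sketch stays short because nothing flows between stages. Timeouts, cancellation and backpressure would each need extra state passed along by hand, which is the complexity the description attributes to this approach.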

This message was sent by Atlassian JIRA
