Date: Wed, 13 Jan 2016 13:14:40 +0000 (UTC)
From: "Benedict (JIRA)"
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Subject: [jira] [Commented] (CASSANDRA-10528) Proposal: Integrate RxJava

    [ https://issues.apache.org/jira/browse/CASSANDRA-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15096160#comment-15096160 ]

Benedict commented on CASSANDRA-10528:
--------------------------------------

Conceptually this approach appears to be identical to completable futures, only with the imposition that everything is a stream.
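To make the comparison with completable futures concrete, here is a minimal sketch of the same composition as the Observable-based `fetchRows` in the ticket description, written with only Java 8's `CompletableFuture`. It is not code from the POC branch: `readPartition` is a hypothetical stand-in for `SinglePartitionReadLifecycle`, and rows are plain strings rather than Cassandra's `PartitionIterator`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FutureFetch {
    // Hypothetical stand-in for SinglePartitionReadLifecycle: completes with the rows of
    // one partition. A real read would complete when enough replicas have responded.
    static CompletableFuture<List<String>> readPartition(String key) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(key + ":row1", key + ":row2"));
    }

    // Mirrors the Observable pipeline: start every read (map), wait for all of them
    // (allOf, playing the role of toList), then concatenate in command order (concat).
    static CompletableFuture<List<String>> fetchRows(List<String> keys) {
        List<CompletableFuture<List<String>>> reads = keys.stream()
                .map(FutureFetch::readPartition)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(reads.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<String> all = new ArrayList<>();
                    for (CompletableFuture<List<String>> read : reads)
                        all.addAll(read.join()); // every read is complete here, so join() does not block
                    return all;
                });
    }

    public static void main(String[] args) {
        // The caller decides when to block, comparable to toBlocking() in the RxJava POC.
        List<String> rows = fetchRows(Arrays.asList("a", "b")).join();
        System.out.println(rows); // prints [a:row1, a:row2, b:row1, b:row2]
    }
}
```

Each future yields exactly one value (here, a whole partition's rows), so no per-item demand signalling is involved; the trade-off is that results are not streamed.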
Since not everything is a stream, it looks unnecessarily complex to enforce this model on everything we do, and the anonymous functions separated by commas don’t do wonders to mitigate this.

The most problematic part of RxJava, to me, is the way flow control is implemented. It looks ripe to get ugly very quickly, with many context jumps, so debugging and understanding what is happening is likely to be non-trivial once the full functionality is transitioned. Pub/sub is already an extra step to mentally model compared with the status quo, but here we don’t just sub/pub, we sub/pub/ask: for each item we want, we have to call a method to say we are now ready for it, and then the method we previously registered gets called with the result. The number of methods interacting to orchestrate this balloons, and they have to be _nested_.

This mechanism of control flow also requires that we have meaningful numerical units by which to split work, and it’s not clear that we do. The Row is probably the best we can use, but incurring all of these tiny (possibly virtual) method invocations for every row limits the scope for performance improvements, and it doesn’t prevent us being hurt by giant rows. At some future date we need to be resilient to that problem too, and buying into a model that may struggle to manage it seems short-sighted to me.

This hasn’t even accounted for some of the most difficult behaviours to model in an async world, like Rapid Read Protection, nor for how it would integrate with the Netty event loop (noting we need to remain responsive to network events). A POC really needs to demonstrate it can handle the hardest parts of a transition.

All told, I’m personally not sold on the payoff from buying into this framework.
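The "sub/pub/ask" cycle is the `request(n)` demand protocol shared by Reactive Streams and RxJava's backpressure. Below is a minimal, hypothetical illustration using the JDK 9+ `java.util.concurrent.Flow` interfaces rather than RxJava's own classes; `IteratorPublisher` and `OneAtATime` are illustrative names, and the publisher is deliberately synchronous. It also shows the Iterator-to-stream bridging the ticket relies on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

public class PullPushDemo {
    // Hypothetical synchronous publisher: turns a pull-based Iterator into a push-based
    // stream that only emits as many items as the subscriber has asked for.
    static final class IteratorPublisher<T> implements Flow.Publisher<T> {
        private final Iterator<T> source;

        IteratorPublisher(Iterator<T> source) { this.source = source; }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;       // items requested but not yet delivered
                private boolean emitting;  // true while the emit loop below is running
                private boolean done;      // completed, failed or cancelled

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {          // invalid demand is signalled via onError
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request(n) needs n > 0"));
                        return;
                    }
                    demand = demand + n < 0 ? Long.MAX_VALUE : demand + n; // saturate on overflow
                    if (emitting) return;  // re-entrant call from onNext: the running loop sees the new demand
                    emitting = true;
                    while (!done) {
                        if (!source.hasNext()) {
                            done = true;
                            subscriber.onComplete();
                        } else if (demand > 0) {
                            demand--;
                            subscriber.onNext(source.next());
                        } else {
                            break;         // no outstanding demand: wait for the next request(n)
                        }
                    }
                    emitting = false;
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Subscriber that asks for exactly one item at a time: the "sub/pub/ask" cycle.
    static final class OneAtATime implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        int requests;                      // how many times we had to "ask"
        boolean completed;
        Throwable error;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; ask(); }
        @Override public void onNext(String item) { received.add(item); ask(); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }

        private void ask() { requests++; subscription.request(1); }
    }

    public static void main(String[] args) {
        OneAtATime subscriber = new OneAtATime();
        new IteratorPublisher<>(Arrays.asList("row1", "row2", "row3").iterator()).subscribe(subscriber);
        // prints [row1, row2, row3] requests=4 completed=true
        System.out.println(subscriber.received + " requests=" + subscriber.requests
                + " completed=" + subscriber.completed);
    }
}
```

Delivering three rows here costs four `request` calls, three `onNext` callbacks and one `onComplete`, each a separate interface method invocation. The `emitting` flag stops the `request(1)` issued from inside `onNext` from recursing once per row.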
(side note: the modest performance gain seen above is the same as simply raising concurrent_reads to >= native_transport_max_threads)

> Proposal: Integrate RxJava
> --------------------------
>
>                 Key: CASSANDRA-10528
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-10528
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: T Jake Luciani
>            Assignee: T Jake Luciani
>             Fix For: 3.x
>
>         Attachments: rxjava-stress.png
>
>
> The purpose of this ticket is to discuss the merits of integrating the [RxJava|https://github.com/ReactiveX/RxJava] framework into C*, enabling us to incrementally make the internals of C* async and move away from SEDA to a more modern thread-per-core architecture.
> Related tickets:
> * CASSANDRA-8520
> * CASSANDRA-8457
> * CASSANDRA-5239
> * CASSANDRA-7040
> * CASSANDRA-5863
> * CASSANDRA-6696
> * CASSANDRA-7392
> My *primary* goals in raising this issue are to provide a way of:
> * *Incrementally* making the backend async
> * Avoiding code complexity/readability issues
> * Avoiding NIH where possible
> * Building on an extendable library
> My *non*-goals in raising this issue are:
>
> * Rewrite the entire database in one big bang
> * Write our own async api/framework
>
> -------------------------------------------------------------------------------------
> I attempted to integrate RxJava a while back and found it not ready, mainly due to our lack of lambda support. Now with Java 8 I've found it very enjoyable and have not hit any performance issues. A gentle introduction to RxJava is [here|http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/] as well as their [wiki|https://github.com/ReactiveX/RxJava/wiki/Additional-Reading]. The primary concept of RX is the [Observable|http://reactivex.io/documentation/observable.html], which is essentially a stream of stuff you can subscribe to and act on, chain, etc.
> This is quite similar to the [Java 8 streams api|http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html] (or I should say the streams api is similar to it). The difference is that Java 8 streams can't be used for asynchronous events, while RxJava can.
> Another improvement since I last tried integrating RxJava is the completion of CASSANDRA-8099, which provides a very iterable/incremental approach to our storage engine. *Iterators and Observables are well paired conceptually, so morphing our current storage engine to be async is much simpler now.*
> In an effort to show how one can incrementally change our backend, I've done a quick POC with RxJava and changed our non-paging read requests to be non-blocking.
> https://github.com/apache/cassandra/compare/trunk...tjake:rxjava-3.0
> As you can probably see, the code is straightforward and sometimes quite nice!
> *Old*
> {code}
> private static PartitionIterator fetchRows(List> commands, ConsistencyLevel consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
> {
>     int cmdCount = commands.size();
>     SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
>     for (int i = 0; i < cmdCount; i++)
>         reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].doInitialQueries();
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].maybeTryAdditionalReplicas();
>     for (int i = 0; i < cmdCount; i++)
>         reads[i].awaitResultsAndRetryOnDigestMismatch();
>     for (int i = 0; i < cmdCount; i++)
>         if (!reads[i].isDone())
>             reads[i].maybeAwaitFullDataRead();
>     List results = new ArrayList<>(cmdCount);
>     for (int i = 0; i < cmdCount; i++)
>     {
>         assert reads[i].isDone();
>         results.add(reads[i].getResult());
>     }
>     return PartitionIterators.concat(results);
> }
> {code}
> *New*
> {code}
> private static Observable fetchRows(List> commands,
>                                     ConsistencyLevel consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
> {
>     return Observable.from(commands)
>                      .map(command -> new SinglePartitionReadLifecycle(command, consistencyLevel))
>                      .flatMap(read -> read.getPartitionIterator())
>                      .toList()
>                      .map(results -> PartitionIterators.concat(results));
> }
> {code}
> Since the read call is now non-blocking (no more future.get()), we can remove one thread pool hop from the native Netty request pool, which yields a non-trivial improvement to read performance.
> !rxjava-stress.png|width=800px!
> http://cstar.datastax.com/tests/id/ae648c12-729a-11e5-8625-0256e416528f
> At the same time, the current Iterator-based api still works by calling {{.toBlocking()}} on the observable, so, for example, the existing Thrift read call requires little modification.
> On the async side we get the added benefits of RxJava:
> * Customizable backpressure strategies (for dealing with streams that can't be processed quickly enough)
> * Cancelling work due to timeouts is a one-line change
> * When a Subscriber disconnects from the stream, the Observable stops as well
> * Batching/windowing of work can be added in one line
> * Observers and Subscribers can do work on any thread at any stage of the pipeline
> * Observables can be [debugged|https://github.com/ReactiveX/RxJavaDebug] and [tested|http://reactivex.io/RxJava/javadoc/rx/observers/TestSubscriber.html]
> Another plus is the community surrounding RxJava; specifically, our good friends at Netflix authored it and have used it extensively. Docs and examples are good.
> In order to get the most out of this, we will need to take this api further into the code: MessagingService, Disk Access/Page, Cache, Thread per core...
> but again I want to hammer home that this can be achieved incrementally.
> On the bad side, this means:
> * Locking into a "framework"
> * Inevitably hitting bugs / performance issues we need fixed upstream
> * Some of the more advanced API uses look pretty mentally taxing/hard to grasp
> Which brings us to the alternatives, primarily just using CompletableFutures.
> We certainly could, but if you look at the code changes I had to make to make the SP (StorageProxy) calls asynchronous, I think you will realize you would need to pass all kinds of state around to get the read command callback to start the Netty write, whereas Observables make that pipeline declarative. More advanced things like backpressure and message passing between N:M producers and consumers also become complex. This isn't to say we can't [use both|http://www.nurkiewicz.com/2014/11/converting-between-completablefuture.html] if Observables are overkill.
> I hope this ticket sparks some good discussion!
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)