From: Svend Vanderveken
To: user@storm.incubator.apache.org
Date: Thu, 6 Feb 2014 09:25:13 +0100
Subject: Re: Svend's blog - several questions

The logic of a map state is to keep a "state" somewhere. You can think of a Storm state as a big Map of key/values: the keys come from the groupBy and the values are the result of the aggregations. Conceptually, when your topology is talking to a State, you can imagine it's actually talking to a big HashMap (only there's a DB behind it for persistence, plus opaque logic for error handling).

Most of the time, I try not to have any other part of my product depend on where or how the data is stored in the DB, so I do not really need to be super specific about the storage structure: that is up to the IBackingMap implementation I am delegating to. Read and write access to the DB is done via the Storm primitives, not by accessing the DB directly. Don't forget there's also the stateQuery primitive, which you can use to read your stored state from another place.
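
To make the "big HashMap" picture a bit more concrete, here is a minimal sketch in Java. It is not the storm-cassandra code: BackingMapSketch is a local stand-in that only mirrors the two methods of Trident's IBackingMap (multiGet and multiPut) so that it compiles without Storm, and InMemoryBackingMap is a hypothetical implementation in which a plain HashMap plays the role of the DB. What it shows is the contract: one value per key, in key order, with null at the index of any key that has nothing stored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in with the same two methods as Trident's IBackingMap<T>;
// declared here only so the sketch compiles without Storm on the classpath.
interface BackingMapSketch<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Hypothetical in-memory implementation: the HashMap stands in for the DB.
class InMemoryBackingMap<T> implements BackingMapSketch<T> {
    private final Map<List<Object>, T> store = new HashMap<>();

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<T> result = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            // Map.get returns null for an unknown key, so the null lands at the
            // same index as that key and the result keeps the size and order of keys.
            result.add(store.get(key));
        }
        return result;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        // keys.get(i) is stored with vals.get(i): both lists are aligned by index.
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }
}
```

Each key is a List<Object> because it holds one entry per groupBy field. A real backend would replace the HashMap with reads and writes against its own storage, which is exactly the part an existing implementation already takes care of.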

There are ways to configure column families and column names. Have a look at the very clear storm-cassandra doc to see how to do that with this implementation: https://github.com/hmsonline/storm-cassandra

My blog post of last year does indeed illustrate a full implementation, including an in-house IBackingMap implementation. I think that approach is sometimes needed when we want fine-grained control over things. I should have made it clearer that this is not necessarily the default approach.

I hope this makes sense now.

S

On Wed, Feb 5, 2014 at 11:15 PM, Adrian Mocanu wrote:

> Thank you Svend and Adam.
>
> Svend I'm your reader and that tutorial is very useful. I've been spending lots of time looking at the code and that blog post.
>
> BTW I initially thought you were adding the nulls incorrectly in Q3 below, but now I see you're doing it correctly.
>
> I have a follow up question:
>
> Why do you say that "we do not implement multiget/multiput, we just take an existing implementation for Cassandra or Memcached or anything and they do what's right for that backend."
>
> I thought that I had to rewrite an IBackingMap implementation to correspond to the tuples and schema I have in my database. I use Cassandra.
>
> I started with com.hmsonline.storm.cassandra.trident.CassandraState or trident.cassandra.CassandraState (they both implement IBackingMap) and I replaced multiGet and multiPut to match my db schema. (well, I'm trying to do it)
>
> You are saying I can use CassandraState as it is? :D
>
> If so how would it even know what table my data should go into? It allows you to set the column family and a few other things where state will be saved (keyspace, column family, replication, rowKey). By state I think it means sth like txID (transaction ID). Do you by any chance know what this state that CassandraState is saving is?
>
> So as you can tell I have no idea how to use CassandraState.
>
> Thanks again!
>
> -A
>
> *From:* Svend Vanderveken [mailto:svend.vanderveken@gmail.com]
> *Sent:* February-05-14 2:56 PM
> *To:* user@storm.incubator.apache.org
> *Subject:* Re: Svend's blog - several questions
>
> On Wed, Feb 5, 2014 at 6:22 PM, Adrian Mocanu wrote:
>
> I've read Svend's blog [http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/] multiple times and I have a few questions.
>
> So you are my reader! Great :D
>
> (you can post your questions on the blog itself, I'm more likely to spot them there)
>
> "Because we did a groupBy on one tuple field, each List contains here one single String: the correlationId. Note that the list we return must have exactly the same size as the list of keys, so that Storm knows what period corresponds to what key. So for any key that does not exist in DB, we simply put a null in the resulting list."
>
> Q1: Do the db keys come only from groupBy?
>
> Yes, the key values arriving in the multiget are the field values by which we are grouping. Do groupBy(new Fields("color")) and you get things like "blue", "green", "flowery romantic red"...
>
> Q2: Can you do groupBy multiple keys: like .groupBy("name").groupBy("id") ?
>
> Yes, the syntax is like this:
>
> groupBy(new Fields("name", "id"))
>
> That's the reason the keys in the multiget are List<Object> and not simply Object. We receive them in the order they are specified in the topology definition.
>
> Q3: When we add null we keep the size of the results list the same as the keys list, but I don't understand how we make sure that key(3) points to the correct result(3).
> After all we're adding nulls at the end of the result list, not intermittently. ie: if key(1) does not have an entry in db, and key size is 5, we add null to the last position in results, not to results(1).
> This doesn't preserve consistency/order, so key(1) now gives result(1), which is not null as it should be. Is the code incorrect ... or the explanation on Svend's blog is incorrect?
>
> The order should indeed be respected, so if the strategy for handling a DB error in a multi-get is to put nulls, then they should indeed be at the index corresponding to the problematic key. Is there a part of my toy project code that is padding nulls at the end? If so that's indeed a bug, please let me know where (or better, fork and send me a pull request).
>
> Note that I'm not particularly recommending putting nulls in case of unrecoverable errors in a multi-get; that's actually a simplistic way of handling the error. The contract with Storm is either to fail or to return a list of the correct size in the correct order. The data itself and its semantics are up to the topology implementation, i.e. up to us.
>
> Moving on,
>
> "Once this is loaded Storm will present the tuples having the same correlation ID one by one to our reducer, the PeriodBuilder"
>
> Q4: Does Trident/Storm call the reducer after calling multiGet and before calling multiPut?
>
> Yes.
>
> Q5: What params (and their types) are passed to the reducer and what parameters should it emit so they can go into multiGet?
>
> The reducer is called iteratively: it starts with the state found in the DB (returned by the multiget) and the first grouped tuple, then the second, then the third... until the last tuple. The return value of the last call of the reducer is what is provided to the multiput, for the same key as the multiget.
>
> "reduce" is actually a very common pattern in functional programming, which we Java programmers are sometimes less aware of. Look up some general doc on "reduce"; the Storm approach to it is very traditional, i.e. Storm has defined the "reduce" primitive exactly the way many other tools define that primitive.
>
> Q6: The first time the program is run the database is empty and multiGet will return nothing.
> Does the reducer need to take care and make sure to insert for the first time as opposed to update value? I do see that reducer (TimelineUpdater) checks for nulls and I'm guessing this is the reason why it does so.
>
> Exactly.
>
> That's also why returning null in case of error in the multiget is questionable and probably not what you would systematically do: it is equivalent to saying "there's garbage in persistence for that key, so let's just consider there's nothing". The proper thing to do depends on the task at hand, but such an error in multiget is often a symptom that we stored garbage in persistence in the past due to some other problem, and it's too late to correct it now.
>
> Last thing: most of the time we do not implement multiget/multiput, we just take an existing implementation for Cassandra or Memcached or anything and they do what's right for that backend.
>
> Q7:
> Can someone explain what these mean:
> .each (I've seen this used even consecutively: .each(..).each(..) )
> .newStream
> .newValuesStream
> .persistAggregate
>
> I think they are all detailed here: https://github.com/nathanmarz/storm/wiki/Trident-API-Overview
>
> I am unable to find javadocs with documentation for the method signatures.
> These java docs don't help much: http://nathanmarz.github.io/storm/doc/storm/trident/Stream.html
>
> Q8:
> Storm has ack/fail; does Trident handle that automatically?
>
> Yes, although you can also explicitly trigger an error. Look up my next blog post: error handling in Storm Trident.
>
> Q9: Has anyone tried Spark? http://spark.incubator.apache.org/streaming/
> I'm wondering if anyone has tried it because I'm thinking of ditching storm and moving to that.
> It seems much much much better documented.
>
> Spark looks cool. I've not played with it yet, no. Go ahead, keep us posted on what you find out!
>
> Lots of questions I know. Thanks for reading!
>
> and you :D
>
> -Adrian
>
> Svend
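
As a rough illustration of the reduce flow described in the answers to Q5 and Q6 above, here is a small sketch in plain Java. It is only an assumption-laden model, not Trident code: ReduceSketch, foldGroup and count are hypothetical names, and the tuples are plain Strings rather than Trident tuples so that the sketch runs without Storm. It shows the reducer starting from whatever the multiget returned for a key (possibly null), being applied to each grouped tuple in turn, and producing the value that would go to the multiput for that same key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ReduceSketch {
    // Folds the grouped tuples of one key into the state loaded for that key.
    // 'stored' is what the multiget returned for the key (null if nothing was stored).
    static <S, T> S foldGroup(S stored, List<T> tuples, BiFunction<S, T, S> reducer) {
        S current = stored;
        for (T tuple : tuples) {
            current = reducer.apply(current, tuple);
        }
        // The result of the last call is the value handed to the multiput.
        return current;
    }

    // Hypothetical reducer: counts tuples, treating a null state as "nothing stored yet".
    static Long count(Long state, String tuple) {
        return (state == null ? 0L : state) + 1L;
    }

    public static void main(String[] args) {
        // First run: the DB is empty, so the multiget gave null for this key.
        Long fresh = ReduceSketch.<Long, String>foldGroup(
                null, Arrays.asList("a", "b", "c"), ReduceSketch::count);
        // Later run: 5 was already stored for this key.
        Long updated = ReduceSketch.<Long, String>foldGroup(
                5L, Arrays.asList("d"), ReduceSketch::count);
        System.out.println(fresh);   // 3
        System.out.println(updated); // 6
    }
}
```

The null check in count is the same kind of check as the one mentioned for TimelineUpdater in Q6: it is what lets a single reducer cover both the first insert and later updates.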
