mahout-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Trevor Grant <>
Subject Re: Lambda and Kappa CCO
Date Sun, 09 Apr 2017 22:43:05 GMT
Specifically, I hacked together a Lambda Streaming CCO with Spark and Flink
for a demo for my upcoming FlinkForward talk.  Will post code once I finish
it / strip all my creds out. In short- the lack of serialization in Mahout
incore vectors/matrices makes handing off / dealing with them somewhat

Trevor Grant
Data Scientist

*"Fortunate is he, who is able to know the causes of things."  -Virgil*

On Sun, Apr 9, 2017 at 5:39 PM, Andrew Palumbo <> wrote:

> Pat-
> What can we do from the mahout side?  Would we need any new data
> structures?  Trevor and I were just discussing some of  the troubles of
> near real time matrix streaming.
> ------------------------------
> *From:* Pat Ferrel <>
> *Sent:* Monday, March 27, 2017 2:42:55 PM
> *To:* Ted Dunning;
> *Cc:* Trevor Grant; Ted Dunning;
> *Subject:* Re: Lambda and Kappa CCO
> Agreed. Downsampling was ignored in several places and with it a great
> deal of input is a noop. Without downsampling too many things need to
> change.
> Also everything is dependent on this rather vague sentence. “- determine
> if the new interaction element cross-occurs with A and if so calculate the
> llr score”, which needs a lot more explanation. Whether to use Mahout
> in-memory objects or reimplement some in high speed data structures is a
> big question.
> The good thing I noticed in writing this is that model update and real
> time can be arbitrarily far apart, that the system degrades gracefully. So
> during high load it may fall behind but as long as user behavior is
> up-to-date and persisted (it will be) we are still in pretty good shape.
> On Mar 26, 2017, at 6:26 PM, Ted Dunning <> wrote:
> I think that this analysis omits the fact that one user interaction causes
> many cooccurrences to change.
> This becomes feasible if you include the effect of down-sampling, but that
> has to be in the algorithm.
> From: Pat Ferrel <>
> Sent: Saturday, March 25, 2017 12:01:00 PM
> To: Trevor Grant;
> Cc: Ted Dunning;
> Subject: Lambda and Kappa CCO
> This is an overview and proposal for turning the multi-modal Correlated
> Cross-Occurrence (CCO) recommender from Lambda-style into an online
> streaming incrementally updated Kappa-style learner.
> # The CCO Recommender: Lambda-style
> We have largely solved the problems of calculating the multi-modal
> Correlated Cross-Occurrence models and serving recommendations in real time
> from real time user behavior. The model sits in Lucene (Elasticsearch or
> Solr) in a scalable way and the typical query to produce personalized
> recommendations comes from real time user behavior completes with 25ms
> latency.
> # CCO Algorithm
> A = rows are users, columns are items they have “converted” on (purchase,
> read, watch). A represents the conversion event—the interaction that you
> want to recommend.
> B = rows are users columns are items that the user has shown some
> preference for but not necessarily the same items as A. B represent a
> different interaction than A. B might be a preference for some category,
> brand, genre, or just a detailed item page view—or all of these in B, C, D,
> etc
> h_a = a particular user’s history of A type interactions, a vector of
> items that our user converted on.
> h_b = a particular user’s history of B type interactions, a vector of
> items that our user had B type interactions with.
> CCO says:
> [A’A]h_a + [A’B]h_b + [A’C]h_c = r; where r is the weighted items from A
> that represent personalized recommendations for our particular user.
> The innovation here is that A, B, C, … represent multi-modal data.
> Interactions of all types and on item-sets of arbitrary types. In other
> words we can look at virtually any action or possible indicator of user
> preference or taste. We strengthen the above raw cross-occurrence and
> cooccurrence formula by performing:
> [llr(A’A)]h_a + [llr(A’B)]h_b + … = r adding llr (log-likelihood ratio)
> correlation scoring to filter out coincidental cross-occurrences.
> The model becomes [llr(A’A)], [llr(A’B)], … each has items from A in rows
> and items from A, B, … in columns. This sits in Lucene as one document per
> items in A with a field for each of A, B, C items whose user interactions
> most strongly correlate to the conversion event on the row item. Put
> another way, the model is items from A. B, C… what have the most similar
> user interaction from users.
> To calculate r we need to find the most simllar items in the model to the
> history or behavior of our example user. Since Lucene is basically a
> K-Nearest Neighbors engine that is particularly well tuned to work with
> sparse data (our model is typically quite sparse) all we need to do is
> segment the user history into h_a, h_b … and use it as the multi-field
> query on the model. This performs the equivalent of:
> [llr(A’A)]h_a + [llr(A’B)]h_b + … = r where we substitute cosine
> similarity of h_a to every row in [llr(A’A)]h_a for the tensor math.
> Further Lucene sorts by score and returns only the top ranking items. Even
> further we note that since we have performed a multi-field query it does
> the entire multi-field similarity calculation and vector segment addition
> before doing the sort. Lucene does this a a very performant manner so the
> entire query, including fetching user history, forming the Lucene query and
> executing it will take something like 25 ms and is indefinitely scalable to
> any number of simultaneous queries.
> Problem solved?
> Well, yes and no. The above method I’ve label a Lambda-style recommender.
> It uses real time user history and makes recommendations in real time but
> it can only recommend items in A. So if A is changing rapidly, as when the
> items have short lifetimes like newsy items of social media things like
> tweets then A can get out of date in hours or minutes. The other downside
> of Lambda CCO is that we note that the entirety of the data in A, B, C …
> has to be re-examined every time new models are calculated. With data on
> the order of a terabyte or more, this is quite a cost in compute resources.
> It is true that most typical A items will not change often. Think of the
> typical E-Commerce case where A represents items in a catalog, which change
> only infrequently. But the resources required to re-calculate the model
> remain high, even if they are only needed once per week. We have seen the
> existing Lambda-style system take many AWS x1.32xlarge instance hours to
> recalculate the model. This translates into a significant cost for model
> calculation alone.
> # Streaming Online CCO Kappa-style
> Since we have a good solution for serving query results from the model we
> might solve the Lambda CCO issues by converting the model re-calc into a
> streaming online incrementally updated process. In other words instead of
> replacing the model with a new one periodically let’s see if we can update
> it with each new interaction in real time.
> Stripping away the query part of the CCO algorithm the question is can we
> update:
> [llr(A’A)], [llr(A’B)], …
> in real time.
> Caveats: updates may be micro-batched, not with each new event, and real
> time really means “near real time” and the update frequency will be allowed
> to vary with input frequency.
> We note a few things about the problem:
> 1) A, B, … are sparse matrices keyed by a user-id. This is possible to
> model as a hashmap in memory or as a persistent distributed indexed dataset
> in some fast NoSQL DB.
> 2) if we need to update the model we also need to update the data that
> produces the model, A, B, C in a persistent way.
> 3) a single interaction will only affect one segment of the model (A’A or
> A’B but not both) and one input matrix.
> 4) The log-likelihood ratio needs to know 1) if there is a
> cross-occurrence and 2) the counts of interactions from several
> perspectives that correspond to row, column, and total density of the input
> matrices.
> 5) because of #4, the important data can be stored and updated as single
> elements of vectors, not requiring examining the entirety of input.
> We can make several decisions now based on system requirements.
> 1) How important is it to persist A, B, C etc. If we save interaction logs
> we can always re-create them when a system is taken down and brought back
> up. This implies that in-memory hashmaps of sparse vectors are sufficient
> to the solution even though a “reboot” may require some time to perform. If
> the reboot time is critical we can represent A, B, ... in a mutable DB.
> 2) The serving layer persists A’A, … and is highly performant and scalable
> with one of the distributed versions of Lucene so we leave that in place.
> The caveat is that we must now add new load to Lucene in that we must
> update the model in place rather than recalculating it en masse
> periodically. This has the benefit of also persisting the model so be
> “reboot” resilient.
> 3) if we are persisting user interaction h_a, h_b, persistently in a DB
> then we have the making of queries.
> This leaves only the collected interactions in A, B, … that must be
> accessed and updated in real time but if the were to disappear all we would
> loose is model real time model updating, the queries would still work and
> the system as a whole would continue to operate with very little notice of
> the lose. Therefor using in-memory data structures (which give the ultimate
> in speed) should be sufficient to the Kappa-style solution. They will be in
> separate hashmaps with a sparse vector per key, remembering that the key is
> an item from A and the vector is items from A, B, C ...
> # Kappa algorithm
> Transforming Lambda to Kappa requires:
> - receiving a new interaction, store it to a persistent DB as per the
> Lambda implementation that we know is fast
> - insert it into the hashmap that should contain it based on the
> interaction type (A, B, …)
> - recalculate the element in the row and column non-zero element vectors
> and update them in-memory. These are used in llr and can be updated without
> re-counting matrix elements
> - determine if the new interaction element cross-occurs with A and if so
> calculate the llr score
> - fetch the model field for this cross-occurrence and add the new
> interaction item if it scores high enough to meet the same threshold as
> used in Lambda-style—several options are available here
> Using the existing performant infrastructure of the Lambda recommender for
> persistence we have reduced the CCO model updates to in-memory hashmaps and
> vectors and potentially one new update of a single doc field in Lucene.
> Potentially being an important modifier because vey very often and update
> to the persistent model will not be required.
> At any time we can recreate the live system by rebooting, but even if the
> model update mechanism stops, queries continue to be served with real time
> data. Disregarding DB or Lucene failure the system reverts from Kappa to
> Lambda gracefully and then back to Kappa with a restart of the system
> managing in-memory data.
> Many diagrams and illustrations showing actual component choices to be
> inserted. Benchmarks for update timing also to be added.
> Any thoughts? The Kappa-style implementation is not started so suggestions
> are welcome. The Lambda-style version is operational as The Universal
> Recommender built on PredictionIO, HBase, Elasticsearch, Spark, and Mahout.
> It is documented here: <
>>and here:
> users/algorithms/intro-cooccurrence-spark.html <
> users/algorithms/intro-cooccurrence-spark.html>

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message