mahout-user mailing list archives

From Pat Ferrel <>
Subject Re: Lambda and Kappa CCO
Date Mon, 17 Apr 2017 19:57:19 GMT
Ted thinks this can be done with DBs alone. What I proposed also uses DBs, Solr/Elasticsearch
plus a persistent event cache (HBase, Cassandra, etc.), but adds in-memory models for faster indicator
calculations, leading to mutable model updates in ES/Solr. One primary reason for kappa over
lambda is items with short life spans or rapidly changing catalogs, things like news.

The other point for online learning is the volume of data that must be stored and re-processed.
Kappa only deals with small incremental changes. The resource cost of kappa will be much smaller
than lambda, especially for slowly changing models, where most updates will be no-ops.

In any case, in kappa there would be no explicit matrix or vector multiply. If we do use in-memory
data structures, I doubt they would be Mahout ones.

On Apr 9, 2017, at 3:43 PM, Trevor Grant <> wrote:

Specifically, I hacked together a Lambda Streaming CCO with Spark and Flink
for a demo for my upcoming FlinkForward talk.  Will post code once I finish
it / strip all my creds out. In short, the lack of serialization in Mahout
incore vectors/matrices makes handing off / dealing with them somewhat

Trevor Grant
Data Scientist

*"Fortunate is he, who is able to know the causes of things."  -Virgil*

On Sun, Apr 9, 2017 at 5:39 PM, Andrew Palumbo <> wrote:

> Pat-
> What can we do from the Mahout side?  Would we need any new data
> structures?  Trevor and I were just discussing some of the troubles of
> near real time matrix streaming.
> ------------------------------
> *From:* Pat Ferrel <>
> *Sent:* Monday, March 27, 2017 2:42:55 PM
> *To:* Ted Dunning;
> *Cc:* Trevor Grant; Ted Dunning;
> *Subject:* Re: Lambda and Kappa CCO
> Agreed. Downsampling was ignored in several places, and with it a great
> deal of input is a no-op. Without downsampling too many things need to
> change.
> Also, everything depends on this rather vague sentence: “- determine
> if the new interaction element cross-occurs with A and if so calculate the
> llr score”, which needs a lot more explanation. Whether to use Mahout
> in-memory objects or reimplement some in high speed data structures is a
> big question.
> The good thing I noticed in writing this is that model updates can lag
> arbitrarily far behind real time and the system degrades gracefully. So
> during high load it may fall behind but as long as user behavior is
> up-to-date and persisted (it will be) we are still in pretty good shape.
> On Mar 26, 2017, at 6:26 PM, Ted Dunning <> wrote:
> I think that this analysis omits the fact that one user interaction causes
> many cooccurrences to change.
> This becomes feasible if you include the effect of down-sampling, but that
> has to be in the algorithm.
> From: Pat Ferrel <>
> Sent: Saturday, March 25, 2017 12:01:00 PM
> To: Trevor Grant;
> Cc: Ted Dunning;
> Subject: Lambda and Kappa CCO
> This is an overview and proposal for turning the multi-modal Correlated
> Cross-Occurrence (CCO) recommender from a Lambda-style system into an online,
> streaming, incrementally updated Kappa-style learner.
> # The CCO Recommender: Lambda-style
> We have largely solved the problems of calculating the multi-modal
> Correlated Cross-Occurrence models and serving recommendations in real time
> from real time user behavior. The model sits in Lucene (Elasticsearch or
> Solr) in a scalable way, and the typical query, which produces personalized
> recommendations from real time user behavior, completes with 25 ms
> latency.
> # CCO Algorithm
> A = rows are users, columns are items they have “converted” on (purchase,
> read, watch). A represents the conversion event—the interaction that you
> want to recommend.
> B = rows are users, columns are items that the user has shown some
> preference for, but not necessarily the same items as A. B represents a
> different interaction than A. B might be a preference for some category,
> brand, genre, or just a detailed item page view, or all of these in B, C, D,
> etc.
> h_a = a particular user’s history of A type interactions, a vector of
> items that our user converted on.
> h_b = a particular user’s history of B type interactions, a vector of
> items that our user had B type interactions with.
> CCO says:
> [A’A]h_a + [A’B]h_b + [A’C]h_c = r; where r is the weighted items from A
> that represent personalized recommendations for our particular user.
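A minimal sketch of the raw (unscored) formula above, assuming binary interactions held as plain Python dicts of sets rather than Mahout matrices. The function names and the toy data are hypothetical and only illustrate the arithmetic; the sketch also keeps the diagonal of A'A, which a real recommender may prefer to drop.

```python
from collections import defaultdict

def cooccurrence(A, B):
    """Compute A'B for sparse binary matrices stored as {user: set(items)}.

    Returns {item_from_A: {item_from_B: number_of_users_with_both}}.
    """
    AtB = defaultdict(lambda: defaultdict(int))
    for user, a_items in A.items():
        for a in a_items:
            for b in B.get(user, ()):
                AtB[a][b] += 1
    return AtB

def apply_history(M, h):
    """Compute M h, where h is a set of items (a binary history vector)."""
    return {row: sum(cnt for col, cnt in cols.items() if col in h)
            for row, cols in M.items()}

def recommend(A, B, h_a, h_b):
    """r = [A'A]h_a + [A'B]h_b, returned as {item_from_A: weight}."""
    r = defaultdict(int)
    for M, h in ((cooccurrence(A, A), h_a), (cooccurrence(A, B), h_b)):
        for item, weight in apply_history(M, h).items():
            r[item] += weight
    return dict(r)

# Toy data (hypothetical): A = purchases, B = category views.
A = {"u1": {"ipad", "case"}, "u2": {"ipad"}, "u3": {"case"}}
B = {"u1": {"tablets"}, "u2": {"tablets"}, "u3": {"accessories"}}
r = recommend(A, B, h_a={"ipad"}, h_b={"tablets"})
```

Here `r` holds one weight per item in A; sorting it by weight gives the ranked recommendations for this user.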
> The innovation here is that A, B, C, … represent multi-modal data.
> Interactions of all types and on item-sets of arbitrary types. In other
> words we can look at virtually any action or possible indicator of user
> preference or taste. We strengthen the above raw cross-occurrence and
> cooccurrence formula by performing:
> [llr(A’A)]h_a + [llr(A’B)]h_b + … = r adding llr (log-likelihood ratio)
> correlation scoring to filter out coincidental cross-occurrences.
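For readers unfamiliar with the llr score, here is a sketch of one common formulation: the G² log-likelihood ratio on a 2x2 contingency table, written with unnormalized entropies. This is an illustration under that assumption, not necessarily the exact code path used by Mahout. The parameter names k11..k22 are the usual convention: k11 = users with both interactions, k12 and k21 = users with only one of them, k22 = users with neither.

```python
import math

def x_log_x(x):
    """x * ln(x), with the 0 * ln(0) case defined as 0."""
    return 0.0 if x == 0 else x * math.log(x)

def entropy(*counts):
    """Unnormalized Shannon entropy of a set of counts."""
    return x_log_x(sum(counts)) - sum(x_log_x(k) for k in counts)

def llr(k11, k12, k21, k22):
    """Log-likelihood ratio for a 2x2 contingency table of counts."""
    row = entropy(k11 + k12, k21 + k22)
    col = entropy(k11 + k21, k12 + k22)
    mat = entropy(k11, k12, k21, k22)
    # Clamp at zero to absorb floating point round-off.
    return max(0.0, 2.0 * (row + col - mat))

independent = llr(10, 10, 10, 10)   # no association between the two events
correlated = llr(100, 0, 0, 100)    # the two events always occur together
```

A score near zero means the cross-occurrence is what chance alone would predict; larger scores indicate stronger evidence of correlation, which is what lets the model discard coincidental cross-occurrences.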
> The model becomes [llr(A’A)], [llr(A’B)], … each has items from A in rows
> and items from A, B, … in columns. This sits in Lucene as one document per
> item in A with a field for each of A, B, C holding the items whose user interactions
> most strongly correlate to the conversion event on the row item. Put
> another way, the model is, for each item in A, the items from A, B, C… that have the most similar
> user interactions.
> To calculate r we need to find the most similar items in the model to the
> history or behavior of our example user. Since Lucene is basically a
> K-Nearest Neighbors engine that is particularly well tuned to work with
> sparse data (our model is typically quite sparse) all we need to do is
> segment the user history into h_a, h_b … and use it as the multi-field
> query on the model. This performs the equivalent of:
> [llr(A’A)]h_a + [llr(A’B)]h_b + … = r where we substitute cosine
> similarity of h_a to every row in [llr(A’A)] (and likewise h_b to every row in [llr(A’B)]) for the tensor math.
> Further Lucene sorts by score and returns only the top ranking items. Even
> further we note that since we have performed a multi-field query it does
> the entire multi-field similarity calculation and vector segment addition
> before doing the sort. Lucene does this in a very performant manner so the
> entire query, including fetching user history, forming the Lucene query and
> executing it will take something like 25 ms and is indefinitely scalable to
> any number of simultaneous queries.
> Problem solved?
> Well, yes and no. The above method I’ve labeled a Lambda-style recommender.
> It uses real time user history and makes recommendations in real time but
> it can only recommend items in A. So if A is changing rapidly, as when the
> items have short lifetimes like newsy items or social media things like
> tweets, then A can get out of date in hours or minutes. The other downside
> of Lambda CCO is that we note that the entirety of the data in A, B, C …
> has to be re-examined every time new models are calculated. With data on
> the order of a terabyte or more, this is quite a cost in compute resources.
> It is true that most typical A items will not change often. Think of the
> typical E-Commerce case where A represents items in a catalog, which change
> only infrequently. But the resources required to re-calculate the model
> remain high, even if they are only needed once per week. We have seen the
> existing Lambda-style system take many AWS x1.32xlarge instance hours to
> recalculate the model. This translates into a significant cost for model
> calculation alone.
> # Streaming Online CCO Kappa-style
> Since we have a good solution for serving query results from the model we
> might solve the Lambda CCO issues by converting the model re-calc into a
> streaming online incrementally updated process. In other words instead of
> replacing the model with a new one periodically let’s see if we can update
> it with each new interaction in real time.
> Stripping away the query part of the CCO algorithm, the question is: can we
> update
> [llr(A’A)], [llr(A’B)], …
> in real time?
> Caveats: updates may be micro-batched, not with each new event, and real
> time really means “near real time” and the update frequency will be allowed
> to vary with input frequency.
> We note a few things about the problem:
> 1) A, B, … are sparse matrices keyed by a user-id. This is possible to
> model as a hashmap in memory or as a persistent distributed indexed dataset
> in some fast NoSQL DB.
> 2) if we need to update the model we also need to update the data that
> produces the model, A, B, C in a persistent way.
> 3) a single interaction will only affect one segment of the model (A’A or
> A’B but not both) and one input matrix.
> 4) The log-likelihood ratio needs to know 1) if there is a
> cross-occurrence and 2) the counts of interactions from several
> perspectives that correspond to row, column, and total density of the input
> matrices.
> 5) because of #4, the important data can be stored and updated as single
> elements of vectors, not requiring examining the entirety of input.
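As an illustration of points 4 and 5, the 2x2 table that llr needs can be derived from four running counts, so a new interaction only increments a few numbers instead of re-scanning the input. This is a sketch under the assumption that counts are per distinct user; the function and parameter names are hypothetical.

```python
def contingency(both, a_users, b_users, total_users):
    """Build the 2x2 table for one (item from A, item from B) pair.

    both        -- users who interacted with both items (the cross-occurrence count)
    a_users     -- users who converted on the A item
    b_users     -- users who interacted with the B item
    total_users -- all users seen so far
    """
    k11 = both
    k12 = a_users - both                          # A item only
    k21 = b_users - both                          # B item only
    k22 = total_users - a_users - b_users + both  # neither
    return k11, k12, k21, k22

# Before: 20 users, 5 converted on the A item, 4 touched the B item, 3 did both.
before = contingency(both=3, a_users=5, b_users=4, total_users=20)

# A user who already converted on the A item now touches the B item:
# only `both` and `b_users` are incremented, nothing is re-counted.
after = contingency(both=4, a_users=5, b_users=5, total_users=20)
```

The known user simply moves from the "A item only" cell to the "both" cell, and the table still sums to the total number of users.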
> We can make several decisions now based on system requirements.
> 1) How important is it to persist A, B, C, etc.? If we save interaction logs
> we can always re-create them when a system is taken down and brought back
> up. This implies that in-memory hashmaps of sparse vectors are sufficient
> to the solution even though a “reboot” may require some time to perform. If
> the reboot time is critical we can represent A, B, ... in a mutable DB.
> 2) The serving layer persists A’A, … and is highly performant and scalable
> with one of the distributed versions of Lucene so we leave that in place.
> The caveat is that we must now add new load to Lucene in that we must
> update the model in place rather than recalculating it en masse
> periodically. This has the benefit of also persisting the model so it is
> “reboot” resilient.
> 3) If we are persisting user interactions h_a, h_b, … in a DB
> then we have the makings of queries.
> This leaves only the collected interactions in A, B, … that must be
> accessed and updated in real time, but if they were to disappear all we would
> lose is real time model updating; the queries would still work and
> the system as a whole would continue to operate with very little notice of
> the loss. Therefore using in-memory data structures (which give the ultimate
> in speed) should be sufficient to the Kappa-style solution. They will be in
> separate hashmaps with a sparse vector per key, remembering that the key is
> an item from A and the vector is items from A, B, C ...
> # Kappa algorithm
> Transforming Lambda to Kappa requires:
> - receive a new interaction and store it to a persistent DB as per the
> Lambda implementation, which we know is fast
> - insert it into the hashmap that should contain it based on the
> interaction type (A, B, …)
> - recalculate the element in the row and column non-zero element vectors
> and update them in-memory. These are used in llr and can be updated without
> re-counting matrix elements
> - determine if the new interaction element cross-occurs with A and if so
> calculate the llr score
> - fetch the model field for this cross-occurrence and add the new
> interaction item if it scores high enough to meet the same threshold as
> used in Lambda-style—several options are available here
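The steps above can be sketched as a small in-memory updater. Everything here is an assumption made for illustration: the class and method names are hypothetical, a Python list stands in for the persistent event store, a nested dict stands in for the Lucene document fields, and the llr function is the common G² form rather than a confirmed Mahout code path. The sketch only rescores cells where the new item is the column, and it ignores downsampling, so it is deliberately simpler than a full implementation would need to be.

```python
import math
from collections import defaultdict

def _entropy(*ks):
    """Unnormalized entropy of counts, with 0 * ln(0) taken as 0."""
    xlx = lambda x: x * math.log(x) if x > 0 else 0.0
    return xlx(sum(ks)) - sum(xlx(k) for k in ks)

def llr(k11, k12, k21, k22):
    return max(0.0, 2.0 * (_entropy(k11 + k12, k21 + k22)
                           + _entropy(k11 + k21, k12 + k22)
                           - _entropy(k11, k12, k21, k22)))

class KappaCCO:
    def __init__(self, threshold):
        self.threshold = threshold
        self.event_log = []                                          # stand-in for the persistent DB
        self.users_by_item = defaultdict(lambda: defaultdict(set))   # type -> item -> users
        self.items_by_user = defaultdict(lambda: defaultdict(set))   # type -> user -> items
        self.all_users = set()
        self.model = defaultdict(lambda: defaultdict(dict))          # A item -> type -> {item: score}

    def on_interaction(self, user, kind, item):
        """Handle one event; return the (A item, item) model cells written."""
        self.event_log.append((user, kind, item))                    # persist the event
        self.users_by_item[kind][item].add(user)                     # insert into the right hashmap
        self.items_by_user[kind][user].add(item)
        self.all_users.add(user)                                     # counts are the set sizes
        n = len(self.all_users)
        x_users = self.users_by_item[kind][item]
        written = []
        # The new item cross-occurs with every A item this user converted on.
        for a_item in self.items_by_user["A"][user]:
            if kind == "A" and a_item == item:
                continue                                             # skip self-cooccurrence
            a_users = self.users_by_item["A"][a_item]
            k11 = len(a_users & x_users)
            k12 = len(a_users) - k11
            k21 = len(x_users) - k11
            k22 = n - k11 - k12 - k21
            score = llr(k11, k12, k21, k22)
            field = self.model[a_item][kind]
            if score >= self.threshold:
                field[item] = score                                  # update one model field
                written.append((a_item, item))
            else:
                field.pop(item, None)                                # below threshold: leave it out
        return written

cco = KappaCCO(threshold=3.0)
for u, i in [("u1", "ipad"), ("u2", "ipad"), ("u3", "case"), ("u4", "case")]:
    cco.on_interaction(u, "A", i)
first = cco.on_interaction("u1", "B", "tablets")    # weak evidence so far
second = cco.on_interaction("u2", "B", "tablets")   # now correlated with "ipad"
```

In this toy run the first B event scores below the threshold and writes nothing to the model, while the second pushes the ("ipad", "tablets") cell over the threshold, which is the "most updates are no-ops" behavior described in the thread.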
> Using the existing performant infrastructure of the Lambda recommender for
> persistence we have reduced the CCO model updates to in-memory hashmaps and
> vectors and potentially one new update of a single doc field in Lucene.
> “Potentially” being an important modifier because very often an update
> to the persistent model will not be required.
> At any time we can recreate the live system by rebooting, but even if the
> model update mechanism stops, queries continue to be served with real time
> data. Disregarding DB or Lucene failure the system reverts from Kappa to
> Lambda gracefully and then back to Kappa with a restart of the system
> managing in-memory data.
> Many diagrams and illustrations showing actual component choices to be
> inserted. Benchmarks for update timing also to be added.
> Any thoughts? The Kappa-style implementation is not started so suggestions
> are welcome. The Lambda-style version is operational as The Universal
> Recommender built on PredictionIO, HBase, Elasticsearch, Spark, and Mahout.
> It is documented here: <> and here:
> users/algorithms/intro-cooccurrence-spark.html
