storm-user mailing list archives

From "唐思成" <jadetan...@qq.com>
Subject Re: Re: How to implement distinct count in trident topology?
Date Fri, 18 Jul 2014 02:29:08 GMT
Hi, I find a Trident topology is an elegant solution to this problem. Suppose I have a spout
emitting user action logs; here is the model:

import java.io.Serializable;
import java.util.Date;

// One record per user action, emitted by the spout as the "actionLog" field.
public class ActionLog implements Serializable {
    private static final long serialVersionUID = 22583958918591L;
    private String actionType;
    private String passPort;   // user identifier
    private String game;
    private Date sTime;        // time of the action
}

If I want to calculate the unique users of each game in real time, here is my topology:


inputStream.each(new Fields("actionLog"), new AddNewField("user"), new Fields("user"))
           .groupBy(new Fields("user"))
           .aggregate(new One(), new Fields("one"))  // this step is equivalent to SQL DISTINCT
           .persistentAggregate(new RedisState.SnapshottableStateFactory("distinctCountUser"),
                                new Count(), new Fields("distinctCount"))
           .newValuesStream()
           .each(new Fields("distinctCount"), new JustLogger());
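
For reference, here is roughly what AddNewField does (One and Count are Trident's built-in aggregators from storm.trident.operation.builtin). This is only a simplified sketch: the getPassPort() getter and the constructor argument are assumed, since only the topology wiring is shown above.

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Sketch only: pulls the ActionLog off the tuple and emits the user id,
// which becomes the "user" output field declared in the topology above.
public class AddNewField extends BaseFunction {
    private final String fieldName; // kept only to mirror the constructor call above

    public AddNewField(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        ActionLog log = (ActionLog) tuple.getValue(0);  // the "actionLog" input field
        collector.emit(new Values(log.getPassPort()));  // getter assumed, not shown above
    }
}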

There is another example, the Reach topology, in the Trident tutorial: http://storm.incubator.apache.org/documentation/Trident-tutorial.html

The topology works on my local cluster, but I haven't thought it through very thoroughly. I want
to put this implementation into production, so there is still a lot of work to be done; any
suggestions and ideas are truly welcome.
2014-07-18 



唐思成 



From: Sam Goodwin 
Sent: 2014-07-17 05:45:03 
To: user@storm.incubator.apache.org 
Cc: 
Subject: Re: How to implement distinct count in trident topology? 
 
Even with Redis you'll need to maintain the sliding window yourself.


Does it need to be exact? If you want to estimate the number of distinct users seen in a sliding
window, use the HyperLogLog data structure with a ring buffer. It's fast, accurate, and
memory-efficient. For example, allocate 60 HyperLogLog structures for 60 minutes (one per minute)
and then use a ring buffer algorithm to maintain the sliding window. When you want the total
count you can just merge all the HyperLogLog structures and extract the count. It's not exact,
but it's close enough and can be tuned based on your precision requirements.
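
Roughly, the ring buffer scheme looks like this (a minimal sketch, not tied to any particular library; the HyperLogLog type is a stand-in for whichever implementation you pick, and its offer/merge/cardinality methods are assumed):

// HyperLogLog here is a placeholder type: offer/merge/cardinality are assumed methods.
public class SlidingHllWindow {
    private final HyperLogLog[] buckets;   // one bucket per minute
    private int current = 0;

    public SlidingHllWindow(int minutes) {
        buckets = new HyperLogLog[minutes];
        for (int i = 0; i < minutes; i++) {
            buckets[i] = new HyperLogLog();
        }
    }

    // Call once per minute: advance the ring and reset the bucket that just expired.
    public synchronized void rotate() {
        current = (current + 1) % buckets.length;
        buckets[current] = new HyperLogLog();
    }

    // Record one user sighting in the current minute's bucket.
    public synchronized void add(String userId) {
        buckets[current].offer(userId);
    }

    // Approximate distinct count over the whole window: merge every bucket.
    public synchronized long distinctCount() {
        HyperLogLog merged = new HyperLogLog();
        for (HyperLogLog bucket : buckets) {
            merged = merged.merge(bucket);
        }
        return merged.cardinality();
    }
}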

Twitter's algebird package has a monoid implementation of the HLL algorithm https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala#L353
which basically means you can merge them into one, and that should be all you need. The Redis HLL
also allows you to merge two HLLs. Please note that using the Redis HLL algorithm will make
it harder to implement a transactional topology. If you ever want that, then I suggest you
implement the above algorithm and serialize/deserialize it in your TridentState.
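
For what it's worth, the Redis side boils down to the PFADD / PFCOUNT / PFMERGE commands. A tiny illustration through the Jedis client (the key-naming scheme and per-minute bucketing here are made up for the example):

import redis.clients.jedis.Jedis;

public class RedisHllExample {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        // Record users seen in one minute-bucket for a given game.
        jedis.pfadd("uniques:gameA:2014-07-18-10:05", "user1", "user2", "user3");
        // Approximate distinct users in that single bucket.
        long perBucket = jedis.pfcount("uniques:gameA:2014-07-18-10:05");
        // Merge several minute-buckets (only two shown) into one window key and count it.
        jedis.pfmerge("uniques:gameA:last-hour",
                      "uniques:gameA:2014-07-18-10:04",
                      "uniques:gameA:2014-07-18-10:05");
        long window = jedis.pfcount("uniques:gameA:last-hour");
        System.out.println(perBucket + " distinct this minute, " + window + " in the window");
        jedis.close();
    }
}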


If you want a more precise window you can simply increase the number of buckets. You may also be
able to adapt the exponential histogram sliding-window algorithm for your needs: http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf



On Wed, Jul 16, 2014 at 1:21 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com> wrote:

Take a look at a distributed data structure server, for example Redis. There are various Storm
integrations available.

On Monday, July 14, 2014, 唐思成 <jadetangcn@qq.com> wrote:

The use case is simple: count unique users within a sliding window. The common solution I found
on the Internet is to use a HashSet to filter out duplicate users, like this:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

public class Distinct extends BaseFilter {
    private static final long serialVersionUID = 1L;
    // Keeps every id ever seen; tuples whose id was already present are dropped.
    private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());

    @Override
    public boolean isKeep(TridentTuple tuple) {
        String id = this.getId(tuple);
        return distincter.add(id); // add() returns false for duplicates, so they are filtered out
    }

    public String getId(TridentTuple t) {
        // Concatenate all tuple fields into a single id string.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            sb.append(t.getString(i));
        }
        return sb.toString();
    }
}

However, the HashSet is stored in memory, so when the data grows very large I think it will
cause an OOM.
Is there a scalable solution?

2014-07-14 



唐思成 


-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7