storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From MapleFalling <maplefall...@gmail.com>
Subject Anyone can help check my application to do the url count statistic
Date Sun, 12 Jan 2014 08:41:37 GMT
Hi,
I am trying to use Storm Trident to count URLs in a large data set and save
the counts into MongoDB. I use Redis to pump in the data, and I wrote a
Redis transactional spout to get the data into Storm.
Since I need 'exactly once' semantics, I use Trident. But when I submit my
topology to Storm, I found two workers fetching data from Redis: one is for
$mastercoord-bg0<http://10.5.140.132:8080/topology/tridentWCount-46-1389489118/component/%24mastercoord-bg0>,
the other is for $spoutcoord-spout0<http://10.5.140.132:8080/topology/tridentWCount-46-1389489118/component/%24spoutcoord-spout0>.
I do not quite understand why there are two spouts fetching data from Redis.
It makes my calculation come out wrong.



I have pasted my code below; I hope someone can check it for me.

public class RedisPubSubTransactionalSpout implements ITridentSpout<String>{

static final long serialVersionUID = 737015318988609461L;
static Logger LOG = Logger.getLogger(RedisPubSubTransactionalSpout.class);
    static final String REDIS_EMITTEDMSG="emittedMsgs";

SpoutOutputCollector _collector;
final String host;
final int port;
final String pattern;
LinkedBlockingQueue<String> queue;
JedisPool pool;
    private final String singleOutputFieldName;
    private Long count2storm=0L;
    private Long countFromRedis=0L;

public RedisPubSubTransactionalSpout(String host, int port, String
pattern,String singleOutputFieldName) {
this.host = host;
this.port = port;
this.pattern = pattern;
        this.singleOutputFieldName=singleOutputFieldName;
        this.getLogger().debug("Set up RedisClient");

}

    private Logger getLogger(){
        LOG.setLevel(Level.ALL);
        return LOG;
    }

    protected  void initIfNeeded() {
        if (pool==null) {
            queue = new LinkedBlockingQueue<String>(1000);
            pool = new JedisPool(new JedisPoolConfig(),host,port);

            ListenerThread listener = new
ListenerThread(queue,pool,pattern);
            listener.start();
            getLogger().debug("Start redis reader process");
        }
    }
class ListenerThread extends Thread {
LinkedBlockingQueue<String> queue;
JedisPool pool;
String pattern;


 public ListenerThread(LinkedBlockingQueue<String> queue, JedisPool pool,
String pattern) {
this.queue = queue;
this.pool = pool;
this.pattern = pattern;
}
 public void run() {
 JedisPubSub listener = new JedisPubSub() {

@Override
public void onMessage(String channel, String message) {
                   // getLogger().debug("redis spout get message from redis
"+message);
queue.offer(message);
                    countFromRedis++;
                    getLogger().debug("on message,total from
redis:"+countFromRedis.toString());

}

@Override
public void onPMessage(String pattern, String channel, String message) {
                   // getLogger().debug("redis spout get pmessage from
redis "+message);
queue.offer(message);
                    countFromRedis++;
                    getLogger().debug("on pmessage,total from
redis:"+countFromRedis.toString());
}

@Override
public void onPSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
 }

@Override
public void onPUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
 }

@Override
public void onSubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
 }

@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// TODO Auto-generated method stub
 }
};
 Jedis jedis = pool.getResource();
try {
//jedis.psubscribe(listener, pattern);
                jedis.subscribe(listener,pattern);
} finally {
pool.returnResource(jedis);
}
}
};

    private  class Coordinator implements BatchCoordinator<String>{

        private final long batchSize;

        public Coordinator(long batchSize){
            this.batchSize=batchSize;
            initIfNeeded();
        }

        @Override
        public String initializeTransaction(long txid, String prevMetaData){

            String emittedId=UUID.randomUUID().toString();
            Jedis jedisClient=null;
            StringBuffer emittedMsg=new StringBuffer();
            Long txId=txid;
            try{

                jedisClient=pool.getResource();

                getLogger().debug("init transaction!");
                Long _count=0L;
                for (int idx=0;idx<batchSize;idx++){
                    String ret = queue.poll();
                    if(ret==null) {
                        Utils.sleep(50);
                    } else {
                        getLogger().debug("coordinator get message from
redis "+ret);
                        emittedMsg.append(ret).append("\n");
                        _count++;

                    }
                }

                String _emitMsgStr=emittedMsg.toString();
                getLogger().debug("emit Msg in coordinator:"+_emitMsgStr);
                if ((jedisClient!=null) && (_emitMsgStr!=null) &&
(!_emitMsgStr.isEmpty())){
                    getLogger().debug("Put emit message into redis with
txId:"+txId.toString());

 jedisClient.hset(REDIS_EMITTEDMSG,txId.toString(),emittedMsg.toString());
                }

                //save into mongoDB

               /* BasicDBObject _total = (BasicDBObject)
MongoDB.INSTANCE.getMongoDBCol2().findOne();
                if (_total==null){
                    _total =new BasicDBObject("t",0);
                }

                Long count=(Long) _total.get("t");
                if (count==0L || count== null){
                    count=1L;
                }else{
                    count=count+_count;
                }

                _total.put("t",count);
                MongoDB.INSTANCE.getMongoDBCol2().save(_total); */
                count2storm=count2storm+_count;
                getLogger().debug("total to storm:"+count2storm.toString());

            }catch(Exception e){
                getLogger().debug("error in init
transaction!"+e.getMessage());
            }finally {
                if (jedisClient!=null)
                    pool.returnResource(jedisClient);
                return emittedId;
            }


        }

        @Override
        public void success(long txid){
         /*  Long txId=txid;
            Jedis jedisClient=null;
            try{
                jedisClient=pool.getResource();
               //remove from successful handled message
                if (jedisClient!=null){
                    getLogger().debug("remove transaction
:"+txId.toString());
                    jedisClient.hdel(REDIS_EMITTEDMSG,txId.toString());
                }

            }catch(Exception e){

            }finally {
                if (jedisClient!=null)
                    pool.returnResource(jedisClient);
            }*/

        }

        @Override
        public boolean isReady(long txid){
            if (pool!=null){
                return true;
            }else{
                return false;
            }
        }

        @Override
        public void close(){
            pool.destroy();
        }


    }

    private class TheEmitter implements Emitter<String> {

        @Override
        public void emitBatch(TransactionAttempt tx, String
coordinatorMeta, TridentCollector collector) {

            Jedis jedisClient=null;
            initIfNeeded();
            try{
                jedisClient=pool.getResource();
                String
msgData=jedisClient.hget(REDIS_EMITTEDMSG,tx.getTransactionId().toString());
                getLogger().debug("emitter get message from redis
"+msgData);
                if (msgData!=null){
                    String msgs[]=msgData.split("\n");
                    for (String msg:msgs){
                        getLogger().debug("get message from emittedMsgs
:"+msg);
                        collector.emit(new Values(msg));
                    }
                }

            }catch (Exception e){
                getLogger().debug("Emit message error"+e.getMessage());
            }finally {
                if (null != jedisClient)
                    pool.returnResource(jedisClient);
            }

        }

        @Override
        public void success(TransactionAttempt tx) {
            // NOP
            Long txId=tx.getTransactionId();
            initIfNeeded();
            Jedis jedisClient=null;
            try{
                jedisClient=pool.getResource();
                //remove from successful handled message
                if (jedisClient!=null){
                    getLogger().debug("remove transaction
:"+txId.toString());
                    jedisClient.hdel(REDIS_EMITTEDMSG,txId.toString());
                }

            }catch(Exception e){

            }finally {
                if (jedisClient!=null)
                    pool.returnResource(jedisClient);
            }

        }

        @Override
        public void close() {
            // NOP
        }

    }

    @Override
    public BatchCoordinator<String> getCoordinator(String txStateId, Map
conf, TopologyContext context) {
        return new Coordinator((Long) 100L);
    }

    @Override
    public Emitter<String> getEmitter(String txStateId, Map conf,
TopologyContext context) {
        return new TheEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(singleOutputFieldName);
    }
}




public class TridentURLCount {
    /**
     * Connection settings and field/key names shared by the topology and the
     * MongoDB state. Pure constant holder; never instantiated.
     */
    public static final class Constants {
        // Redis source.
        public static final String RedisHost = "abc.abc.abc.abc";
        public static final int RedisPort = 6379;
        public static final String Pattern = "stormURL";

        // Name of the single field emitted by the spout.
        public static final String OutputFieldName = "url";

        // Keys used in the documents written by SumDB.
        public static final String urlHash = "h";
        public static final String url = "u";
        public static final String urlCount = "c";
        public static final String aveTTL = "at";
        public static final String txId = "tid";

        private Constants() {
            // constants only
        }
    }

    /**
     * Running statistics for one URL: how many times it was seen and the
     * (integer) average TTL over those occurrences.
     */
    public static class URLStat implements Serializable {
        private String hash;
        private String url;
        private Long avgTTL = 0L;
        private Long count = 0L;

        /** @return number of occurrences merged into this statistic so far */
        public Long getCount() {
            return count;
        }

        public String getUrl() {
            return url;
        }

        public void setUrl(String url) {
            this.url = url;
        }

        /** @return average TTL over all merged occurrences (integer division) */
        public Long getAvgTTL() {
            return this.avgTTL;
        }

        /**
         * Merges another average into this one, weighting both sides by their
         * occurrence counts, and adds {@code count} to the occurrence count.
         *
         * @param avgTTL average TTL of the samples being merged in
         * @param count  number of samples behind {@code avgTTL}
         */
        public void mergeAvg(Long avgTTL, Long count) {
            long mergedCount = this.count + count;
            if (mergedCount == 0L) {
                // Nothing on either side yet: keep the current state instead of
                // dividing by zero.
                return;
            }
            this.avgTTL = ((this.avgTTL * this.count) + avgTTL * count) / mergedCount;
            this.count = mergedCount;
        }

        public String getHash() {
            return hash;
        }

        public void setHash(String hash) {
            this.hash = hash;
        }
    }



    public static class SumDB implements State {
        private Long txId;
        private static final Logger LOG = Logger.getLogger(SumDB.class);

        {
            LOG.setLevel(Level.ALL);
        }

        private Logger getLogger() {
            return LOG;
        }

        public void beginCommit(Long txid) {
            this.txId = txid;
        }

        public void commit(Long txid) {

        }

        public void update(Map<String, URLStat> data) {
            MongoDB.INSTANCE.setupMongoDB();
            this.getLogger().debug("Start to save data to MongoDB, here we
need to make exact process once semantics");
            Iterator it = data.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                String key = (String) entry.getKey();
                URLStat val = (URLStat) entry.getValue();

                BasicDBObject stormStat = new
BasicDBObject(Constants.urlHash, key);
                BasicDBObject _dbStat = (BasicDBObject)
MongoDB.INSTANCE.getMongoDBCol().findOne(stormStat);
                if (_dbStat == null) {
                    _dbStat = stormStat;
                }

                Long _txId=(Long) _dbStat.get(Constants.txId);
                if (_txId==null || _txId <this.txId){
                    Long _count=(Long) _dbStat.get(Constants.urlCount);
                    if (_count==null)
                        _count=0L;
                    _count=_count+val.getCount();
                    _dbStat.put(Constants.url, val.getUrl());
                    _dbStat.put(Constants.urlCount, _count);
                    _dbStat.put(Constants.aveTTL, val.getAvgTTL());
                    _dbStat.put(Constants.txId, this.txId);
                    MongoDB.INSTANCE.getMongoDBCol().save(_dbStat);
                    this.getLogger().debug("Save stat into mongoDB for url
" + val.getUrl());
                }




            }
        }
    }

        /** Factory handed to {@code partitionPersist}; every call yields a new {@link SumDB}. */
        public static class SumDBFactory implements StateFactory {

            /**
             * Creates the state object; none of the arguments are used.
             */
            public State makeState(Map a_conf,
                                   IMetricsContext a_context,
                                   int a_partitionIndex,
                                   int a_numPartitions) {
                SumDB state = new SumDB();
                return state;
            }
        }


        /**
         * State updater that collects the {@link URLStat} carried in field 1 of
         * every tuple into a map keyed by URL hash and passes it to the state.
         */
        public static class SumUpdater extends BaseStateUpdater<SumDB> {

            public void updateState(SumDB state, List<TridentTuple> tuples,
                                    TridentCollector collector) {
                Map<String, URLStat> statsByHash = new HashMap<String, URLStat>();
                for (TridentTuple current : tuples) {
                    // Field 1 holds the URLStat; a later tuple with the same
                    // hash replaces an earlier one.
                    URLStat stat = (URLStat) current.getValue(1);
                    String hash = stat.getHash();
                    statsByHash.put(hash, stat);
                }
                state.update(statsByHash);
            }
        }

        /**
         * First-stage aggregator: parses the raw comma-separated records of a
         * batch and builds one {@link URLStat} per URL hash.
         *
         * <p>Field 0 of a record is used as the URL hash, field 1 as the URL
         * and field 5 as the TTL. Records that do not have these fields, or
         * whose TTL is not a number, are logged and skipped so that one bad
         * record does not fail the whole batch.
         */
        public static class localCombiner implements Aggregator<Map<String, URLStat>> {
            private static final Logger LOG = Logger.getLogger(localCombiner.class);

            private static final int HASH_INDEX = 0;
            private static final int URL_INDEX = 1;
            private static final int TTL_INDEX = 5;

            int partitionId;

            private Logger getLogger() {
                LOG.setLevel(Level.ALL);
                return LOG;
            }

            @SuppressWarnings("rawtypes")
            @Override
            public void prepare(Map conf, TridentOperationContext context) {
                this.partitionId = context.getPartitionIndex();
            }

            @Override
            public void cleanup() {
                // nothing to release
            }

            /** Starts every batch with an empty hash-to-statistics map. */
            @Override
            public Map<String, URLStat> init(Object batchId, TridentCollector collector) {
                return new HashMap<String, URLStat>();
            }

            /**
             * Parses one record and adds a single occurrence with its TTL to
             * the statistics of the record's URL hash.
             */
            @Override
            public void aggregate(Map<String, URLStat> val, TridentTuple tuple,
                                  TridentCollector collector) {
                String dataStr = tuple.getString(0);
                this.getLogger().debug("Get Data from source:" + dataStr);
                if (dataStr == null || dataStr.isEmpty()) {
                    return;
                }

                String[] dataArr = dataStr.split(",");
                if (dataArr.length <= TTL_INDEX) {
                    this.getLogger().warn("Skip record with fewer than " + (TTL_INDEX + 1)
                            + " fields:" + dataStr);
                    return;
                }

                long ttl;
                try {
                    ttl = Long.parseLong(dataArr[TTL_INDEX], 10);
                } catch (NumberFormatException e) {
                    this.getLogger().warn("Skip record with non-numeric ttl:" + dataStr);
                    return;
                }

                String urlHash = dataArr[HASH_INDEX];
                URLStat stat = val.get(urlHash);
                if (stat == null) {
                    stat = new URLStat();
                    stat.setHash(urlHash);
                    stat.setUrl(dataArr[URL_INDEX]);
                }

                stat.mergeAvg(ttl, 1L);
                val.put(stat.getHash(), stat);
            }

            /** Emits one (hash, statistics) tuple per URL hash seen in the batch. */
            @Override
            public void complete(Map<String, URLStat> val, TridentCollector collector) {
                this.getLogger().debug("In localCombiner complete");
                for (Map.Entry<String, URLStat> entry : val.entrySet()) {
                    URLStat stat = entry.getValue();
                    this.getLogger().debug("emit??????????????:" + stat.getHash().toString());
                    collector.emit(new Values(entry.getKey(), stat));
                }
            }
        }


    public static class localCombiner2 implements Aggregator<Map<String,
URLStat>> {
        int partitionId;
        private static final Logger LOG =
Logger.getLogger(localCombiner2.class);



        private Logger getLogger() {
            LOG.setLevel(Level.ALL);
            return LOG;
        }

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            this.partitionId = context.getPartitionIndex();
        }

        @Override
        public void cleanup() {

        }

        @Override
        public Map<String, URLStat> init(Object batchId, TridentCollector
collector) {
            return new HashMap<String, URLStat>();
        }


        @Override
        public void aggregate(Map<String, URLStat> val, TridentTuple tuple,
TridentCollector collector) {

                this.getLogger().debug("In localCombiner2 aggregate");
                    String urlHash = tuple.getString(0);
                    URLStat _tupleStat=(URLStat) tuple.getValue(1);
                    String url =_tupleStat.getUrl() ;
                    URLStat _obj = val.get(urlHash);

                    if (_obj == null) {
                        _obj = new URLStat();
                        _obj.setHash(urlHash);
                        _obj.setUrl(url);
                    }


_obj.mergeAvg(_tupleStat.getAvgTTL(),_tupleStat.getCount());
                    val.put(_obj.getHash(), _obj);



        }

        @Override
        public void complete(Map<String, URLStat> val, TridentCollector
collector) {
            Iterator it = val.entrySet().iterator();
            this.getLogger().debug("In localCombiner2
complete!!!!!!!!!!!!!!!");
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                String key = (String) entry.getKey();
                URLStat _val = (URLStat) entry.getValue();
                collector.emit(new Values(key, _val));
                this.getLogger().debug("In localCombiner2
emit!:"+key.toString());
            }
        }

    }






    /**
     * Wires the URL-count topology: Redis spout, shuffle, per-batch parsing
     * and combining, grouping by URL hash, merging per hash, and persisting
     * to MongoDB.
     *
     * @param drpc not used by this topology
     * @return the assembled topology
     */
    public static StormTopology buildTopology(LocalDRPC drpc) {
        RedisPubSubTransactionalSpout redisSpout = new RedisPubSubTransactionalSpout(
                Constants.RedisHost,
                Constants.RedisPort,
                Constants.Pattern,
                Constants.OutputFieldName);

        Fields rawRecord = new Fields(Constants.OutputFieldName);
        Fields hashOnly = new Fields("hash");
        Fields combined = new Fields("hash", "urlstat");
        Fields combinedAgain = new Fields("hash", "urlstat");
        Fields merged = new Fields("hash1", "urlstat1");
        Fields persisted = new Fields("hash1", "urlstat1");

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", redisSpout)
                .shuffle()
                .aggregate(rawRecord, new localCombiner(), combined)
                .parallelismHint(16)
                .groupBy(hashOnly)
                .aggregate(combinedAgain, new localCombiner2(), merged)
                .parallelismHint(16)
                .partitionPersist(new SumDBFactory(), persisted, new SumUpdater())
                .parallelismHint(1);

        return topology.build();
    }

    /**
     * Entry point. Without arguments the topology runs in an in-process local
     * cluster for a fixed time and is then shut down; with an argument it is
     * submitted to the configured cluster under the name {@code args[0]}.
     *
     * @param args optional topology name for cluster submission
     * @throws Exception if submission fails or the local run is interrupted
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.setDebug(true);
        conf.put(Config.NIMBUS_HOST, "10.5.140.132");
        ArrayList<String> zookeeperSrvrs = new ArrayList<String>();
        zookeeperSrvrs.add("10.5.140.131");
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, zookeeperSrvrs);
        conf.put(Config.TOPOLOGY_DEBUG, true);
        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 120);

        if (args.length == 0) {
            // How long the local cluster is left running before it is stopped.
            final long localRunMillis = 100 * 1000L;

            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
                // buildTopology defines no DRPC stream, so there is no DRPC
                // function to call here; just let the topology process the
                // Redis feed for a while.
                Thread.sleep(localRunMillis);
            } finally {
                cluster.shutdown();
                drpc.shutdown();
            }
        } else {
            conf.setNumWorkers(6);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
        }
    }
}


-- 
Maplefalling

Mime
View raw message