storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nathan Leung <ncle...@gmail.com>
Subject Re: how to execute one bolt after another when the input is taken from same spout.
Date Mon, 01 Feb 2016 17:47:15 GMT
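You should wire the bolts one after the other: subscribe the second bolt to
the first bolt instead of to the spout, and have the first bolt emit the
tuple to the second only when it has to (i.e. after its own work succeeds).
Don't use sleep; that's probably not correct anyway.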
On Jan 31, 2016 11:22 PM, "sujitha chinnu" <chinnusujitha28@gmail.com>
wrote:

> Hi,
> My requirement is to execute one bolt first and, only upon its successful
> execution, execute the next bolt. I am giving the input for both bolts
> from the same spout. To achieve this I am using the Thread.sleep() method
> in the second bolt; it works, but it has performance issues. Can anyone
> help me if there is any alternative for this problem?
>
> Here is my sample code:
>
> Topology:
>
> public class Topology {
>
> ConnectionProvider cp;
> protected static final String JDBC_CONF = "jdbc.conf";
> protected static final String TABLE_NAME = "users";
>
> public static void main(String[] args) throws Exception{
> String argument = args[0];
> JdbcMapper jdbcMapper;
> TopologyBuilder builder = new TopologyBuilder();
> Map map = Maps.newHashMap();
> map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
>
> map.put("dataSource.url","jdbc:postgresql://localhost:5432/analysis?user=postgres");
> ConnectionProvider cp = new MyConnectionProvider(map);
>
> jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);
>
> List<Column> schemaColumns = Lists.newArrayList(new Column("user_id",
> Types.INTEGER), new Column ("user_name",Types.VARCHAR),new
> Column("create_date", Types.TIMESTAMP));
>
> JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
>
> PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
> .withInsertQuery("insert into user_details (id, user_name,
> created_timestamp) values (?,?,?)");
>
> builder.setSpout("myspout", new UserSpout(), 1);
>
> builder.setBolt("Psql_Bolt",
> userPersistanceBolt,1).shuffleGrouping("myspout");
>
> jdbcMapper = new SimpleJdbcMapper("My_details", cp);
>
> List<Column> schemaColumns1 = Lists.newArrayList(new Column("my_id",
> Types.INTEGER), new Column ("my_name",Types.VARCHAR));
>
> JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);
>
> PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
> .withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");
>
> //builder.setSpout("myspout", new UserSpout(), 1);
>
> builder.setBolt("Psql_Bolt1",
> userPersistanceBolt1,1).shuffleGrouping("myspout");
> Config conf = new Config();
> conf.put(JDBC_CONF, map);
> conf.setDebug(true);
> conf.setNumWorkers(3);
>
> if (argument.equalsIgnoreCase("runLocally"))
> { System.out.println("Running topology locally..."); LocalCluster cluster
> = new LocalCluster(); cluster.submitTopology("Twitter Test
> Storm-postgresql", conf, builder.createTopology()); }
>
> else
> { System.out.println("Running topology on cluster...");
> StormSubmitter.submitTopology("Topology_psql", conf,
> builder.createTopology()); }
>
> }}
>
> PsqlBolt:
>
> public class PsqlBolt extends AbstractJdbcBolt {
> private static final Logger LOG = Logger.getLogger(PsqlBolt.class);
> private String tableName;
> private String insertQuery;
> private JdbcMapper jdbcMapper;
>
> public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper
> jdbcMapper)
> { super(connectionProvider); this.jdbcMapper = jdbcMapper; }
> public PsqlBolt withTableName(String tableName) { this.tableName =
> tableName; return this; }
>
> public PsqlBolt withInsertQuery(String insertQuery) { this.insertQuery =
> insertQuery; System.out.println("query passsed....."); return this; }
> @Override
> public void prepare(Map map, TopologyContext topologyContext,
> OutputCollector collector) {
> super.prepare(map, topologyContext, collector);
> if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
> throw new IllegalArgumentException("You must supply either a tableName or
> an insert Query."); }
> }
>
> @Override
> public void execute(Tuple tuple) {
> try {
>
> List<Column> columns = jdbcMapper.getColumns(tuple);
> List<List<Column>> columnLists = new ArrayList<List<Column>>();
> columnLists.add(columns);
> if(!StringUtils.isBlank(tableName)) {
> this.jdbcClient.insert(this.tableName, columnLists); } else {
> this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }
> this.collector.ack(tuple);
> } catch (Exception e) { this.collector.reportError(e);
> this.collector.fail(tuple); }
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
> {
>
> }}
>
>
> PsqlBolt1:
>
> public class PsqlBolt1 extends AbstractJdbcBolt {
> private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);
>
> private String tableName;
> private String insertQuery;
> private JdbcMapper jdbcMapper;
>
> public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper
> jdbcMapper) { super(connectionProvider); this.jdbcMapper = jdbcMapper; }
>
> public PsqlBolt1 withInsertQuery(String insertQuery)
> { this.insertQuery = insertQuery; System.out.println("query
> passsed....."); return this; }
>
> @Override
> public void prepare(Map map, TopologyContext topologyContext,
> OutputCollector collector) {
> super.prepare(map, topologyContext, collector);
> if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery))
> { throw new IllegalArgumentException("You must supply either a tableName
> or an insert Query."); }
>
> }
>
> @Override
> public void execute(Tuple tuple) {
> try {
> Thread.sleep(1000);
> List<Column> columns = jdbcMapper.getColumns(tuple);
> List<List<Column>> columnLists = new ArrayList<List<Column>>();
> columnLists.add(columns);
> if(!StringUtils.isBlank(tableName))
> { this.jdbcClient.insert(this.tableName, columnLists); }
>
> else
> { this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }
>
> this.collector.ack(tuple);
> } catch (Exception e)
> { this.collector.reportError(e); this.collector.fail(tuple); }
>
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
> {
>
> }}
>

Mime
View raw message