storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sujitha chinnu <chinnusujith...@gmail.com>
Subject Re: how to execute one bolt after another when the input is taken from same spout.
Date Tue, 02 Feb 2016 10:04:27 GMT
Thanks for your response.
But I am taking the input from the spout itself, and the two bolts contain
different logic, so I cannot emit the tuple from the first bolt.

On Mon, Feb 1, 2016 at 11:17 PM, Nathan Leung <ncleung@gmail.com> wrote:

> You should wire the bolts one after the other, and the first will emit the
> tuple to the second only when it has to. Don't use sleep; that's probably
> not correct anyway.
> On Jan 31, 2016 11:22 PM, "sujitha chinnu" <chinnusujitha28@gmail.com>
> wrote:
>
>> Hi,
>>          My requirement is to execute one bolt first, and only upon its
>> successful execution should the next bolt execute. I am giving the input
>> for both bolts from the same spout. To achieve this, I am using the
>> Thread.sleep() method in the second bolt; it works fine but has performance
>> issues. Can anyone help me if there is an alternative for this problem?
>>
>> Here is my sample code:
>>
>> Topology:
>>
>> public class Topology {
>>
>> ConnectionProvider cp;
>> protected static final String JDBC_CONF = "jdbc.conf";
>> protected static final String TABLE_NAME = "users";
>>
>> public static void main(String[] args) throws Exception{
>> String argument = args[0];
>> JdbcMapper jdbcMapper;
>> TopologyBuilder builder = new TopologyBuilder();
>> Map map = Maps.newHashMap();
>> map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
>>
>> map.put("dataSource.url","jdbc:postgresql://localhost:5432/analysis?user=postgres");
>> ConnectionProvider cp = new MyConnectionProvider(map);
>>
>> jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);
>>
>> List<Column> schemaColumns = Lists.newArrayList(new Column("user_id",
>> Types.INTEGER), new Column ("user_name",Types.VARCHAR),new
>> Column("create_date", Types.TIMESTAMP));
>>
>> JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
>>
>> PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
>> .withInsertQuery("insert into user_details (id, user_name,
>> created_timestamp) values (?,?,?)");
>>
>> builder.setSpout("myspout", new UserSpout(), 1);
>>
>> builder.setBolt("Psql_Bolt",
>> userPersistanceBolt,1).shuffleGrouping("myspout");
>>
>> jdbcMapper = new SimpleJdbcMapper("My_details", cp);
>>
>> List<Column> schemaColumns1 = Lists.newArrayList(new Column("my_id",
>> Types.INTEGER), new Column ("my_name",Types.VARCHAR));
>>
>> JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);
>>
>> PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
>> .withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");
>>
>> //builder.setSpout("myspout", new UserSpout(), 1);
>>
>> builder.setBolt("Psql_Bolt1",
>> userPersistanceBolt1,1).shuffleGrouping("myspout");
>> Config conf = new Config();
>> conf.put(JDBC_CONF, map);
>> conf.setDebug(true);
>> conf.setNumWorkers(3);
>>
>> if (argument.equalsIgnoreCase("runLocally"))
>> { System.out.println("Running topology locally..."); LocalCluster cluster
>> = new LocalCluster(); cluster.submitTopology("Twitter Test
>> Storm-postgresql", conf, builder.createTopology()); }
>>
>> else
>> { System.out.println("Running topology on cluster...");
>> StormSubmitter.submitTopology("Topology_psql", conf,
>> builder.createTopology()); }
>>
>> }}
>>
>> PsqlBolt:
>>
>> public class PsqlBolt extends AbstractJdbcBolt {
>> private static final Logger LOG = Logger.getLogger(PsqlBolt.class);
>> private String tableName;
>> private String insertQuery;
>> private JdbcMapper jdbcMapper;
>>
>> public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper
>> jdbcMapper)
>> { super(connectionProvider); this.jdbcMapper = jdbcMapper; }
>> public PsqlBolt withTableName(String tableName) { this.tableName =
>> tableName; return this; }
>>
>> public PsqlBolt withInsertQuery(String insertQuery) { this.insertQuery =
>> insertQuery; System.out.println("query passsed....."); return this; }
>> @Override
>> public void prepare(Map map, TopologyContext topologyContext,
>> OutputCollector collector) {
>> super.prepare(map, topologyContext, collector);
>> if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
>> throw new IllegalArgumentException("You must supply either a tableName or
>> an insert Query."); }
>> }
>>
>> @Override
>> public void execute(Tuple tuple) {
>> try {
>>
>> List<Column> columns = jdbcMapper.getColumns(tuple);
>> List<List<Column>> columnLists = new ArrayList<List<Column>>();
>> columnLists.add(columns);
>> if(!StringUtils.isBlank(tableName)) {
>> this.jdbcClient.insert(this.tableName, columnLists); } else {
>> this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }
>> this.collector.ack(tuple);
>> } catch (Exception e) { this.collector.reportError(e);
>> this.collector.fail(tuple); }
>> }
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>>
>> }}
>>
>>
>> PsqlBolt1:
>>
>> public class PsqlBolt1 extends AbstractJdbcBolt {
>> private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);
>>
>> private String tableName;
>> private String insertQuery;
>> private JdbcMapper jdbcMapper;
>>
>> public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper
>> jdbcMapper) { super(connectionProvider); this.jdbcMapper = jdbcMapper; }
>>
>> public PsqlBolt1 withInsertQuery(String insertQuery)
>> { this.insertQuery = insertQuery; System.out.println("query
>> passsed....."); return this; }
>>
>> @Override
>> public void prepare(Map map, TopologyContext topologyContext,
>> OutputCollector collector) {
>> super.prepare(map, topologyContext, collector);
>> if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery))
>> { throw new IllegalArgumentException("You must supply either a tableName
>> or an insert Query."); }
>>
>> }
>>
>> @Override
>> public void execute(Tuple tuple) {
>> try {
>> Thread.sleep(1000);
>> List<Column> columns = jdbcMapper.getColumns(tuple);
>> List<List<Column>> columnLists = new ArrayList<List<Column>>();
>> columnLists.add(columns);
>> if(!StringUtils.isBlank(tableName))
>> { this.jdbcClient.insert(this.tableName, columnLists); }
>>
>> else
>> { this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }
>>
>> this.collector.ack(tuple);
>> } catch (Exception e)
>> { this.collector.reportError(e); this.collector.fail(tuple); }
>>
>> }
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>>
>> }}
>>
>

Mime
View raw message