storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sujitha chinnu <chinnusujith...@gmail.com>
Subject how to execute one bolt after another when the input is taken from same spout.
Date Mon, 01 Feb 2016 07:22:24 GMT
hai.,
         My requirement is to first execute one bolt and upon successful
execution only next bolt have to execute and i am giving the input for the
bolts from same spout.For that for second bolt i am using Thread.sleep()
method, its working fine but have performance issues.Can anyone help me if
there is any alternative for this problem.

Here is my sample code:

Topology:

public class Topology {

ConnectionProvider cp;
protected static final String JDBC_CONF = "jdbc.conf";
protected static final String TABLE_NAME = "users";

public static void main(String[] args) throws Exception{
String argument = args[0];
JdbcMapper jdbcMapper;
TopologyBuilder builder = new TopologyBuilder();
Map map = Maps.newHashMap();
map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
map.put("dataSource.url","jdbc:postgresql://localhost:5432/analysis?user=postgres");
ConnectionProvider cp = new MyConnectionProvider(map);

jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);

List<Column> schemaColumns = Lists.newArrayList(new Column("user_id",
Types.INTEGER), new Column ("user_name",Types.VARCHAR),new
Column("create_date", Types.TIMESTAMP));

JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);

PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
.withInsertQuery("insert into user_details (id, user_name,
created_timestamp) values (?,?,?)");

builder.setSpout("myspout", new UserSpout(), 1);

builder.setBolt("Psql_Bolt",
userPersistanceBolt,1).shuffleGrouping("myspout");

jdbcMapper = new SimpleJdbcMapper("My_details", cp);

List<Column> schemaColumns1 = Lists.newArrayList(new Column("my_id",
Types.INTEGER), new Column ("my_name",Types.VARCHAR));

JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);

PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
.withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");

//builder.setSpout("myspout", new UserSpout(), 1);

builder.setBolt("Psql_Bolt1",
userPersistanceBolt1,1).shuffleGrouping("myspout");
Config conf = new Config();
conf.put(JDBC_CONF, map);
conf.setDebug(true);
conf.setNumWorkers(3);

if (argument.equalsIgnoreCase("runLocally"))
{ System.out.println("Running topology locally..."); LocalCluster cluster =
new LocalCluster(); cluster.submitTopology("Twitter Test Storm-postgresql",
conf, builder.createTopology()); }

else
{ System.out.println("Running topology on cluster...");
StormSubmitter.submitTopology("Topology_psql", conf,
builder.createTopology()); }

}}

PsqlBolt:

public class PsqlBolt extends AbstractJdbcBolt {
private static final Logger LOG = Logger.getLogger(PsqlBolt.class);
private String tableName;
private String insertQuery;
private JdbcMapper jdbcMapper;

public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper
jdbcMapper)
{ super(connectionProvider); this.jdbcMapper = jdbcMapper; }
public PsqlBolt withTableName(String tableName) { this.tableName =
tableName; return this; }

public PsqlBolt withInsertQuery(String insertQuery) { this.insertQuery =
insertQuery; System.out.println("query passsed....."); return this; }
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector collector) {
super.prepare(map, topologyContext, collector);
if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
throw new IllegalArgumentException("You must supply either a tableName or
an insert Query."); }
}

@Override
public void execute(Tuple tuple) {
try {

List<Column> columns = jdbcMapper.getColumns(tuple);
List<List<Column>> columnLists = new ArrayList<List<Column>>();
columnLists.add(columns);
if(!StringUtils.isBlank(tableName)) {
this.jdbcClient.insert(this.tableName, columnLists); } else {
this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }
this.collector.ack(tuple);
} catch (Exception e) { this.collector.reportError(e);
this.collector.fail(tuple); }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}}


PsqlBolt1:

public class PsqlBolt1 extends AbstractJdbcBolt {
private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);

private String tableName;
private String insertQuery;
private JdbcMapper jdbcMapper;

public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper
jdbcMapper) { super(connectionProvider); this.jdbcMapper = jdbcMapper; }

public PsqlBolt1 withInsertQuery(String insertQuery)
{ this.insertQuery = insertQuery; System.out.println("query passsed.....");
return this; }

@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector collector) {
super.prepare(map, topologyContext, collector);
if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery))
{ throw new IllegalArgumentException("You must supply either a tableName or
an insert Query."); }

}

@Override
public void execute(Tuple tuple) {
try {
Thread.sleep(1000);
List<Column> columns = jdbcMapper.getColumns(tuple);
List<List<Column>> columnLists = new ArrayList<List<Column>>();
columnLists.add(columns);
if(!StringUtils.isBlank(tableName))
{ this.jdbcClient.insert(this.tableName, columnLists); }

else
{ this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }

this.collector.ack(tuple);
} catch (Exception e)
{ this.collector.reportError(e); this.collector.fail(tuple); }

}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}}

Mime
View raw message