Subject: Re: how to execute one bolt after another when the input is taken from same spout.
Date: Mon, 1 Feb 2016 12:47:15 -0500
From: Nathan Leung
To: user@storm.apache.org

You should wire the bolts one after the other, and have the first emit the tuple to the second only when it has to. Don't use sleep; that's probably not correct anyway.

On Jan 31, 2016 11:22 PM, "sujitha chinnu" wrote:

> Hi,
>
> My requirement is to execute one bolt first, and only upon its successful
> execution should the next bolt execute; both bolts take their input from
> the same spout. To make the second bolt wait, I am using Thread.sleep().
> It works, but it has performance issues. Can anyone suggest an
> alternative?
>
> Here is my sample code:
>
> Topology:
>
> public class Topology {
>
>     ConnectionProvider cp;
>     protected static final String JDBC_CONF = "jdbc.conf";
>     protected static final String TABLE_NAME = "users";
>
>     public static void main(String[] args) throws Exception {
>         String argument = args[0];
>         JdbcMapper jdbcMapper;
>         TopologyBuilder builder = new TopologyBuilder();
>
>         Map<String, Object> map = Maps.newHashMap();
>         map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
>         map.put("dataSource.url", "jdbc:postgresql://localhost:5432/analysis?user=postgres");
>         ConnectionProvider cp = new MyConnectionProvider(map);
>
>         jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);
>
>         List<Column> schemaColumns = Lists.newArrayList(
>                 new Column("user_id", Types.INTEGER),
>                 new Column("user_name", Types.VARCHAR),
>                 new Column("create_date", Types.TIMESTAMP));
>         JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
>
>         PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
>                 .withInsertQuery("insert into user_details (id, user_name, created_timestamp) values (?,?,?)");
>
>         builder.setSpout("myspout", new UserSpout(), 1);
>         builder.setBolt("Psql_Bolt", userPersistanceBolt, 1).shuffleGrouping("myspout");
>
>         jdbcMapper = new SimpleJdbcMapper("My_details", cp);
>
>         List<Column> schemaColumns1 = Lists.newArrayList(
>                 new Column("my_id", Types.INTEGER),
>                 new Column("my_name", Types.VARCHAR));
>         JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);
>
>         PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
>                 .withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");
>
>         builder.setBolt("Psql_Bolt1", userPersistanceBolt1, 1).shuffleGrouping("myspout");
>
>         Config conf = new Config();
>         conf.put(JDBC_CONF, map);
>         conf.setDebug(true);
>         conf.setNumWorkers(3);
>
>         if (argument.equalsIgnoreCase("runLocally")) {
>             System.out.println("Running topology locally...");
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("Twitter Test Storm-postgresql", conf, builder.createTopology());
>         } else {
>             System.out.println("Running topology on cluster...");
>             StormSubmitter.submitTopology("Topology_psql", conf, builder.createTopology());
>         }
>     }
> }
>
> PsqlBolt:
>
> public class PsqlBolt extends AbstractJdbcBolt {
>     private static final Logger LOG = Logger.getLogger(PsqlBolt.class);
>
>     private String tableName;
>     private String insertQuery;
>     private JdbcMapper jdbcMapper;
>
>     public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
>         super(connectionProvider);
>         this.jdbcMapper = jdbcMapper;
>     }
>
>     public PsqlBolt withTableName(String tableName) {
>         this.tableName = tableName;
>         return this;
>     }
>
>     public PsqlBolt withInsertQuery(String insertQuery) {
>         this.insertQuery = insertQuery;
>         System.out.println("query passed...");
>         return this;
>     }
>
>     @Override
>     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
>         super.prepare(map, topologyContext, collector);
>         if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
>             throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
>         }
>     }
>
>     @Override
>     public void execute(Tuple tuple) {
>         try {
>             List<Column> columns = jdbcMapper.getColumns(tuple);
>             List<List<Column>> columnLists = new ArrayList<List<Column>>();
>             columnLists.add(columns);
>             if (!StringUtils.isBlank(tableName)) {
>                 this.jdbcClient.insert(this.tableName, columnLists);
>             } else {
>                 this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
>             }
>             this.collector.ack(tuple);
>         } catch (Exception e) {
>             this.collector.reportError(e);
>             this.collector.fail(tuple);
>         }
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>     }
> }
>
> PsqlBolt1:
>
> public class PsqlBolt1 extends AbstractJdbcBolt {
>     private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);
>
>     private String tableName;
>     private String insertQuery;
>     private JdbcMapper jdbcMapper;
>
>     public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
>         super(connectionProvider);
>         this.jdbcMapper = jdbcMapper;
>     }
>
>     public PsqlBolt1 withInsertQuery(String insertQuery) {
>         this.insertQuery = insertQuery;
>         System.out.println("query passed...");
>         return this;
>     }
>
>     @Override
>     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
>         super.prepare(map, topologyContext, collector);
>         if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
>             throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
>         }
>     }
>
>     @Override
>     public void execute(Tuple tuple) {
>         try {
>             Thread.sleep(1000);
>             List<Column> columns = jdbcMapper.getColumns(tuple);
>             List<List<Column>> columnLists = new ArrayList<List<Column>>();
>             columnLists.add(columns);
>             if (!StringUtils.isBlank(tableName)) {
>                 this.jdbcClient.insert(this.tableName, columnLists);
>             } else {
>                 this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
>             }
>             this.collector.ack(tuple);
>         } catch (Exception e) {
>             this.collector.reportError(e);
>             this.collector.fail(tuple);
>         }
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>     }
> }
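The "wire the bolts one after the other" suggestion can be sketched without any Storm classes. In this minimal, library-free simulation (the `Bolt` interface and all class names here are invented for illustration, not storm-jdbc API), the second stage only ever receives a tuple after the first stage's work has succeeded, so no `Thread.sleep()` is needed:

```java
import java.util.ArrayList;
import java.util.List;

public class ChainedBoltsSketch {
    interface Bolt { void execute(String tuple); }

    // Stands in for the first persistence bolt: does its own "insert",
    // then forwards the tuple downstream only when that work succeeded.
    static class FirstBolt implements Bolt {
        private final Bolt downstream;      // wiring Storm would do via shuffleGrouping
        final List<String> log = new ArrayList<>();
        FirstBolt(Bolt downstream) { this.downstream = downstream; }
        @Override public void execute(String tuple) {
            log.add("insert-1:" + tuple);   // stands in for the first JDBC insert + ack
            downstream.execute(tuple);      // "emit": reached only if the line above did not throw
        }
    }

    // Stands in for the second persistence bolt: no sleep needed,
    // because it only ever sees tuples the first bolt forwarded.
    static class SecondBolt implements Bolt {
        final List<String> log = new ArrayList<>();
        @Override public void execute(String tuple) {
            log.add("insert-2:" + tuple);   // stands in for the second JDBC insert
        }
    }

    public static void main(String[] args) {
        SecondBolt second = new SecondBolt();
        FirstBolt first = new FirstBolt(second);
        first.execute("user-42");           // the "spout" feeds only the first bolt
        System.out.println(first.log);      // [insert-1:user-42]
        System.out.println(second.log);     // [insert-2:user-42]
    }
}
```

One way to translate this back to the posted topology (an assumption about intent, not tested code) would be: have `PsqlBolt` declare an output stream in `declareOutputFields`, emit the tuple's values via the collector after its successful insert, and change `Psql_Bolt1`'s grouping from `shuffleGrouping("myspout")` to `shuffleGrouping("Psql_Bolt")`.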
