incubator-s4-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sky Zhao <sky.z...@ericsson.com>
Subject RE: About 200,000 events/s
Date Mon, 24 Jun 2013 10:00:09 GMT
I created a new, separate project with an Adapter to send S4 events. How can I improve the code or the configuration?
Please give me some suggestions.
My java code here:





public class CsvFileAdapter extends AdapterApp {

               // Injected value bound to the name "metrics.path". When non-empty,
               // openAndRead() enables CSV metrics reporting into this location.
               @Inject
               @Named("metrics.path")
               String mpath;

               // Injected value bound to the name "file.csv": path of the CSV file
               // that openAndRead() opens and turns into events.
               @Inject
               @Named("file.csv")
               String csvFileName;

               private static Logger logger = LoggerFactory
                                             .getLogger(CsvFileAdapter.class);

               // Created in onInit() and started in onStart().
               private Thread t;
               // Scratch buffer reused for every input line to build the "seadata"
               // payload; it is emptied after each event is built.
               private StringBuffer sb = new StringBuffer();
               // Number of lines read from the CSV file so far.
               private long nEvents = 0;
               // Wall-clock time (System.currentTimeMillis()) taken after the file
               // has been fully read and closed.
               long tStop = 0;
               // Wall-clock time (System.currentTimeMillis()) taken at the start of
               // openAndRead(), before the file is opened.
               long tStart = 0;

               /**
                * Close hook of the adapter application. Currently a no-op: nothing is
                * released or stopped here.
                */
               @Override
               protected void onClose() {
               }

               /**
                * Initialization hook: delegates to the superclass initialization and
                * then creates the thread stored in {@code t}.
                */
               @Override
               protected void onInit() {
                              super.onInit();

                              // NOTE(review): the thread is constructed without a Runnable target,
                              // so starting it as-is executes no work.
                              t = new Thread();

               }

               public void openAndRead() throws Exception {

                              if (mpath != null && !mpath.isEmpty()) {
                                             CsvReporter.enable(new File(mpath), 20, TimeUnit.SECONDS);
                              }

                              Reader inputReader = null;
                              BufferedReader br = null;

                              String kpi_name;
                              String mo_name;
                              long timestamp;
                              double kpi_value;

                              tStart = System.currentTimeMillis();

                              String csvPath = csvFileName;

                              if (mpath != null && !mpath.isEmpty()) {
                                             FileInputStream fis = new FileInputStream(csvPath);
                                             inputReader = new InputStreamReader(fis);
                                             br = new BufferedReader(inputReader);

                                             String delimiter = ",";
                                             String s;

                                             while ((s = br.readLine()) != null) {

                                                            nEvents++;

                                                            String[] temp = s.replace("\"","").split(delimiter);

                                                            kpi_name = temp[0];
                                                            mo_name = temp[1];
                                                            timestamp = Long.valueOf(temp[2]);
                                                            kpi_value = Double.valueOf(temp[3]);

                                                            sb.append("").append(kpi_name).append(",").append(mo_name)
                                                                                         
.append(",").append(timestamp).append(",")
                                                                                         
.append(kpi_value);

                                                            // ////////sending to s4
                                                            DataEvent event = new DataEvent(kpi_name,
mo_name, kpi_value,
                                                                                         
timestamp);

                                                            event.put("seadata", String.class,
sb.toString());
                                                            sb.setLength(0);
                                                            getRemoteStream().put(event);

                                             }
                                             fis.close();
                                             br.close();
                                             inputReader.close();
                                             tStop = System.currentTimeMillis();

                                             Gauge<Long> g2 = Metrics.newGauge(CsvFileAdapter.class,
                                                                           "sea total event
length", new Gauge<Long>() {
                                                                                         
@Override
                                                                                         
public Long value() {
                                                                                         
               return nEvents;
                                                                                         
}
                                                                           });

                                             Metrics.newGauge(CsvFileAdapter.class, "sea total
event time",
                                                                           new Gauge<Long>()
{
                                                                                         
@Override
                                                                                         
public Long value() {
                                                                                         
               return (tStop - tStart);
                                                                                         
}
                                                                           });
                              }

               }

               /**
                * Start hook: launches the CSV read/send loop on a dedicated thread so that
                * this method returns without waiting for the whole file to be processed.
                * <p>
                * Previously the thread held in {@code t} had no target, so starting it did
                * nothing and {@link #openAndRead()} ran on the calling thread. The worker
                * is now created here with {@code openAndRead()} as its task. A failure
                * inside the worker is logged rather than thrown, because it can no longer
                * propagate to the caller of {@code onStart()}.
                */
               @Override
               protected void onStart() {
                              t = new Thread(new Runnable() {
                                             @Override
                                             public void run() {
                                                            try {
                                                                           openAndRead();
                                                            } catch (Exception e) {
                                                                           logger.error("Failed to read and send events from CSV file " + csvFileName, e);
                                                            }
                                             }
                              }, "csv-file-adapter-reader");
                              t.start();
               }




From: Sky Zhao [mailto:sky.zhao@ericsson.com]
Sent: Monday, June 24, 2013 5:20 PM
To: s4-user@incubator.apache.org
Subject: About 200,000 events/s

I am trying to use an Adapter to send S4 events. The metrics report shows:
20,10462,88.63259092217602,539.6449108859357,18.577650313690874,6.241814566462701
40,36006,417.83633322358764,914.1057643161282,97.55624823196746,33.40088245418529
60,63859,674.1012974987167,1075.2326549158463,176.33878995148274,61.646803531230724
80,97835,953.6282787690939,1232.2934375999696,271.48890371088254,96.56144395108957
100,131535,1162.2060916405578,1323.3704459079934,363.98505627735324,131.98430793014757
120,165282,1327.52314133145,1384.2675551261093,453.5195236495672,167.61679021575551
140,190776,1305.7285112621298,1368.4361242524062,504.7782182758366,191.36049732440895

20,000 events per 20s  => 1000 EVENTS/s

This is very slow, so I modified S4_HOME/subprojects/s4-comm/bin/default.s4.comm.properties:

s4.comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
s4.comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener

# I/O channel connection timeout, when applicable (e.g. used by netty)
s4.comm.timeout=1000

# NOTE: the following numbers should be tuned according to the application, use case, and infrastructure

# how many threads to use for the sender stage (i.e. serialization)
#s4.sender.parallelism=1
s4.sender.parallelism=100
# maximum number of events in the buffer of the sender stage
#s4.sender.workQueueSize=10000
s4.sender.workQueueSize=100000
# maximum sending rate from a given node, in events / s (used with throttling sender executors)
s4.sender.maxRate=200000

# how many threads to use for the *remote* sender stage (i.e. serialization)
#s4.remoteSender.parallelism=1
s4.remoteSender.parallelism=100
# maximum number of events in the buffer of the *remote* sender stage
#s4.remoteSender.workQueueSize=10000
s4.remoteSender.workQueueSize=100000
# maximum *remote* sending rate from a given node, in events / s (used with throttling *remote* sender executors)
s4.remoteSender.maxRate=200000

# maximum number of pending writes to a given comm channel
#s4.emitter.maxPendingWrites=1000
s4.emitter.maxPendingWrites=10000

# maximum number of events in the buffer of the processing stage
#s4.stream.workQueueSize=10000
s4.stream.workQueueSize=100000

This only improved throughput from 500 events/s to 1000 events/s.

Reading the 88 MB file takes only 8 s, but sending the events now takes 620 s for 1,237,632 events. Why is it so slow?
S4 can reach 200,000 events/s; how can I get up to that rate? Please give me detailed instructions.





Mime
View raw message