storm-user mailing list archives

From Andrew Montalenti <and...@parsely.com>
Subject Re: Storm 0.9.2 fault tolerance and scalability issue
Date Wed, 23 Jul 2014 17:41:21 GMT
Ivan,

You don't happen to be running Ubuntu 14.04 on a Xen kernel, do you? E.g. on
Amazon EC2.

I ran into a case where running Storm across many workers on that OS hit an
annoying network driver bug that caused timeouts and topology freezes like
the ones you are seeing. Check dmesg for odd messages from your network
stack. Just a guess.

-AM
On Jul 23, 2014 9:01 AM, "Ivan Bondarenko" <bondarenko.ivan.v@gmail.com>
wrote:

> Hi,
>
> We are trying to run the new Storm version (0.9.2) with Netty (currently we
> are using 0.9.0.1 with ZeroMQ) and we consistently get Netty errors when:
> - trying to scale to 6 workers (from the start)
> - running on 3 workers and manually killing one of them to check how it recovers
> After that the topology hangs and stays in that state forever.
>
> *The errors we are getting:*
> This one:
> 2014-07-23 08:06:13 Topology: Tuple-bomb-Topology [ERROR] backtype.storm.messaging.netty.StormServerHandler server errors in handling the request
> java.lang.NullPointerException
> at backtype.storm.messaging.netty.Server.groupMessages(Server.java:124)
> at backtype.storm.messaging.netty.Server.enqueue(Server.java:163)
> at backtype.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:54)
> at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
> at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
> at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
> at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
> at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:350)
> at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)
> at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
> Or this one:
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107)
> at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
> Or this one:
> java.nio.channels.ClosedChannelException
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:409)
> at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:127)
> at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:83)
> at org.jboss.netty.channel.Channels.write(Channels.java:725)
> at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71)
> at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
> at org.jboss.netty.channel.Channels.write(Channels.java:704)
> at org.jboss.netty.channel.Channels.write(Channels.java:671)
>
> *Our environment is 3 worker nodes + 1 Nimbus node:*
> CentOS release 6.3 (Final) (x86_64)
> MemTotal:       32880716 kB
> CPU: Intel(R) Xeon(R) CPU X5675 @ 3.07GHz (2 processors, 8 cores)
>
> *Topology code we are using to test:*
> public class Main {
>     private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>     public static final String TOPOLOGY_NAME = "Tuple-bomb-Topology";
>
>     public static void main(String[] args) {
>         try {
>             LaunchOptions launchOptions = new LaunchOptions();
>             CmdLineParser parser = new CmdLineParser(launchOptions);
>             parser.parseArgument(args);
>
>             if (launchOptions.isHelp()) {
>                 parser.printUsage(System.out);
>                 System.exit(0);
>             }
>
>             TridentTopology topology = new TridentTopology();
>             Stream stream = topology.newStream(TOPOLOGY_NAME, getSpout());
>             stream = stream.shuffle().name("Processing")
>                     .each(new Fields("id"), getProcessingFunction(), new Fields())
>                     .parallelismHint(launchOptions.getParallelism());
>
>             stream.shuffle().name("Counter")
>                     .each(new Fields("id"), getReporter(launchOptions.getReportInterval()), new Fields())
>                     .parallelismHint(1);
>
>             Config conf = new Config();
>             conf.put("topology.workers", launchOptions.getWorkersNumber());
>             int n = 1000;
>             conf.put("topology.trident.batch.emit.interval.millis", n * 1000.0 / launchOptions.getRate());
>             conf.put("topology.spout.max.batch.size", n);
>
>             StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, topology.build());
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     private static BaseFunction getProcessingFunction() {
>         return new BaseFunction() {
>             @Override
>             public void execute(TridentTuple objects, TridentCollector tridentCollector) {
>                 for (int i = 0; i < 1000; i++) {
>                     Random random = new Random();
>                     Math.cos(random.nextDouble());
>                 }
>                 // LOG.info("Process");
>                 tridentCollector.emit(new Values());
>             }
>         };
>     }
>
>     private static BaseRichSpout getSpout() {
>         return new BaseRichSpout() {
>
>             SpoutOutputCollector collector;
>             long messageId;
>
>             @Override
>             public void declareOutputFields(OutputFieldsDeclarer declarer) {
>                 declarer.declare(new Fields("id"));
>             }
>
>             @Override
>             public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>                 this.collector = collector;
>             }
>
>             @Override
>             public void nextTuple() {
>                 collector.emit(new Values(messageId++));
>             }
>         };
>     }
>
>     private static BaseFunction getReporter(final int reportInterval) {
>         return new BaseFunction() {
>             long tuplesPerReport = reportInterval;
>             long counter = 0;
>             long lastReportTime = 0;
>
>             @Override
>             public void execute(TridentTuple objects, TridentCollector tridentCollector) {
>                 if (lastReportTime == 0) {
>                     lastReportTime = System.currentTimeMillis();
>                 }
>                 counter++;
>                 if (counter % tuplesPerReport == 0) {
>                     long time = System.currentTimeMillis();
>                     String rate = String.format("%.2f", 1000.0 * tuplesPerReport / (time - lastReportTime));
>                     lastReportTime = time;
>                     LOG.info("Processed {} tuples with rate {}ps", tuplesPerReport, rate);
>                 }
>             }
>         };
>     }
> }
>
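A small aside, unrelated to the errors themselves: most of those settings can also be set through the typed helpers/constants on backtype.storm.Config rather than raw string keys, which avoids typo-prone strings. A rough sketch of the same config section (workers and rate stand in for the LaunchOptions accessors; the Trident batch-size key is left as a string because I don't recall a Config constant for it in 0.9.x):

import backtype.storm.Config;

// Sketch only: builds the same settings as in Main.main() above, using
// Config's typed API where one exists.
static Config buildConf(int workers, double rate) {
    Config conf = new Config();
    conf.setNumWorkers(workers);  // equivalent to conf.put("topology.workers", workers)
    int n = 1000;
    conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
             (int) Math.round(n * 1000.0 / rate));  // rounded to an int
    conf.put("topology.spout.max.batch.size", n);   // no Config constant for this, as far as I know
    return conf;
}
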
> *Storm settings (storm.yaml):*
> nimbus.childopts: "-Xmx2g -Djava.net.preferIPv4Stack=true"
> ui.childopts: "-Xmx2g -Djava.net.preferIPv4Stack=true"
> supervisor.childopts: "-Xmx2g -Djava.net.preferIPv4Stack=true"
> worker.childopts: "-Xms3G -Xmx6G -Djava.net.preferIPv4Stack=true -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -server -XX:+AggressiveOpts -XX:+UseCompressedOops"
> worker.heartbeat.frequency.secs: 1
>
> storm.messaging.transport: "backtype.storm.messaging.netty.Context"
> storm.messaging.netty.server_worker_threads: 2  # tried different numbers of threads: 1, 4, 8
> storm.messaging.netty.client_worker_threads: 2
> storm.messaging.netty.buffer_size: 5242880
> storm.messaging.netty.max_retries: 100  # tried different numbers of retries: 1000, 10000
> storm.messaging.netty.max_wait_ms: 20000  # tried different wait values: 1000, 10000
> storm.messaging.netty.min_wait_ms: 100
>
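One thing that jumps out from those numbers (not necessarily the cause of the hang): with max_retries: 100 and max_wait_ms: 20000, a Netty client that cannot reach a peer has a very large retry budget. A rough back-of-the-envelope, assuming an exponential backoff that starts at min_wait_ms, doubles per retry, and is capped at max_wait_ms (an assumption about the backoff shape, not a statement about the 0.9.2 internals):

// Worst-case cumulative wait for the posted settings, under the assumed
// backoff shape described above.
public class RetryBudget {
    public static void main(String[] args) {
        long minWaitMs = 100;      // storm.messaging.netty.min_wait_ms
        long maxWaitMs = 20000;    // storm.messaging.netty.max_wait_ms
        int maxRetries = 100;      // storm.messaging.netty.max_retries

        long total = 0;
        long wait = minWaitMs;
        for (int i = 0; i < maxRetries; i++) {
            total += wait;
            wait = Math.min(wait * 2, maxWaitMs);
        }
        // Prints roughly 1,865,500 ms, i.e. about 31 minutes of waiting.
        System.out.printf("worst-case reconnect budget: %d ms (~%.1f min)%n",
                total, total / 60000.0);
    }
}

That is far longer than the default 30-second worker and task timeouts, so if anything blocks on that reconnect loop, the topology can look frozen long before the retries are exhausted.
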
> There are no firewalls, network policies, or DNS lookup issues - the nodes
> connect to each other directly. Please help us resolve this issue. What are
> we doing wrong?
>
> Best regards,
> Ivan
>
>
