Hi William,

I'm having the same problem running a multilang topology (written in Python). If you find a solution, please post it here; it would surely help us.

To upgrade from 0.9.2-incubating we updated storm.py (https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py) and pom.xml.

After downgrading back to 0.9.2-incubating (reverting both storm.py and pom.xml), everything works fine.

Best regards,

On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <oberman@civicscience.com> wrote:
I'm not sure of the best way to share a test case, so I'll copy and paste the code below. If you run it (and find the log file of the worker that was running it), you should see the following within ~30 seconds:
====
2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
java.lang.RuntimeException: subprocess heartbeat timeout
        at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR] 
java.lang.RuntimeException: subprocess heartbeat timeout
        at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
=======

But the topology will then run in a weird zombie state forever. More specifically, I see the multilang bolt process all tuples in the pending queue, followed by an infinite loop of nextTuple()/fail() calls from the multilang spout. But, as noted in my original email, if I comment out:
 _collector.reportError(exception);
in the Java ShellSpout then the worker will immediately die and respawn.
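
For readers unfamiliar with the heartbeat check behind the trace above, here is a minimal Java sketch of a subprocess watchdog. It is an illustration only, not Storm's ShellSpout code: the class HeartbeatWatchdog and its methods are hypothetical names, and the 30-second value is assumed from the ~30 seconds mentioned above. It shows why a subprocess that stays silent longer than the timeout gets flagged even though it is still alive.

```java
// Hypothetical model of a subprocess heartbeat check; not Storm's actual code.
final class HeartbeatWatchdog {
    private final long timeoutMillis;
    private volatile long lastHeartbeatMillis;

    HeartbeatWatchdog(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = nowMillis;
    }

    // Called whenever the subprocess answers (for example with a "sync").
    void markHeartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    // True once the subprocess has been silent for longer than the timeout.
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        // Assumed 30 s timeout; timestamps are simulated milliseconds.
        HeartbeatWatchdog watchdog = new HeartbeatWatchdog(30_000L, 0L);
        watchdog.markHeartbeat(5_000L);
        System.out.println("silent 20s -> timed out? " + watchdog.isTimedOut(25_000L));
        System.out.println("silent 40s -> timed out? " + watchdog.isTimedOut(45_000L));
    }
}
```

In this model, a spout that is never asked for work (or is stuck inside one call) simply stops calling markHeartbeat(), so the periodic check eventually reports a timeout.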

If no one can help, the next step for me is rough, as I'll have to learn how to actually develop and debug storm itself, which is usually at least 10x harder than just using something :-)

In any case, my test code:

Topology = 1 process with two tasks (multilang spout and bolt) and a small pool of pending messages (yes, using the word count example in storm-starter as a starting point...)
=============
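import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import java.util.Map;

public class SlowTopology {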
  public static class SlowPhpBolt extends ShellBolt implements IRichBolt {

    public SlowPhpBolt() {
      super("php", "slowBolt.php");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields());
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class SlowPhpSpout extends ShellSpout implements IRichSpout {

    public SlowPhpSpout() {
      super("php", "slowSpout.php");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
      ofd.declare(new Fields("output"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
    builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(1);
      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
    else {
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("slow", conf, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
    }
  }
}
===========

slowSpout.php 
==========
<?php
require_once "storm.php";
class slowSpout extends \ShellSpout {
  protected function nextTuple() {
    $value = rand(0,100);
    $id = rand(0, 100);
    $this->emit(array($value), $id);
    file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
    sleep(1);
  }
  protected function ack($id) {
    file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
  }

  protected function fail($id) {
    file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
  }
}

(new slowSpout())->run();
===========

slowBolt.php
============
<?php
require_once "storm.php";
class slowBolt extends \BasicBolt {
  protected function process(\Tuple $t) {
    $sleep = rand(1, 180);
    file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);    
    sleep($sleep);
  }
}
(new slowBolt())->run();
============

storm.php (from https://github.com/lazyshot/storm-php; I think my changes were more error checking on reads/writes to standard in/out, a sync() method on ShellComponent that ShellSpout calls after each command to make new classes easier to write, and support for the new heartbeat protocol)
=========
<?php
interface iShellBolt {
}

interface iShellSpout {
}

class Tuple {
    public $id, $component, $stream, $task, $values;

    public function __construct($id, $component, $stream, $task, $values) {
        $this->id = $id;
        $this->component = $component;
        $this->stream = $stream;
        $this->task = $task;
        $this->values = $values;
    }
}

abstract class ShellComponent {
    protected $pid;
    protected $stormConf;
    protected $topologyContext;

    protected $stormInc = null;

    public function __construct() {
        $this->pid = getmypid();
        $this->sendCommand(array("pid" => $this->pid));

        $handshake = $this->parseMessage($this->waitForMessage());

        $this->stormConf = $handshake['conf'];
        $this->topologyContext = $handshake['context'];
        $pidDir = $handshake['pidDir'];

        @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
    }

    protected function readLine() {
        $raw = fgets(STDIN);

        if ($raw === false) {
            throw new Exception("STDIN is broken");
        }

        $line = trim($raw);

        return $line;
    }

    protected function waitForMessage() {
        $message = '';
        while (true) {
            $line = trim($this->readLine());

            if (strlen($line) == 0) {
                continue;
            } else if ($line == 'end') {
                break;
            } else if ($line == 'sync') {
                $message = '';
                continue;
            }

            $message .= $line . "\n";
        }

        return trim($message);
    }

    protected function sendCommand(array $command) {
        $this->sendMessage(json_encode($command));
    }

    protected function sendLog($message) {
        return $this->sendCommand(array(
            'command' => 'log',
            'msg' => $message
        ));
    }

    protected function parseMessage($message) {
        $msg = json_decode($message, true);

        if ($msg) {
            return $msg;
        } else {
            return $message;
        }
    }

    protected function sendMessage($message) {
        $message = "$message\nend\n";
        $bytesWritten = fwrite(STDOUT, $message);
        fflush(STDOUT);
        if ($bytesWritten === false) {
            throw new Exception("STDOUT is broken");
        }
        if ($bytesWritten != strlen($message)) {
            throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
        }
    }

    final protected function sync() {
        $command = array(
            'command' => 'sync',
        );

        $this->sendCommand($command);
    }

}

abstract class ShellBolt extends ShellComponent implements iShellBolt {

    public $anchor_tuple = null;

    public function __construct() {
        parent::__construct();

        $this->init($this->stormConf, $this->topologyContext);
    }

    public function run() {
        try {
            while (true) {
                $command = $this->parseMessage($this->waitForMessage());

                if (is_array($command)) {
                    if (isset($command['tuple'])) {
                        $tupleMap = array_merge(array(
                                'id' => null,
                                'comp' => null,
                                'stream' => null,
                                'task' => null,
                                'tuple' => null
                            ),

                            $command);

                        if($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
                            $this->sync();
                        } else {
                            $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
                            $this->process($tuple);
                        }
                    }
                }
            }
        } catch (Exception $e) {
            $this->sendLog((string)$e);
        }
    }

    abstract protected function process(Tuple $tuple);

    protected function init($conf, $topology) {
        return;
    }

    protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
        if ($this->anchor_tuple !== null) {
            $anchors = array($this->anchor_tuple);
        }

        $command = array(
            'command' => 'emit'
        );

        if ($stream !== null) {
            $command['stream'] = $stream;
        }

        $command['anchors'] = array_map(function ($a) {
            return $a->id;
        }, $anchors);

        if ($directTask !== null) {
            $command['task'] = $directTask;
        }

        $command['tuple'] = $tuple;

        $this->sendCommand($command);
    }

    protected function emit($tuple, $stream = null, $anchors = array()) {
        $this->emitTuple($tuple, $stream, $anchors);
    }

    protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
        $this->emitTuple($tuple, $stream, $anchors, $directTask);
    }

    protected function ack(Tuple $tuple) {
        $command = array(
            'command' => 'ack',
            'id' => $tuple->id
        );

        $this->sendCommand($command);
    }

    protected function fail(Tuple $tuple) {
        $command = array(
            'command' => 'fail',
            'id' => $tuple->id
        );

        $this->sendCommand($command);
    }
}

abstract class BasicBolt extends ShellBolt {
    public function run() {
        try {
            while (true) {
                $command = $this->parseMessage($this->waitForMessage());

                if (is_array($command)) {
                    if (isset($command['tuple'])) {
                        $tupleMap = array_merge(array(
                                'id' => null,
                                'comp' => null,
                                'stream' => null,
                                'task' => null,
                                'tuple' => null
                            ),

                            $command);

                        if($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
                            $this->sync();
                        } else {
                            $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);

                            $this->anchor_tuple = $tuple;

                            try {
                                $processed = $this->process($tuple);

                                $this->ack($tuple);
                            } catch (BoltProcessException $e) {
                                $this->fail($tuple);
                            }
                        }
                    }
                }
            }
        } catch (Exception $e) {
            $this->sendLog((string)$e);
        }

    }
}

abstract class ShellSpout extends ShellComponent implements iShellSpout {
    protected $tuples = array();

    public function __construct() {
        parent::__construct();

        $this->init($this->stormConf, $this->topologyContext);
    }


    abstract protected function nextTuple();

    abstract protected function ack($tuple_id);

    abstract protected function fail($tuple_id);

    public function run() {
        try {
            while (true) {
                $command = $this->parseMessage($this->waitForMessage());

                if (is_array($command)) {
                    if (isset($command['command'])) {
                        if ($command['command'] == 'ack') {
                            $this->ack($command['id']);
                            $this->sync();
                        } else if ($command['command'] == 'fail') {
                            $this->fail($command['id']);
                            $this->sync();
                        } else if ($command['command'] == 'next') {
                            $this->nextTuple();
                            $this->sync();
                        }
                    }
                }
            }
        } catch (Exception $e) {
            $this->sendLog((string)$e);
            $this->sync();
        }
    }

    protected function init($stormConf, $topologyContext) {
        return;
    }

    final protected function emit(array $tuple, $messageId = null, $streamId = null) {
        return $this->emitTuple($tuple, $messageId, $streamId, null);
    }

    final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
        return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
    }

    private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
        $command = array(
            'command' => 'emit'
        );

        if ($messageId !== null) {
            $command['id'] = $messageId;
        }

        if ($streamId !== null) {
            $command['stream'] = $streamId;
        }

        if ($directTask !== null) {
            $command['task'] = $directTask;
        }

        $command['tuple'] = $tuple;

        return $this->sendCommand($command);
    }
}

class BoltProcessException extends Exception {
}

=========================
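
The framing used by waitForMessage() and sendMessage() above is easy to miss in the PHP, so here is a minimal Java sketch of the same idea: each message is a payload followed by a line containing only "end", blank lines are skipped, and a bare "sync" line resets the buffer. The class and method names (MultilangFraming, frame, readMessage) are made up for this illustration, and it does no JSON parsing.

```java
import java.util.Scanner;

// Hypothetical illustration of the line-based framing in storm.php above.
final class MultilangFraming {
    // Frame an outgoing payload the way sendMessage() does: payload, then an "end" line.
    static String frame(String payload) {
        return payload + "\nend\n";
    }

    // Read one message the way waitForMessage() does.
    static String readMessage(Scanner in) {
        StringBuilder message = new StringBuilder();
        while (true) {
            if (!in.hasNextLine()) {
                throw new IllegalStateException("input is broken");
            }
            String line = in.nextLine().trim();
            if (line.isEmpty()) {
                continue;               // skip blank lines
            } else if (line.equals("end")) {
                break;                  // end of this message
            } else if (line.equals("sync")) {
                message.setLength(0);   // discard anything buffered so far
                continue;
            }
            message.append(line).append('\n');
        }
        return message.toString().trim();
    }

    public static void main(String[] args) {
        String wire = frame("{\"command\":\"next\"}")
                + frame("{\"command\":\"ack\",\"id\":\"42\"}");
        Scanner in = new Scanner(wire);
        System.out.println(readMessage(in));
        System.out.println(readMessage(in));
    }
}
```

Against a real subprocess the Scanner would wrap System.in instead of a string; the string input here just keeps the example self-contained.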


On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <oberman@civicscience.com> wrote:
Hi,

For reference, I'm talking about 0.9.3 ShellSpout, line 234.

I'll try to cover the important facts that led to this issue:

-I was on 0.9.2 using multilang to bridge to PHP to get to some existing business logic

-I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to the ShellBolt protocol)

-I have some odd topologies where I try to do some legacy background processing.  This processing takes a highly variable amount of time in the bolts, from milliseconds to minutes.  Eventually, due to randomness, the spout's "pending" pool fills up, causing the spout to block on nextTuple, which in turn causes a heartbeat timeout. (I believe my only fix is to increase the heartbeat timeout at the topology level. That's not the purpose of this email, though confirmation that this is my only workaround would be appreciated!  I suspect this case wasn't anticipated when the heartbeat patch was designed, since it presumably assumed the spout's nextTuple wouldn't block.)

-The purpose of this email is the fact that the topology "jams up" when the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still running (I added logging to them), but Storm itself is doing nothing.

-I added logging to ShellSpout and recompiled, because I saw the log message on line 233 (Halting process: ShellSpout died) but, as noted, the PHP process was still running, so I was curious whether _process.destroy(); had failed.  But my logging didn't appear, and I assumed I was compiling/deploying wrong.  Eventually I commented out line 234: _collector.reportError(exception);  and everything started working!!!

Does this make *any* sense?  Why would _collector.reportError(exception); block and never return (I waited quite a long time, tens of minutes)?  When I comment out line 234, Storm immediately kills my bad tasks and respawns them almost instantly.
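
The symptom described here (a halt path that logs, then never reaches the cleanup step) can be illustrated with a small Java sketch. It does not explain why reportError blocks, and it is not Storm code: HaltSequence, reportThenCleanup, and the timeout value are hypothetical. It only shows that if a reporting call can block, bounding it with a timeout and putting the cleanup in a finally block guarantees the cleanup still runs.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: keep a blocking "report" step from preventing cleanup.
final class HaltSequence {
    // Runs report with a time bound, then always runs cleanup.
    // Returns true only if report completed normally within the bound.
    static boolean reportThenCleanup(Runnable report, Runnable cleanup, long timeoutMillis) {
        ExecutorService reporter = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "reporter");
            t.setDaemon(true); // a stuck report must not keep the JVM alive
            return t;
        });
        try {
            Future<?> pending = reporter.submit(report);
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            reporter.shutdownNow(); // interrupt the report if it is still running
            cleanup.run();          // e.g. destroying the subprocess
        }
    }

    public static void main(String[] args) {
        Runnable blockingReport = () -> {
            try {
                Thread.sleep(60_000L); // stands in for a call that never returns
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean reported = reportThenCleanup(
                blockingReport, () -> System.out.println("subprocess destroyed"), 200L);
        System.out.println("report finished in time: " + reported);
    }
}
```

With the original ordering (report first, cleanup afterwards, no time bound), a report call that never returns would leave the subprocess running, which matches the behaviour observed above.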

I feel fairly confident that this will be reproducible.  My topology:
-1 spout (ShellSpout)
-1 bolt (ShellBolt)
-The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt + the pending queue is full

Thanks for any feedback!  

will

--
Alex Sobrino Beltrán
Registered Linux User #273657

http://v5tech.es