Hi William,

I'm having the same problem running a multilang topology (written in Python). If you find a solution, please post it here; it will surely help us.

To upgrade from 0.9.2-incubating we updated storm.py (https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py) and pom.xml. If we downgrade to 0.9.2-incubating (reverting both storm.py and pom.xml), it works fine.

Best regards,

On Tue, Feb 10, 2015 at 8:59 PM, William Oberman wrote:

> I'm not sure of the best way to share a test case, so I'll copy and paste
> the code below. If you run it (and find the log file of the worker that
> was running it), you should see this within ~30 seconds:
> ====
> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
> java.lang.RuntimeException: subprocess heartbeat timeout
>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
> java.lang.RuntimeException: subprocess heartbeat timeout
>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> =======
>
> But after that, the topology runs in a kind of weird zombie state forever.
> More specifically, I see the multilang bolt process all tuples in the
> pending queue, and then an infinite loop of nextTuple()/fail() from the
> multilang spout. But, as noted in my original email, if I comment out
>     _collector.reportError(exception);
> in the Java ShellSpout, the worker immediately dies and respawns.
>
> If no one can help, the next step for me is rough, as I'll have to learn
> how to actually develop and debug Storm itself, which is usually at least
> 10x harder than just using something :-)
>
> In any case, here is my test code.
>
> Topology = 1 process with two tasks (multilang spout and bolt) and a small
> pool of pending messages (yes, I used the word count example in
> storm-starter as a starting point...)
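The trace shows the timeout being raised from a periodic timer task (`ShellSpout$SpoutHeartbeatTimerTask.run` running on a `ScheduledThreadPoolExecutor`), and the behavior described suggests the halt path stalls inside `reportError` before it gets to kill the worker. The Java sketch below is a hypothetical model of that pattern, not Storm's ShellSpout code: `HeartbeatWatchdog`, `isTimedOut` and `halt` are invented names, and giving the error report only a bounded wait is one defensive ordering that assumes the reporter can block, not a confirmed fix.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a subprocess heartbeat watchdog; NOT Storm's ShellSpout code.
final class HeartbeatWatchdog {
    private final long timeoutMillis;
    private final AtomicLong lastHeartbeatMillis;

    HeartbeatWatchdog(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = new AtomicLong(nowMillis);
    }

    // Record that the subprocess just sent us a message.
    void recordHeartbeat(long nowMillis) {
        lastHeartbeatMillis.set(nowMillis);
    }

    // The check a periodic timer task would run on each tick.
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis.get() > timeoutMillis;
    }

    // Halt path: give the error report a bounded wait on a daemon thread, then
    // always run destroyAndExit, so a blocking reporter cannot stop the halt.
    // Returns true if the report finished within reportTimeoutMillis.
    static boolean halt(Runnable reportError, Runnable destroyAndExit, long reportTimeoutMillis) {
        ExecutorService reporter = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "error-reporter");
            t.setDaemon(true); // a stuck report must not keep the JVM alive
            return t;
        });
        boolean reported = false;
        try {
            Future<?> report = reporter.submit(reportError);
            report.get(reportTimeoutMillis, TimeUnit.MILLISECONDS);
            reported = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        } catch (ExecutionException | TimeoutException e) {
            // The report threw or hung; fall through and halt anyway.
        } finally {
            reporter.shutdownNow(); // interrupt a hung reporter, if any
            destroyAndExit.run();
        }
        return reported;
    }
}
```

Passing the destroy/exit step as a `Runnable` keeps the sketch testable; in a real worker it would destroy the subprocess and exit the JVM. Putting it in `finally` means it runs whether the report completes, throws, or hangs.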
> =============
> import java.util.Map;
>
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.StormSubmitter;
> import backtype.storm.spout.ShellSpout;
> import backtype.storm.task.ShellBolt;
> import backtype.storm.topology.IRichBolt;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.TopologyBuilder;
> import backtype.storm.tuple.Fields;
>
> public class SlowTopology {
>     // Multilang bolt backed by slowBolt.php (declares no output fields).
>     public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>
>         public SlowPhpBolt() {
>             super("php", "slowBolt.php");
>         }
>
>         @Override
>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>             declarer.declare(new Fields());
>         }
>
>         @Override
>         public Map getComponentConfiguration() {
>             return null;
>         }
>     }
>
>     // Multilang spout backed by slowSpout.php.
>     public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>
>         public SlowPhpSpout() {
>             super("php", "slowSpout.php");
>         }
>
>         @Override
>         public void declareOutputFields(OutputFieldsDeclarer ofd) {
>             ofd.declare(new Fields("output"));
>         }
>
>         @Override
>         public Map getComponentConfiguration() {
>             return null;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         TopologyBuilder builder = new TopologyBuilder();
>
>         // One spout task with at most 3 un-acked tuples in flight.
>         builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
>         builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
>
>         Config conf = new Config();
>         conf.setDebug(true);
>
>         if (args != null && args.length > 0) {
>             conf.setNumWorkers(1);
>             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
>         } else {
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("slow", conf, builder.createTopology());
>             Thread.sleep(10000);
>             cluster.shutdown();
>         }
>     }
> }
> ===========
>
> slowSpout.php
> ==========
> <?php
> require_once "storm.php";
>
> class slowSpout extends \ShellSpout {
>     protected function nextTuple() {
>         $value = rand(0, 100);
>         $id = rand(0, 100);
>         $this->emit(array($value), $id);
>         file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>         sleep(1);
>     }
>
>     protected function ack($id) {
>         file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>     }
>
>     protected function fail($id) {
>         file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>     }
> }
>
> (new slowSpout())->run();
> ===========
>
> slowBolt.php
> ============
> <?php
> require_once "storm.php";
>
> class slowBolt extends \BasicBolt {
>     protected function process(\Tuple $t) {
>         $sleep = rand(1, 180);
>         file_put_contents("/tmp/storm_slow.log", "process(" . print_r($t, true) . "), sleeping for sleep[$sleep]\n", FILE_APPEND);
>         sleep($sleep);
>     }
> }
>
> (new slowBolt())->run();
> ============
>
> storm.php (from https://github.com/lazyshot/storm-php; I think I added
> more error checking on reads/writes to standard in/out, a sync() call in
> ShellSpout to make new classes easier to write, and support for the new
> heartbeat protocol)
> =========
> <?php
> interface iShellBolt {
> }
>
> interface iShellSpout {
> }
>
> class Tuple {
>     public $id, $component, $stream, $task, $values;
>
>     public function __construct($id, $component, $stream, $task, $values) {
>         $this->id = $id;
>         $this->component = $component;
>         $this->stream = $stream;
>         $this->task = $task;
>         $this->values = $values;
>     }
> }
>
> abstract class ShellComponent {
>     protected $pid;
>     protected $stormConf;
>     protected $topologyContext;
>
>     protected $stormInc = null;
>
>     public function __construct() {
>         $this->pid = getmypid();
>         $this->sendCommand(array("pid" => $this->pid));
>
>         $handshake = $this->parseMessage($this->waitForMessage());
>
>         $this->stormConf = $handshake['conf'];
>         $this->topologyContext = $handshake['context'];
>         $pidDir = $handshake['pidDir'];
>
>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>     }
>
>     protected function readLine() {
>         $raw = fgets(STDIN);
>
>         if ($raw === false) {
>             throw new Exception("STDIN is broken");
>         }
>
>         $line = trim($raw);
>
>         return $line;
>     }
>
>     protected function waitForMessage() {
>         $message = '';
>         while (true) {
>             $line = trim($this->readLine());
>
>             if (strlen($line) == 0) {
>                 continue;
>             } else if ($line == 'end') {
>                 break;
>             } else if ($line == 'sync') {
>                 $message = '';
>                 continue;
>             }
>
>             $message .= $line . "\n";
>         }
>
>         return trim($message);
>     }
>
>     protected function sendCommand(array $command) {
>         $this->sendMessage(json_encode($command));
>     }
>
>     protected function sendLog($message) {
>         return $this->sendCommand(array(
>             'command' => 'log',
>             'msg' => $message
>         ));
>     }
>
>     protected function parseMessage($message) {
>         $msg = json_decode($message, true);
>
>         if ($msg) {
>             return $msg;
>         } else {
>             return $message;
>         }
>     }
>
>     protected function sendMessage($message) {
>         $message = "$message\nend\n";
>         $bytesWritten = fwrite(STDOUT, $message);
>         fflush(STDOUT);
>         if ($bytesWritten === false) {
>             throw new Exception("STDOUT is broken");
>         }
>         if ($bytesWritten != strlen($message)) {
>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>         }
>     }
>
>     final protected function sync() {
>         $command = array(
>             'command' => 'sync',
>         );
>
>         $this->sendCommand($command);
>     }
> }
>
> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>
>     public $anchor_tuple = null;
>
>     public function __construct() {
>         parent::__construct();
>
>         $this->init($this->stormConf, $this->topologyContext);
>     }
>
>     public function run() {
>         try {
>             while (true) {
>                 $command = $this->parseMessage($this->waitForMessage());
>
>                 if (is_array($command)) {
>                     if (isset($command['tuple'])) {
>                         $tupleMap = array_merge(array(
>                             'id' => null,
>                             'comp' => null,
>                             'stream' => null,
>                             'task' => null,
>                             'tuple' => null
>                         ), $command);
>
>                         // Heartbeat tuples (task -1, stream __heartbeat) must be answered with sync.
>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>                             $this->sync();
>                         } else {
>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>                             $this->process($tuple);
>                         }
>                     }
>                 }
>             }
>         } catch (Exception $e) {
>             $this->sendLog((string)$e);
>         }
>     }
>
>     abstract protected function process(Tuple $tuple);
>
>     protected function init($conf, $topology) {
>         return;
>     }
>
>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
>         if ($this->anchor_tuple !== null) {
>             $anchors = array($this->anchor_tuple);
>         }
>
>         $command = array(
>             'command' => 'emit'
>         );
>
>         if ($stream !== null) {
>             $command['stream'] = $stream;
>         }
>
>         $command['anchors'] = array_map(function ($a) {
>             return $a->id;
>         }, $anchors);
>
>         if ($directTask !== null) {
>             $command['task'] = $directTask;
>         }
>
>         $command['tuple'] = $tuple;
>
>         $this->sendCommand($command);
>     }
>
>     protected function emit($tuple, $stream = null, $anchors = array()) {
>         $this->emitTuple($tuple, $stream, $anchors);
>     }
>
>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>     }
>
>     protected function ack(Tuple $tuple) {
>         $command = array(
>             'command' => 'ack',
>             'id' => $tuple->id
>         );
>
>         $this->sendCommand($command);
>     }
>
>     protected function fail(Tuple $tuple) {
>         $command = array(
>             'command' => 'fail',
>             'id' => $tuple->id
>         );
>
>         $this->sendCommand($command);
>     }
> }
>
> abstract class BasicBolt extends ShellBolt {
>     public function run() {
>         try {
>             while (true) {
>                 $command = $this->parseMessage($this->waitForMessage());
>
>                 if (is_array($command)) {
>                     if (isset($command['tuple'])) {
>                         $tupleMap = array_merge(array(
>                             'id' => null,
>                             'comp' => null,
>                             'stream' => null,
>                             'task' => null,
>                             'tuple' => null
>                         ), $command);
>
>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>                             $this->sync();
>                         } else {
>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>
>                             $this->anchor_tuple = $tuple;
>
>                             // Auto-ack on success, fail on BoltProcessException.
>                             try {
>                                 $this->process($tuple);
>
>                                 $this->ack($tuple);
>                             } catch (BoltProcessException $e) {
>                                 $this->fail($tuple);
>                             }
>                         }
>                     }
>                 }
>             }
>         } catch (Exception $e) {
>             $this->sendLog((string)$e);
>         }
>     }
> }
>
> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>     protected $tuples = array();
>
>     public function __construct() {
>         parent::__construct();
>
>         $this->init($this->stormConf, $this->topologyContext);
>     }
>
>     abstract protected function nextTuple();
>
>     abstract protected function ack($tuple_id);
>
>     abstract protected function fail($tuple_id);
>
>     public function run() {
>         try {
>             while (true) {
>                 $command = $this->parseMessage($this->waitForMessage());
>
>                 if (is_array($command)) {
>                     if (isset($command['command'])) {
>                         // Every next/ack/fail call is answered with sync.
>                         if ($command['command'] == 'ack') {
>                             $this->ack($command['id']);
>                             $this->sync();
>                         } else if ($command['command'] == 'fail') {
>                             $this->fail($command['id']);
>                             $this->sync();
>                         } else if ($command['command'] == 'next') {
>                             $this->nextTuple();
>                             $this->sync();
>                         }
>                     }
>                 }
>             }
>         } catch (Exception $e) {
>             $this->sendLog((string)$e);
>             $this->sync();
>         }
>     }
>
>     protected function init($stormConf, $topologyContext) {
>         return;
>     }
>
>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>     }
>
>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>     }
>
>     private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>         $command = array(
>             'command' => 'emit'
>         );
>
>         if ($messageId !== null) {
>             $command['id'] = $messageId;
>         }
>
>         if ($streamId !== null) {
>             $command['stream'] = $streamId;
>         }
>
>         if ($directTask !== null) {
>             $command['task'] = $directTask;
>         }
>
>         $command['tuple'] = $tuple;
>
>         return $this->sendCommand($command);
>     }
> }
>
> class BoltProcessException extends Exception {
> }
>
> =========================
>
>
> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman
> wrote:
>
>> Hi,
>>
>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
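The storm.php helper quoted above frames every message as a JSON payload followed by a line containing only `end`; when reading, it skips blank lines, and a bare `sync` line discards whatever has been collected so far. For anyone comparing its behavior against storm.py or ShellSpout, here is a minimal Java sketch of the same framing. `MultilangFraming`, `frame` and `readMessage` are hypothetical names, and payloads are treated as opaque strings (no JSON parsing) to keep it self-contained.

```java
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical helper mirroring storm.php's sendMessage()/waitForMessage().
// Payloads are passed through as raw JSON strings; nothing is parsed here.
final class MultilangFraming {
    private MultilangFraming() {}

    // Frame one JSON payload the way storm.php's sendMessage() does.
    static String frame(String json) {
        return json + "\nend\n";
    }

    // Read one message: collect lines until a line equal to "end".
    // Blank lines are skipped and a "sync" line discards what was collected,
    // as in waitForMessage(). Throws EOFException if the stream closes first.
    static String readMessage(BufferedReader in) throws IOException {
        StringBuilder message = new StringBuilder();
        while (true) {
            String raw = in.readLine();
            if (raw == null) {
                throw new EOFException("STDIN is broken");
            }
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            } else if (line.equals("end")) {
                break;
            } else if (line.equals("sync")) {
                message.setLength(0);
                continue;
            }
            message.append(line).append('\n');
        }
        // storm.php joins multi-line payloads with newlines and trims the result.
        return message.toString().trim();
    }
}
```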
>>
>> I'll try to cover the important facts that led to this issue:
>>
>> - I was on 0.9.2, using multilang to bridge to PHP to reach some existing
>> business logic.
>>
>> - I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to
>> the ShellBolt protocol).
>>
>> - I have some odd topologies where I do some legacy background
>> processing. This processing takes a highly variable amount of time in the
>> bolts, from milliseconds to minutes. Eventually, due to randomness, the
>> spout's "pending" pool fills up, which blocks the spout on nextTuple,
>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>> increase the heartbeat timeout at the topology level. That's not the
>> purpose of this email, though confirmation that this is my only
>> workaround would be appreciated! I feel like this wasn't anticipated when
>> the heartbeat patch was designed; I guess it was assumed that the spout's
>> nextTuple wouldn't block?)
>>
>> - The purpose of this email is that the topology "jams up" when the
>> ShellSpout has a heartbeat timeout. I can see my PHP spout/bolt still
>> running (I added logging to them), but Storm itself is doing nothing.
>>
>> - I added logging to ShellSpout and recompiled, because I saw the log
>> message on line 233 (Halting process: ShellSpout died), but as noted the
>> PHP process was still running, so I was curious whether
>> _process.destroy(); failed. But my logging didn't appear, so I assumed I
>> was compiling/deploying wrong. Eventually I commented out line 234,
>> _collector.reportError(exception);, and everything started working!!!
>>
>> Does this make *any* sense? Why would _collector.reportError(exception);
>> block and never return? (I waited quite a long time, tens of minutes.)
>> When I comment out line 234, Storm immediately kills my bad tasks and
>> respawns them almost instantly.
>>
>> I feel fairly confident that this will be reproducible.
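The pending-pool explanation in the third point can be illustrated with a toy discrete-time model. It is a hypothetical sketch, not Storm's executor logic, and it assumes: one spout and one bolt task that handles tuples oldest-first; Storm offers the spout a `nextTuple` call once per second only while fewer than `maxPending` tuples are un-acked (mirroring `setMaxSpoutPending(3)` in the test topology and the one-second `sleep` in slowSpout.php); and the spout's heartbeat is refreshed only when the spout subprocess answers a `next` or `ack` call. `PendingPoolModel` and `firstTimeout` are invented names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of max-spout-pending vs. a subprocess heartbeat timeout.
// Time is measured in whole seconds; this is not how Storm schedules executors.
final class PendingPoolModel {
    private PendingPoolModel() {}

    // Returns the first second at which the spout's heartbeat is older than
    // timeoutSecs, or -1 if that never happens up to horizonSecs.
    // boltSecs[i] is how long the bolt spends on the i-th emitted tuple (>= 1).
    static int firstTimeout(int maxPending, int[] boltSecs, int timeoutSecs, int horizonSecs) {
        Deque<Integer> inFlight = new ArrayDeque<>(); // bolt durations of un-acked tuples, oldest first
        int emitted = 0;       // how many tuples the spout has emitted so far
        int lastHeartbeat = 0; // last second the spout subprocess answered anything
        int headFinish = -1;   // second the bolt finishes the oldest tuple; -1 if idle

        for (int t = 0; t <= horizonSecs; t++) {
            // The bolt acks the oldest tuple when it finishes; the spout's reply
            // to ack() counts as a heartbeat and one pending slot frees up.
            while (headFinish != -1 && headFinish <= t) {
                inFlight.removeFirst();
                lastHeartbeat = headFinish;
                headFinish = inFlight.isEmpty() ? -1 : headFinish + inFlight.peekFirst();
            }
            // nextTuple() is only offered while a pending slot is free.
            if (inFlight.size() < maxPending) {
                lastHeartbeat = t;
                if (emitted < boltSecs.length) {
                    int d = boltSecs[emitted++];
                    if (d < 1) {
                        throw new IllegalArgumentException("bolt durations must be >= 1 second");
                    }
                    inFlight.addLast(d);
                    if (headFinish == -1) {
                        headFinish = t + d; // bolt was idle, starts on this tuple now
                    }
                }
            }
            if (t - lastHeartbeat > timeoutSecs) {
                return t;
            }
        }
        return -1;
    }
}
```

In this model the silent stretch between heartbeats is bounded by roughly the longest single bolt call, because the oldest in-flight tuple is already being processed when the pool fills. With slowBolt.php sleeping up to 180 s per tuple, the timeout would need to exceed about 180 s for this test, which fits William's proposed workaround of raising the timeout; the model says nothing about why `reportError` blocks.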
>> My topology:
>> - 1 spout (ShellSpout)
>> - 1 bolt (ShellBolt)
>> - The ShellSpout hits a heartbeat timeout because of slow tasks in the
>> ShellBolt plus a full pending queue
>>
>> Thanks for any feedback!
>>
>> will

-- 
Alex Sobrino Beltrán
Registered Linux User #273657
http://v5tech.es