storm-user mailing list archives

From Alex Sobrino <a...@v5tech.es>
Subject Re: ShellSpout hangs on reportError?
Date Wed, 11 Feb 2015 08:09:15 GMT
Hi William,

I'm having the same problem running a multilang topology (written in
Python). If you find a solution, please post it here; it will surely help us.

To upgrade from 0.9.2-incubating, we updated storm.py
(https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
and pom.xml.

After downgrading to 0.9.2-incubating (reverting storm.py and pom.xml),
everything works fine.

Best regards,

On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <oberman@civicscience.com>
wrote:

> I'm not sure of the best way to share a test case, so I'll copy and paste
> the code below.  If you run it (and find the log file of the worker that
> was running it), you should see this within ~30 seconds:
> ====
> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
> java.lang.RuntimeException: subprocess heartbeat timeout
>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
> java.lang.RuntimeException: subprocess heartbeat timeout
>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> =======
>
> However, the topology then runs in a kind of weird zombie state forever.
> More specifically, I see the multilang bolt process all tuples in the
> pending queue, followed by an infinite loop of nextTuple()/fail() from the
> multilang spout.  But, as noted in my original email, if I comment out:
>  _collector.reportError(exception);
> in the Java ShellSpout, the worker immediately dies and respawns.
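A minimal plain-Java model of the shutdown path described here may help; it is not Storm's source. The dying spout logs, calls reportError, and only then destroys the subprocess, so if the reporting call never returns, the kill step is never reached, which matches the zombie state above. BoundedDie, ErrorReporter, die and the 200 ms budget are hypothetical names and values. Bounding the report with Future.get(timeout) is one possible mitigation, not the fix Storm adopted, and the real ShellSpout's final worker exit is left out so the sketch can run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model of a "die" sequence whose error report may hang (not Storm code). */
public class BoundedDie {

    /** Hypothetical stand-in for collector.reportError(...); may block forever. */
    public interface ErrorReporter {
        void report(Throwable cause) throws Exception;
    }

    /**
     * Attempts the report on a helper thread, waits at most reportTimeoutMs,
     * then always runs destroyProcess. Returns true if the report finished in time.
     */
    public static boolean die(Throwable cause, ErrorReporter reporter,
                              Runnable destroyProcess, long reportTimeoutMs) {
        ExecutorService helper = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "error-reporter");
            t.setDaemon(true); // a stuck report must not keep the JVM alive
            return t;
        });
        boolean reported = false;
        try {
            Future<Object> pending = helper.submit(() -> {
                reporter.report(cause);
                return null;
            });
            pending.get(reportTimeoutMs, TimeUnit.MILLISECONDS);
            reported = true;
        } catch (TimeoutException e) {
            // The report is stuck: give up on it and continue to the kill step.
        } catch (ExecutionException e) {
            // The report itself threw: still kill the subprocess.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            helper.shutdownNow();   // interrupt the reporter thread if it is still running
            destroyProcess.run();   // e.g. destroy the subprocess (worker exit omitted here)
        }
        return reported;
    }

    public static void main(String[] args) {
        boolean reported = die(new RuntimeException("subprocess heartbeat timeout"),
                cause -> Thread.sleep(60_000),               // simulates a report that hangs
                () -> System.out.println("subprocess destroyed"),
                200);
        System.out.println("report completed in time: " + reported);
    }
}
```

With the hanging reporter, the kill step still runs after about 200 ms instead of never, which is the difference William observed when he commented the reportError call out.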
>
> If no one can help, my next step is a rough one: I'll have to learn how to
> actually develop and debug Storm itself, which is usually at least 10x
> harder than just using something :-)
>
> In any case, here is my test code.
>
> Topology = 1 process with two tasks (multilang spout and bolt) and a small
> pool of pending messages (yes, I used the word count example in
> storm-starter as a starting point....)
> =============
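> import java.util.Map;
>
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.StormSubmitter;
> import backtype.storm.spout.ShellSpout;
> import backtype.storm.task.ShellBolt;
> import backtype.storm.topology.IRichBolt;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.TopologyBuilder;
> import backtype.storm.tuple.Fields;
>
> public class SlowTopology {
>   // Multilang bolt: runs slowBolt.php, which sleeps a random 1-180 s per tuple
>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>
>     public SlowPhpBolt() {
>       super("php", "slowBolt.php");
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>       declarer.declare(new Fields()); // the bolt emits nothing
>     }
>
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>       return null;
>     }
>   }
>
>   // Multilang spout: runs slowSpout.php, which emits one random value per second
>   public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>
>     public SlowPhpSpout() {
>       super("php", "slowSpout.php");
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>       ofd.declare(new Fields("output"));
>     }
>
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>       return null;
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>
>     TopologyBuilder builder = new TopologyBuilder();
>
>     // One spout task with a pending pool of only 3 un-acked tuples
>     builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
>     builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
>
>     Config conf = new Config();
>     conf.setDebug(true);
>
>     if (args != null && args.length > 0) {
>       // Cluster mode: everything runs in a single worker process
>       conf.setNumWorkers(1);
>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
>     }
>     else {
>       // Local mode: note that 10 s is shorter than the ~30 s it takes the timeout to appear
>       LocalCluster cluster = new LocalCluster();
>       cluster.submitTopology("slow", conf, builder.createTopology());
>       Thread.sleep(10000);
>       cluster.shutdown();
>     }
>   }
> }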
> ===========
>
> slowSpout.php
> ==========
> <?php
> require_once "storm.php";
> class slowSpout extends \ShellSpout {
>   protected function nextTuple() {
>     $value = rand(0,100);
>     $id = rand(0, 100);
>     $this->emit(array($value), $id);
>     file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>     sleep(1);
>   }
>   protected function ack($id) {
>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>   }
>
>   protected function fail($id) {
>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>   }
> }
>
> (new slowSpout())->run();
> ===========
>
> slowBolt.php
> ============
> <?php
> require_once "storm.php";
> class slowBolt extends \BasicBolt {
>   protected function process(\Tuple $t) {
>     $sleep = rand(1, 180);
>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>     sleep($sleep);
>   }
> }
> (new slowBolt())->run();
> ============
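A quick back-of-the-envelope check on slowBolt.php, assuming PHP's rand(1, 180) returns each integer from 1 to 180 with equal probability and assuming a 30 s heartbeat timeout (in line with the ~30 seconds mentioned above): the chance that a single bolt tuple sleeps for longer than the timeout is

$$
P(\text{sleep} > 30) = \frac{|\{31, \dots, 180\}|}{|\{1, \dots, 180\}|} = \frac{150}{180} = \frac{5}{6} \approx 0.83
$$

So roughly five tuples in six keep the bolt busy past the timeout, which helps explain why the repro trips quickly once the pending pool of 3 is full.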
>
> storm.php (based on https://github.com/lazyshot/storm-php; I think I
> added more error checking on reads/writes to standard in/out, added sync()
> to the ShellSpout to make new classes easier to write, and added support
> for the new heartbeat protocol)
> =========
> <?php
> interface iShellBolt {
> }
>
> interface iShellSpout {
> }
>
> class Tuple {
>     public $id, $component, $stream, $task, $values;
>
>     public function __construct($id, $component, $stream, $task, $values) {
>         $this->id = $id;
>         $this->component = $component;
>         $this->stream = $stream;
>         $this->task = $task;
>         $this->values = $values;
>     }
> }
>
> abstract class ShellComponent {
>     protected $pid;
>     protected $stormConf;
>     protected $topologyContext;
>
>     protected $stormInc = null;
>
>     public function __construct() {
>         $this->pid = getmypid();
>         $this->sendCommand(array("pid" => $this->pid));
>
>         $handshake = $this->parseMessage($this->waitForMessage());
>
>         $this->stormConf = $handshake['conf'];
>         $this->topologyContext = $handshake['context'];
>         $pidDir = $handshake['pidDir'];
>
>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>     }
>
>     protected function readLine() {
>         $raw = fgets(STDIN);
>
>         if ($raw === false) {
>             throw new Exception("STDIN is broken");
>         }
>
>         $line = trim($raw);
>
>         return $line;
>     }
>
>     protected function waitForMessage() {
>         $message = '';
>         while (true) {
>             $line = trim($this->readLine());
>
>             if (strlen($line) == 0) {
>                 continue;
>             } else if ($line == 'end') {
>                 break;
>             } else if ($line == 'sync') {
>                 $message = '';
>                 continue;
>             }
>
>             $message .= $line . "\n";
>         }
>
>         return trim($message);
>     }
>
>     protected function sendCommand(array $command) {
>         $this->sendMessage(json_encode($command));
>     }
>
>     protected function sendLog($message) {
>         return $this->sendCommand(array(
>             'command' => 'log',
>             'msg' => $message
>         ));
>     }
>
>     protected function parseMessage($message) {
>         $msg = json_decode($message, true);
>
>         if ($msg) {
>             return $msg;
>         } else {
>             return $message;
>         }
>     }
>
>     protected function sendMessage($message) {
>         $message = "$message\nend\n";
>         $bytesWritten = fwrite(STDOUT, $message);
>         fflush(STDOUT);
>         if ($bytesWritten === false) {
>             throw new Exception("STDOUT is broken");
>         }
>         if ($bytesWritten != strlen($message)) {
>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>         }
>     }
>
>     final protected function sync() {
>         $command = array(
>             'command' => 'sync',
>         );
>
>         $this->sendCommand($command);
>     }
>
> }
>
> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>
>     public $anchor_tuple = null;
>
>     public function __construct() {
>         parent::__construct();
>
>         $this->init($this->stormConf, $this->topologyContext);
>     }
>
>     public function run() {
>         try {
>             while (true) {
>                 $command = $this->parseMessage($this->waitForMessage());
>
>                 if (is_array($command)) {
>                     if (isset($command['tuple'])) {
>                         $tupleMap = array_merge(array(
>                                 'id' => null,
>                                 'comp' => null,
>                                 'stream' => null,
>                                 'task' => null,
>                                 'tuple' => null
>                             ),
>
>                             $command);
>
>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>                             $this->sync();
>                         } else {
>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>                             $this->process($tuple);
>                         }
>                     }
>                 }
>             }
>         } catch (Exception $e) {
>             $this->sendLog((string)$e);
>         }
>     }
>
>     abstract protected function process(Tuple $tuple);
>
>     protected function init($conf, $topology) {
>         return;
>     }
>
>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
>         if ($this->anchor_tuple !== null) {
>             $anchors = array($this->anchor_tuple);
>         }
>
>         $command = array(
>             'command' => 'emit'
>         );
>
>         if ($stream !== null) {
>             $command['stream'] = $stream;
>         }
>
>         $command['anchors'] = array_map(function ($a) {
>             return $a->id;
>         }, $anchors);
>
>         if ($directTask !== null) {
>             $command['task'] = $directTask;
>         }
>
>         $command['tuple'] = $tuple;
>
>         $this->sendCommand($command);
>     }
>
>     protected function emit($tuple, $stream = null, $anchors = array()) {
>         $this->emitTuple($tuple, $stream, $anchors);
>     }
>
>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>     }
>
>     protected function ack(Tuple $tuple) {
>         $command = array(
>             'command' => 'ack',
>             'id' => $tuple->id
>         );
>
>         $this->sendCommand($command);
>     }
>
>     protected function fail(Tuple $tuple) {
>         $command = array(
>             'command' => 'fail',
>             'id' => $tuple->id
>         );
>
>         $this->sendCommand($command);
>     }
> }
>
> abstract class BasicBolt extends ShellBolt {
>     public function run() {
>         try {
>             while (true) {
>                 $command = $this->parseMessage($this->waitForMessage());
>
>                 if (is_array($command)) {
>                     if (isset($command['tuple'])) {
>                         $tupleMap = array_merge(array(
>                                 'id' => null,
>                                 'comp' => null,
>                                 'stream' => null,
>                                 'task' => null,
>                                 'tuple' => null
>                             ),
>
>                             $command);
>
>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>                             $this->sync();
>                         } else {
>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>
>                             $this->anchor_tuple = $tuple;
>
>                             try {
>                                 $processed = $this->process($tuple);
>
>                                 $this->ack($tuple);
>                             } catch (BoltProcessException $e) {
>                                 $this->fail($tuple);
>                             }
>                         }
>                     }
>                 }
>             }
>         } catch (Exception $e) {
>             $this->sendLog((string)$e);
>         }
>
>     }
> }
>
> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>     protected $tuples = array();
>
>     public function __construct() {
>         parent::__construct();
>
>         $this->init($this->stormConf, $this->topologyContext);
>     }
>
>
>     abstract protected function nextTuple();
>
>     abstract protected function ack($tuple_id);
>
>     abstract protected function fail($tuple_id);
>
>     public function run() {
>         try {
>             while (true) {
>                 $command = $this->parseMessage($this->waitForMessage());
>
>                 if (is_array($command)) {
>                     if (isset($command['command'])) {
>                         if ($command['command'] == 'ack') {
>                             $this->ack($command['id']);
>                             $this->sync();
>                         } else if ($command['command'] == 'fail') {
>                             $this->fail($command['id']);
>                             $this->sync();
>                         } else if ($command['command'] == 'next') {
>                             $this->nextTuple();
>                             $this->sync();
>                         }
>                     }
>                 }
>             }
>         } catch (Exception $e) {
>             $this->sendLog((string)$e);
>             $this->sync();
>         }
>     }
>
>     protected function init($stormConf, $topologyContext) {
>         return;
>     }
>
>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>     }
>
>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>     }
>
>     private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>         $command = array(
>             'command' => 'emit'
>         );
>
>         if ($messageId !== null) {
>             $command['id'] = $messageId;
>         }
>
>         if ($streamId !== null) {
>             $command['stream'] = $streamId;
>         }
>
>         if ($directTask !== null) {
>             $command['task'] = $directTask;
>         }
>
>         $command['tuple'] = $tuple;
>
>         return $this->sendCommand($command);
>     }
> }
>
> class BoltProcessException extends Exception {
> }
>
> =========================
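The framing used by the storm.php above can be restated as a small plain-Java sketch, mirroring its sendMessage(), waitForMessage(), bolt heartbeat check and spout sync replies as quoted (including waitForMessage()'s rule that a bare "sync" line discards what was collected so far). This is a reading of this particular PHP helper, not Storm's reference multilang implementation; MultilangFraming and its method names are hypothetical, and JSON parsing is left out so the sketch needs only the JDK.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

/** Plain-Java restatement of the framing in the quoted storm.php (hypothetical names). */
public class MultilangFraming {

    /** Like sendMessage(): the JSON text, then a line containing only "end". */
    public static String frame(String json) {
        return json + "\nend\n";
    }

    /**
     * Like waitForMessage(): skips blank lines, stops at "end", and treats a
     * bare "sync" line as "discard what was collected so far".
     */
    public static String readMessage(BufferedReader in) throws IOException {
        StringBuilder message = new StringBuilder();
        while (true) {
            String raw = in.readLine();
            if (raw == null) {
                throw new IOException("STDIN is broken"); // same message as readLine() in the PHP
            }
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            } else if (line.equals("end")) {
                break;
            } else if (line.equals("sync")) {
                message.setLength(0);
                continue;
            }
            message.append(line).append('\n');
        }
        return message.toString().trim();
    }

    /** Like the bolt loops: a heartbeat tuple has task -1 on stream "__heartbeat". */
    public static boolean isHeartbeat(long task, String stream) {
        return task == -1 && "__heartbeat".equals(stream);
    }

    /** Like ShellSpout::run(): next, ack and fail are each answered with a sync. */
    public static boolean spoutAnswersWithSync(String command) {
        return command.equals("next") || command.equals("ack") || command.equals("fail");
    }

    public static void main(String[] args) throws IOException {
        String wire = frame("{\"command\":\"next\"}") + frame("{\"command\":\"ack\",\"id\":7}");
        BufferedReader in = new BufferedReader(new StringReader(wire));
        System.out.println(readMessage(in));
        System.out.println(readMessage(in));
    }
}
```

The main method reads the two framed commands back one at a time, which is the loop each run() method above performs before dispatching on the decoded command.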
>
>
> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <oberman@civicscience.com>
> wrote:
>
>> Hi,
>>
>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>
>> I'll try to cover the important facts that led to this issue:
>>
>> -I was on 0.9.2, using multilang to bridge to PHP so I could reach some
>> existing business logic
>>
>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to
>> the ShellBolt protocol)
>>
>> -I have some odd topologies where I try to do some legacy background
>> processing.  This processing takes a highly variable amount of time in the
>> Bolts, from milliseconds to minutes.  But eventually, due to randomness, the
>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>> increase the heartbeat timeout at the topology level. That's not the
>> purpose of this email, though confirmation that this is my only workaround
>> would be appreciated!  I feel like this wasn't anticipated when the
>> heartbeat patch was designed; I guess it was assumed that the spout's
>> nextTuple wouldn't block?)
>>
>> -The purpose of this email is the fact that the topology "jams up" when
>> the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still
>> running (I added logging to them), but Storm itself is doing nothing.
>>
>> -I added logging to ShellSpout and recompiled, because I saw the log
>> message on line 233 (Halting process: ShellSpout died), but as noted the PHP
>> process was still running, so I was curious whether _process.destroy(); failed.
>> But my logging didn't appear, so I assumed I was compiling/deploying wrong.
>> Eventually I commented out line 234: _collector.reportError(exception);
>>  and everything started working!!!
>>
>> Does this make *any* sense?  Why would _collector.reportError(exception);
>> block and never return (I waited quite a long time, tens of minutes)?  When
>> I comment out line 234, Storm immediately kills my bad tasks and respawns
>> them almost instantly.
>>
>> I'm fairly confident that this will be reproducible.  My topology:
>> -1 spout (ShellSpout)
>> -1 bolt (ShellBolt)
>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt +
>> the pending queue is full
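The interaction described here (a small pending pool, a bolt that can take minutes per tuple, and a spout-side heartbeat) can be sketched as a toy discrete-time model in plain Java. It assumes, as the description implies, that the spout's heartbeat only advances when the subprocess answers a next/ack/fail command with sync, that no next is sent while the pending pool is full, and that the bolt handles one tuple at a time. The 30 s timeout, the bolt durations and the name PendingHeartbeatModel are illustrative; this is not Storm's scheduling code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (not Storm code): when does a throttled multilang spout miss its heartbeat? */
public class PendingHeartbeatModel {

    /**
     * Simulates whole seconds and returns the first second at which
     * (now - lastHeartbeat) exceeds timeoutSeconds, or -1 if that never
     * happens within horizonSeconds.
     *
     * @param maxPending  analogue of setMaxSpoutPending(...)
     * @param boltSeconds processing time of successive tuples (reused cyclically)
     */
    public static int firstTimeout(int maxPending, int[] boltSeconds,
                                   int timeoutSeconds, int horizonSeconds) {
        Deque<Integer> boltQueue = new ArrayDeque<>(); // emitted tuples waiting for the bolt
        int pending = 0;          // emitted but not yet acked
        int emitted = 0;          // index into boltSeconds for the next emitted tuple
        boolean boltBusy = false;
        int boltDoneAt = 0;       // second at which the current bolt tuple finishes
        int lastHeartbeat = 0;    // last second the spout subprocess answered with "sync"

        for (int now = 0; now < horizonSeconds; now++) {
            // The bolt finished a tuple: the ack reaches the spout, which syncs.
            if (boltBusy && now >= boltDoneAt) {
                boltBusy = false;
                pending--;
                lastHeartbeat = now;
            }
            // "next" is only sent while the pending pool has room; the spout syncs.
            if (pending < maxPending) {
                boltQueue.addLast(boltSeconds[emitted % boltSeconds.length]);
                emitted++;
                pending++;
                lastHeartbeat = now;
            }
            // An idle bolt starts on the oldest queued tuple.
            if (!boltBusy && !boltQueue.isEmpty()) {
                boltBusy = true;
                boltDoneAt = now + boltQueue.removeFirst();
            }
            // Periodic heartbeat check.
            if (now - lastHeartbeat > timeoutSeconds) {
                return now;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] bolt = {5, 60, 2}; // seconds per tuple, cycled
        System.out.println("maxPending=3:    " + firstTimeout(3, bolt, 30, 300));
        System.out.println("maxPending=1000: " + firstTimeout(1000, bolt, 30, 300));
    }
}
```

With a pending cap of 3, the 60 s tuple leaves the spout with nothing to answer and the check fires at second 36 (31 s after the last sync); with an effectively unlimited cap the spout keeps receiving next every second and never times out (-1). In this model, a timeout at least as long as the longest bolt time also avoids the failure, which is consistent with the workaround of raising the heartbeat timeout.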
>>
>> Thanks for any feedback!
>>
>> will
>>
>>
>>
>
>
>


-- 
Alex Sobrino Beltrán
Registered Linux User #273657

http://v5tech.es
