apex-dev mailing list archives

From Chetan Narsude <che...@datatorrent.com>
Subject Re: Possible to gracefully shutdown local cluster?
Date Thu, 03 Sep 2015 20:40:07 GMT
Hi Ilya,

  It looks like the input operator is taking too long to finish
emitTuples. You can look at the StreamingContainer.undeploy call to see what
happens soon after lc.shutdown is called. From the error it looks like
the input operator is so busy emitting that it's even ignoring the
interrupt.

  Perhaps you can emit tuples in smaller batches, but if you expect your
app to shut itself down completely and gracefully within 15 secs unless
there is an error condition, the best approach is to have your input
operators raise a ShutdownException or simply call BaseOperator.shutdown().

  An input operator shuts itself down when it raises a ShutdownException.
Other operators shut down when all upstream operators feeding them have shut
themselves down. The app shuts itself down when the last active operator
shuts itself down.
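
  To illustrate, here is a minimal sketch (not from this thread) of an input
operator that emits in small batches and raises the shutdown once its source
is exhausted. readNextRecord() and the batch size are hypothetical, and the
package names assume the DataTorrent API:

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

public class FiniteInputOperator extends BaseOperator implements InputOperator
{
  // Cap emits per call so emitTuples() returns quickly and the engine can
  // deliver undeploy/interrupt requests between calls.
  private static final int MAX_TUPLES_PER_CALL = 100;

  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();

  @Override
  public void emitTuples()
  {
    for (int i = 0; i < MAX_TUPLES_PER_CALL; i++) {
      String record = readNextRecord(); // hypothetical data source
      if (record == null) {
        // Source exhausted: BaseOperator.shutdown() throws ShutdownException,
        // which ends this operator; downstream operators shut down once all
        // of their upstream operators have done the same.
        BaseOperator.shutdown();
      }
      output.emit(record);
    }
  }

  private String readNextRecord()
  {
    return null; // placeholder for a real source
  }
}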

--
Chetan

On Thu, Sep 3, 2015 at 1:14 PM, Thomas Weise <thomas@datatorrent.com> wrote:

> Ilya,
>
> In your code there is a hard stop after 15s. There are other options:
>
>    - The application itself exits when it is done, that is, if the input
>    operator raises a ShutdownException, which leads to graceful termination.
>    - In your test code, let the cluster run asynchronously, check for the
>    existence of the expected output (a file, for example) and call shutdown
>    when the file is complete (see the sketch below).
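>
> A minimal sketch of the second option (assuming the LocalMode API from the
> snippet below; the output path and timeout are hypothetical):
>
> LocalMode lma = LocalMode.newInstance();
> lma.prepareDAG(new Application(), conf);
> LocalMode.Controller lc = lma.getController();
> lc.runAsync(); // returns immediately; the DAG keeps running
>
> // Poll for the expected output instead of stopping after a fixed 15s.
> java.io.File expected = new java.io.File("/tmp/out/result.txt"); // hypothetical
> long deadline = System.currentTimeMillis() + 60000L;
> while (!expected.exists() && System.currentTimeMillis() < deadline) {
>   Thread.sleep(500); // throws InterruptedException; handle as in your test
> }
> lc.shutdown(); // stop only once the output is complete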
>
> Thomas
>
> On Thu, Sep 3, 2015 at 12:36 PM, Ganelin, Ilya <Ilya.Ganelin@capitalone.com>
> wrote:
>
> > Hello all – I’m using the following code to execute a topology locally:
> >
> > try {
> >     LocalMode lma = LocalMode.newInstance();
> >     Configuration conf = new Configuration(false);
> >
> >     conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
> >     lma.prepareDAG(new Application(), conf);
> >     LocalMode.Controller lc = lma.getController();
> >     lc.run(15000); // runs for 15 seconds and quits
> >     lc.shutdown();
> > } catch (ConstraintViolationException e) {
> >     Assert.fail("constraint violations: " + e.getConstraintViolations());
> > }
> >
> > I am reading from HDFS, operating on the data, and writing it back to
> > HDFS. The HDFS operator implementations extend AbstractFileInputOperator
> > and AbstractFileOutputOperator respectively.
> >
> > My issue is that when the timer ends, my operators fail catastrophically,
> > without completing their current operations (e.g. when reading/writing
> > from HDFS they don't close the file stream). Is there a way to shut down
> > gracefully? Is this an issue with the implementations of the operators or
> > with the way that local topologies are executed?
> >
> > Error stacks below:
> >
> > java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3848436c rejected from java.util.concurrent.ThreadPoolExecutor@770deac6[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20121]
> > at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> > at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> > at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> > at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> > at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
> > at com.datatorrent.bufferserver.internal.DataList.flush(DataList.java:226)
> > at com.datatorrent.bufferserver.server.Server$Publisher.read(Server.java:642)
> > at com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:117)
> > at com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:295)
> > at com.datatorrent.netlet.DefaultEventLoop.runEventLoop(DefaultEventLoop.java:252)
> > at com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:100)
> > at java.lang.Thread.run(Thread.java:745)
> > 2015-09-03 12:23:20,545 [2/RecordMaker I:RecordMaker] DEBUG engine.StreamingContainer teardownNode - deactivated 2
> > 2015-09-03 12:23:20,544 [IPC Parameter Sending Thread #0] DEBUG ipc.Client run - IPC Client (467473545) connection to mdcilabpen01.kdc.capitalone.com/10.24.28.46:8020 from zjb238 sending #14
> > 2015-09-03 12:23:20,543 [9/HdfsOutHdht:HdfsFileOutputOperator] DEBUG hdfs.DFSClient writeChunkImpl - DFSClient writeChunk allocating new packet seqno=0, src=/tmp/citadel_out/latencies_hdht.txt.1441308190967.tmp, packetSize=65016, chunksPerPacket=126, bytesCurBlock=0
> > 2015-09-03 12:23:20,546 [1/NewLines:NewlineFileInputOperator] ERROR fs.AbstractFileInputOperator failureHandling - FS reader error
> > java.io.IOException: Filesystem closed
> > at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
> > at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:830)
> > at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:896)
> > at java.io.DataInputStream.read(DataInputStream.java:149)
> > at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> > at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> > at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> > at java.io.InputStreamReader.read(InputStreamReader.java:184)
> > at java.io.BufferedReader.fill(BufferedReader.java:161)
> > at java.io.BufferedReader.readLine(BufferedReader.java:324)
> > at java.io.BufferedReader.readLine(BufferedReader.java:389)
> > at com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:78)
> > at com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:22)
> > at com.datatorrent.lib.io.fs.AbstractFileInputOperator.emitTuples(AbstractFileInputOperator.java:653)
> > at com.datatorrent.stram.engine.InputNode.run(InputNode.java:115)
> > at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1363)
> >
>
