apex-dev mailing list archives

From: Chandni Singh <chan...@datatorrent.com>
Subject: Re: Possible to gracefully shutdown local cluster?
Date: Thu, 03 Sep 2015 21:01:38 GMT
Hello,

Here is an example of how a test that uses LocalMode can be shut down
gracefully.

https://github.com/chandnisingh/Malhar/blob/examples/library/src/test/java/com/datatorrent/lib/io/fs/ApplicationTest.java
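
The general shape is: run the cluster asynchronously, wait for the expected
output, then call shutdown. A minimal sketch (the output path, timeout, and
the Application reference are illustrative, not taken from the test above):

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

import com.datatorrent.api.LocalMode;

public class ApplicationTest
{
  @Test
  public void testApplication() throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
    lma.prepareDAG(new Application(), conf); // the application under test

    LocalMode.Controller lc = lma.getController();
    lc.runAsync(); // run the local cluster without blocking this thread

    // Poll for the expected output instead of stopping on a fixed timer.
    File output = new File("target/output/results.txt"); // illustrative path
    long deadline = System.currentTimeMillis() + 30000;  // illustrative timeout
    while (!output.exists() && System.currentTimeMillis() < deadline) {
      Thread.sleep(500);
    }

    lc.shutdown(); // stop the local cluster once the output has appeared
    Assert.assertTrue("expected output was produced", output.exists());
  }
}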

Thanks,
Chandni


On Thu, Sep 3, 2015 at 1:40 PM, Chetan Narsude <chetan@datatorrent.com>
wrote:

> Hi Ilya,
>
>   It looks like the input operator is taking too long to finish
> emitTuples(). You can look at the StreamingContainer.undeploy call to see
> what's happening soon after lc.shutdown is called. From the error, it looks
> like the input operator is so busy emitting that it's even ignoring the
> interrupt.
>
>   Perhaps you can emit tuples in smaller batches. But if you expect your
> app to shut itself down completely gracefully within 15 seconds, treating
> anything longer as an error condition, the best approach is to have your
> input operators raise ShutdownException or simply call
> BaseOperator.shutdown() (a sketch follows below).
>
>   An input operator shuts itself down when it raises ShutdownException.
> Any other operator shuts down when all the upstream operators feeding into
> it have shut themselves down. The app shuts itself down when the last
> active operator shuts itself down.
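>
>   A minimal sketch of that approach; BatchedLineInputOperator, BATCH_SIZE
> and readNextLine() are hypothetical names, and the package paths assume
> Apex/DataTorrent 3.x:
>
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.api.InputOperator;
> import com.datatorrent.common.util.BaseOperator;
>
> public class BatchedLineInputOperator extends BaseOperator implements InputOperator
> {
>   public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
>
>   // Keep each emitTuples() call short so the engine's interrupt is honored.
>   private static final int BATCH_SIZE = 100;
>
>   @Override
>   public void emitTuples()
>   {
>     for (int i = 0; i < BATCH_SIZE; i++) {
>       String line = readNextLine(); // hypothetical source accessor
>       if (line == null) {
>         BaseOperator.shutdown(); // raises ShutdownException; the platform then tears this operator down
>         return;
>       }
>       output.emit(line);
>     }
>   }
>
>   private String readNextLine()
>   {
>     return null; // read the next line from the underlying stream; null when exhausted
>   }
> }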
>
> --
> Chetan
>
> On Thu, Sep 3, 2015 at 1:14 PM, Thomas Weise <thomas@datatorrent.com>
> wrote:
>
> > Ilya,
> >
> > In your code there is a hard stop after 15s. There are other options to do
> > this:
> >
> >    - The application itself exits when it is done; that is, the input
> >      operator raises ShutdownException, which leads to graceful
> >      termination.
> >    - In your test code, let the cluster run asynchronously, check for the
> >      existence of the expected output (a file, for example), and call
> >      shutdown when the file is complete (a sketch of that check follows
> >      below).
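> >
> > A rough sketch of the "file is complete" check, assuming the .tmp-suffix
> > convention visible in the logs below (the output operator writes
> > name.<timestamp>.tmp and renames the part file once it is finalized);
> > the class and method names are illustrative:
> >
> > import java.io.File;
> > import java.io.FilenameFilter;
> >
> > public class OutputFileCheck
> > {
> >   // True once the final file exists and no in-progress .tmp part remains.
> >   public static boolean isComplete(File dir, final String name)
> >   {
> >     String[] tmps = dir.list(new FilenameFilter()
> >     {
> >       @Override
> >       public boolean accept(File d, String f)
> >       {
> >         return f.startsWith(name) && f.endsWith(".tmp");
> >       }
> >     });
> >     return new File(dir, name).exists() && (tmps == null || tmps.length == 0);
> >   }
> > }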
> >
> > Thomas
> >
> > On Thu, Sep 3, 2015 at 12:36 PM, Ganelin, Ilya <Ilya.Ganelin@capitalone.com>
> > wrote:
> >
> > > Hello all – I’m using the following code to execute a topology locally:
> > >
> > > try {
> > >     LocalMode lma = LocalMode.newInstance();
> > >     Configuration conf = new Configuration(false);
> > >     conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
> > >     lma.prepareDAG(new Application(), conf);
> > >     LocalMode.Controller lc = lma.getController();
> > >     lc.run(15000); // runs for 15 seconds and quits
> > >     lc.shutdown();
> > > } catch (ConstraintViolationException e) {
> > >     Assert.fail("constraint violations: " + e.getConstraintViolations());
> > > }
> > >
> > > I am reading from HDFS, operating on the data, and writing it back to
> > > HDFS. The HDFS operator implementations extend AbstractFileInputOperator
> > > and AbstractFileOutputOperator respectively.
> > >
> > > My issue is that when the timer ends, my operators fail catastrophically,
> > > without completing their current operations (e.g. if reading/writing from
> > > HDFS they don't close the file stream). Is there a way for these to close
> > > gracefully? Is this an issue with the implementations of the operators or
> > > with the way that local topologies are executed?
> > >
> > > Error stacks below:
> > >
> > > java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3848436c rejected from java.util.concurrent.ThreadPoolExecutor@770deac6[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20121]
> > > at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> > > at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> > > at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> > > at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> > > at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
> > > at com.datatorrent.bufferserver.internal.DataList.flush(DataList.java:226)
> > > at com.datatorrent.bufferserver.server.Server$Publisher.read(Server.java:642)
> > > at com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:117)
> > > at com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:295)
> > > at com.datatorrent.netlet.DefaultEventLoop.runEventLoop(DefaultEventLoop.java:252)
> > > at com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:100)
> > > at java.lang.Thread.run(Thread.java:745)
> > > 2015-09-03 12:23:20,545 [2/RecordMaker I:RecordMaker] DEBUG engine.StreamingContainer teardownNode - deactivated 2
> > > 2015-09-03 12:23:20,544 [IPC Parameter Sending Thread #0] DEBUG ipc.Client run - IPC Client (467473545) connection to mdcilabpen01.kdc.capitalone.com/10.24.28.46:8020 from zjb238 sending #14
> > > 2015-09-03 12:23:20,543 [9/HdfsOutHdht:HdfsFileOutputOperator] DEBUG hdfs.DFSClient writeChunkImpl - DFSClient writeChunk allocating new packet seqno=0, src=/tmp/citadel_out/latencies_hdht.txt.1441308190967.tmp, packetSize=65016, chunksPerPacket=126, bytesCurBlock=0
> > > 2015-09-03 12:23:20,546 [1/NewLines:NewlineFileInputOperator] ERROR fs.AbstractFileInputOperator failureHandling - FS reader error
> > > java.io.IOException: Filesystem closed
> > > at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
> > > at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:830)
> > > at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:896)
> > > at java.io.DataInputStream.read(DataInputStream.java:149)
> > > at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> > > at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> > > at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> > > at java.io.InputStreamReader.read(InputStreamReader.java:184)
> > > at java.io.BufferedReader.fill(BufferedReader.java:161)
> > > at java.io.BufferedReader.readLine(BufferedReader.java:324)
> > > at java.io.BufferedReader.readLine(BufferedReader.java:389)
> > > at com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:78)
> > > at com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:22)
> > > at com.datatorrent.lib.io.fs.AbstractFileInputOperator.emitTuples(AbstractFileInputOperator.java:653)
> > > at com.datatorrent.stram.engine.InputNode.run(InputNode.java:115)
> > > at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1363)
