incubator-s4-dev mailing list archives

From Karthik Kambatla <kkamb...@cs.purdue.edu>
Subject Re: Handling Interrupted Exceptions in Listener/Receiver
Date Wed, 28 Dec 2011 19:24:04 GMT
Listener implementations should ideally throw InterruptedException. It
would also be nice to add close() methods to both Emitters and Listeners:
(1) we can clean up state gracefully, and (2) we can write multiple tests in
the same test class; otherwise we need to do the cleanup from outside the
listeners.
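
A minimal sketch of what such a contract could look like, assuming the
receive call is named recv() and returns a byte array. The Listener and
QueueListener names below are hypothetical and are not the actual S4
interfaces. Declaring the checked exception on the interface method is one
way to make every implementation and caller deal with interruption
explicitly:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical contract; illustrative only, not the actual S4 interface.
interface Listener extends AutoCloseable {
    // Blocks until a message is available and propagates interruption
    // to the caller instead of swallowing it.
    byte[] recv() throws InterruptedException;

    // Releases resources; narrowed so that it throws no checked exception.
    @Override
    void close();
}

// In-memory implementation used only to exercise the contract.
class QueueListener implements Listener {
    private final BlockingQueue<byte[]> handoffQueue = new LinkedBlockingQueue<>();
    private volatile boolean closed = false;

    // Stand-in for the network thread handing off a received message.
    // offer() never blocks on an unbounded LinkedBlockingQueue.
    void deliver(byte[] data) {
        handoffQueue.offer(data);
    }

    @Override
    public byte[] recv() throws InterruptedException {
        // take() throws InterruptedException; it is left to propagate.
        return handoffQueue.take();
    }

    @Override
    public void close() {
        closed = true;
        handoffQueue.clear();
    }

    boolean isClosed() {
        return closed;
    }
}
```

With close() on the interface, a test can create a listener, use it, and
close it inside the test method, so several tests can share one test class.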

Comments?

Thanks
Karthik

On Mon, Dec 19, 2011 at 1:34 AM, Leo Neumeyer <leoneumeyer@gmail.com> wrote:

> We need to make sure we can shut down the Server object gracefully. In
> the top layer, when an App closes, it closes all the PEs, stops the
> Timer threads, and stops all the associated streams.
>
> I tried to add a close() method to the receiver to stop the thread but
> noticed that the Netty listener doesn't throw InterruptedException. Also,
> Netty and UDP handle interrupted exceptions differently:
>
> UDPListener:
>
>  public void run() {
>        try {
>            while (!Thread.interrupted()) {
>                socket.receive(datagram);
>                byte[] data = new byte[datagram.getLength()];
>                System.arraycopy(datagram.getData(), datagram.getOffset(),
>                        data, 0, data.length);
>                datagram.setLength(BUFFER_LENGTH);
>                try {
>                    handoffQueue.put(data);
>                } catch (InterruptedException ie) {
>                    Thread.currentThread().interrupt();
>                }
>            }
>        } catch (IOException e) {
>            throw new RuntimeException(e);
>        }
>    }
>
>
> NettyListener:
>
>  public void messageReceived(ChannelHandlerContext ctx,
>                MessageEvent e) {
>            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
>            try {
>                // This holds up the Netty upstream I/O thread if there's
>                // no receiver at the other end of the handoff queue.
>                handoffQueue.put(buffer.array());
>            } catch (InterruptedException ie) {
>                Thread.currentThread().interrupt();
>            }
>        }
>
> I don't know much about Netty so before breaking things I wanted to
> ask a few questions:
>
> Shouldn't the contract be that Listener implementations always throw
> InterruptedException?
>
> How can we impose the contract?
>
>
> --
>
> Leo Neumeyer (@leoneu)
>
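
For the UDP case, a graceful close() could be sketched as follows. This is
an illustration under assumptions, not the S4 implementation: the class
name ClosableUdpListener, its start() and isRunning() helpers, and the
BUFFER_LENGTH value are hypothetical. The sketch relies on the fact that
DatagramSocket.receive() does not respond to Thread.interrupt(), so close()
closes the socket to unblock the receive call and then interrupts the
thread in case it is inside the handoff queue's put():

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical closable UDP listener; names are illustrative only.
class ClosableUdpListener implements Runnable {
    private static final int BUFFER_LENGTH = 65507; // assumed buffer size

    private final DatagramSocket socket;
    private final BlockingQueue<byte[]> handoffQueue = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this, "udp-listener");
    private volatile boolean closed = false;

    ClosableUdpListener(DatagramSocket socket) {
        this.socket = socket;
    }

    void start() {
        thread.start();
    }

    @Override
    public void run() {
        DatagramPacket datagram =
                new DatagramPacket(new byte[BUFFER_LENGTH], BUFFER_LENGTH);
        try {
            while (!closed) {
                socket.receive(datagram);
                int from = datagram.getOffset();
                byte[] data = Arrays.copyOfRange(
                        datagram.getData(), from, from + datagram.getLength());
                datagram.setLength(BUFFER_LENGTH); // reset for the next receive
                handoffQueue.put(data);
            }
        } catch (InterruptedException ie) {
            // Restore the flag and let the thread exit.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // A closed socket during shutdown is expected; anything else is not.
            if (!closed) {
                throw new RuntimeException(e);
            }
        }
    }

    // Blocks for the next message and propagates interruption to the caller.
    byte[] recv() throws InterruptedException {
        return handoffQueue.take();
    }

    // Stops the listener thread and waits for it to exit.
    void close() throws InterruptedException {
        closed = true;
        socket.close();     // unblocks receive() with a SocketException
        thread.interrupt(); // unblocks put() if the thread is waiting there
        thread.join();
    }

    boolean isRunning() {
        return thread.isAlive();
    }
}
```

The Netty handler would need a different approach, since messageReceived()
runs on Netty's own I/O thread rather than a thread the listener owns.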
