qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [30/38] qpid-proton git commit: PROTON-881: make unittests check for Java I/O object leaks
Date Sun, 05 Jul 2015 23:45:31 GMT
PROTON-881: make unittests check for Java I/O object leaks

Implements something similar to pn_io so that the various Java I/O resources are
created in a single place.  Reactor unit tests are parameterized to run once with
the reactor implementation returned from Proton.reactor() and once with a
unit-test extension of ReactorImpl, which checks that all Java I/O resources used
by the test case are closed.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2d3cefc4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2d3cefc4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2d3cefc4

Branch: refs/heads/master
Commit: 2d3cefc4562e2246a9007d500cf2885ad9b8adc6
Parents: 3c37992
Author: Adrian Preston <prestona@uk.ibm.com>
Authored: Mon May 18 23:07:22 2015 +0100
Committer: Adrian Preston <prestona@uk.ibm.com>
Committed: Mon May 18 23:07:22 2015 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/reactor/Reactor.java |   6 -
 .../qpid/proton/reactor/impl/AcceptorImpl.java  |   8 +-
 .../org/apache/qpid/proton/reactor/impl/IO.java |  44 +++++++
 .../qpid/proton/reactor/impl/IOHandler.java     |   8 +-
 .../apache/qpid/proton/reactor/impl/IOImpl.java |  52 ++++++++
 .../qpid/proton/reactor/impl/ReactorImpl.java   |  15 ++-
 .../qpid/proton/reactor/impl/SelectorImpl.java  |   6 +-
 .../apache/qpid/proton/reactor/ReactorTest.java |  63 ++++++++--
 .../proton/reactor/impl/AcceptorImplTest.java   |  32 ++---
 .../proton/reactor/impl/LeakTestReactor.java    | 118 +++++++++++++++++++
 10 files changed, 301 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 5756b34..d6cca72 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -56,12 +56,6 @@ public interface Reactor {
 
     public void setHandler(Handler handler);
 
-    // XXX: The C reactor has a pn_reactor_io() function.  The closest Java equivalent
-    //      would be a factory for creating SocketChannel's, ServerSocketChannelsm and Selectors.
-    //      This seems like overkill unless there's a use for this in unit testing, or the
-    //      Reactor needs to be integrated with an exotic Java environment which provides its
-    //      own networking implementation.
-
     public Set<ReactorChild> children();
 
     public Collector collector();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index 7300b50..12006ad 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -90,14 +90,8 @@ public class AcceptorImpl implements Acceptor {
 
     private final Selectable sel;
 
-    // Split out from AcceptorImpl to make it easier for unittests to mock this class
-    // without having to open an actual port.
-    protected ServerSocketChannel openServerSocket() throws IOException {
-        return ServerSocketChannel.open();
-    }
-
     protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException {
-        ServerSocketChannel ssc = openServerSocket();
+        ServerSocketChannel ssc = ((ReactorImpl)reactor).getIO().serverSocketChannel();
         ssc.bind(new InetSocketAddress(host, port));
         sel = ((ReactorImpl)reactor).selectable(this);
         sel.setChannel(ssc);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
new file mode 100644
index 0000000..1028ae8
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.nio.channels.Pipe;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+// Java equivalent to pn_io.
+// This is, currently, in the reactor.impl package because it is not
+// used elsewhere in the proton-j codebase.  Instead it is present to
+// facilitate mocking of various Java I/O related resources so that
+// the unit tests can check for leaks.
+public interface IO {
+
+    Pipe pipe() throws IOException;
+
+    Selector selector() throws IOException;
+
+    ServerSocketChannel serverSocketChannel() throws IOException;
+
+    SocketChannel socketChannel() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index ed14628..f810742 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -102,7 +102,7 @@ public class IOHandler extends BaseHandler {
         Transport transport = event.getConnection().getTransport();
         Socket socket = null;   // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET
         try {
-            SocketChannel socketChannel = SocketChannel.open();
+            SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel();
             socketChannel.connect(new InetSocketAddress(hostname, port));
             socket = socketChannel.socket();
         } catch(IOException ioException) {
@@ -166,8 +166,6 @@ public class IOHandler extends BaseHandler {
             Transport transport = selectable.getTransport();
             int capacity = transport.capacity();
             if (capacity > 0) {
-                // TODO: we shouldn't be doing this cast.  Instead - selectable should return an
-                //       object with 1) a getter for the SelectableChannel, 2) read/write methods.
                 SocketChannel socketChannel = (SocketChannel)selectable.getChannel();
                 try {
                     int n = socketChannel.read(transport.tail());
@@ -200,7 +198,7 @@ public class IOHandler extends BaseHandler {
             Transport transport = selectable.getTransport();
             int pending = transport.pending();
             if (pending > 0) {
-                SocketChannel channel = (SocketChannel)selectable.getChannel(); // TODO: can't rely on this cast always working!
+                SocketChannel channel = (SocketChannel)selectable.getChannel();
                 try {
                     int n = channel.write(transport.head());
                     if (n < 0) {
@@ -299,7 +297,7 @@ public class IOHandler extends BaseHandler {
             ReactorImpl reactor = (ReactorImpl)event.getReactor();
             Selector selector = reactor.getSelector();
             if (selector == null) {
-                selector = new SelectorImpl();
+                selector = new SelectorImpl(reactor.getIO());
                 reactor.setSelector(selector);
             }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
new file mode 100644
index 0000000..6376b16
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.nio.channels.Pipe;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+public class IOImpl implements IO {
+
+    @Override
+    public Pipe pipe() throws IOException {
+        return Pipe.open();
+    }
+
+    @Override
+    public Selector selector() throws IOException {
+        return Selector.open();
+    }
+
+    @Override
+    public ServerSocketChannel serverSocketChannel() throws IOException {
+        return ServerSocketChannel.open();
+    }
+
+    @Override
+    public SocketChannel socketChannel() throws IOException {
+        return SocketChannel.open();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index e48bd3b..db20601 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -63,6 +63,7 @@ public class ReactorImpl implements Reactor, Extendable {
     private final Pipe wakeup;
     private Selector selector;
     private Record attachments;
+    private final IO io;
 
     @Override
     public long mark() {
@@ -75,18 +76,23 @@ public class ReactorImpl implements Reactor, Extendable {
         return now;
     }
 
-    public ReactorImpl() throws IOException {
+    protected ReactorImpl(IO io) throws IOException {
         collector = (CollectorImpl)Proton.collector();
         global = new IOHandler();
         handler = new BaseHandler();
         children = new HashSet<ReactorChild>();
         selectables = 0;
         timer = new Timer(collector);
-        wakeup = Pipe.open();
+        this.io = io;
+        wakeup = this.io.pipe();
         mark();
         attachments = new RecordImpl();
     }
 
+    public ReactorImpl() throws IOException {
+        this(new IOImpl());
+    }
+
     @Override
     public void free() {
         if (wakeup.source().isOpen()) {
@@ -113,6 +119,7 @@ public class ReactorImpl implements Reactor, Extendable {
         }
     }
 
+    @Override
     public Record attachments() {
         return attachments;
     }
@@ -412,4 +419,8 @@ public class ReactorImpl implements Reactor, Extendable {
     public Acceptor acceptor(String host, int port, Handler handler) throws IOException {
         return new AcceptorImpl(this, host, port, handler);
     }
+
+    public IO getIO() {
+        return io;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index 28ea1ae..1145158 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -30,7 +30,7 @@ import java.util.Iterator;
 import org.apache.qpid.proton.reactor.Selectable;
 import org.apache.qpid.proton.reactor.Selector;
 
-public class SelectorImpl implements Selector {
+class SelectorImpl implements Selector {
 
     private final java.nio.channels.Selector selector;
     private final HashSet<Selectable> selectables = new HashSet<Selectable>();
@@ -39,8 +39,8 @@ public class SelectorImpl implements Selector {
     private final HashSet<Selectable> expired = new HashSet<Selectable>();
     private final HashSet<Selectable> error = new HashSet<Selectable>();
 
-    public SelectorImpl() throws IOException {
-        selector = java.nio.channels.Selector.open();
+    protected SelectorImpl(IO io) throws IOException {
+        selector = io.selector();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 76fc632..c784849 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -29,6 +29,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.engine.BaseHandler;
@@ -40,10 +42,61 @@ import org.apache.qpid.proton.engine.Handler;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
+import org.apache.qpid.proton.reactor.impl.LeakTestReactor;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 public class ReactorTest {
 
+    public ReactorFactory reactorFactory;
+    private Reactor reactor;
+
+    private static interface ReactorFactory {
+        Reactor newReactor() throws IOException;
+    }
+
+    // Parameterize the tests, and run them once with a reactor obtained by calling
+    // 'Proton.reactor()' and once with the LeakTestReactor.
+    @Parameters
+    public static Collection<ReactorFactory[]> data() throws IOException {
+        ReactorFactory classicReactor = new ReactorFactory() {
+            @Override public Reactor newReactor() throws IOException {
+                return Proton.reactor();
+            }
+        };
+        ReactorFactory newLeakDetection = new ReactorFactory() {
+            @Override public Reactor newReactor() throws IOException {
+                return new LeakTestReactor();
+            }
+        };
+        return Arrays.asList(new ReactorFactory[][]{{classicReactor}, {newLeakDetection}});
+    }
+
+    public ReactorTest(ReactorFactory reactorFactory) {
+        this.reactorFactory = reactorFactory;
+    }
+
+    @Before
+    public void before() throws IOException {
+        reactor = reactorFactory.newReactor();
+    }
+
+    private void checkForLeaks() {
+        if (reactor instanceof LeakTestReactor) {
+            ((LeakTestReactor)reactor).assertNoLeaks();
+        }
+    }
+
+    @After
+    public void after() {
+        checkForLeaks();
+    }
+
     /**
      * Tests that creating a reactor and running it:
      * <ul>
@@ -54,7 +107,6 @@ public class ReactorTest {
      */
     @Test
     public void runEmpty() throws IOException {
-        Reactor reactor = Proton.reactor();
         assertNotNull(reactor);
         reactor.run();
         reactor.free();
@@ -81,7 +133,6 @@ public class ReactorTest {
      */
     @Test
     public void handlerRun() throws IOException {
-        Reactor reactor = Proton.reactor();
         Handler handler = reactor.getHandler();
         assertNotNull(handler);
         TestHandler testHandler = new TestHandler();
@@ -104,7 +155,6 @@ public class ReactorTest {
      */
     @Test
     public void connection() throws IOException {
-        Reactor reactor = Proton.reactor();
         TestHandler connectionHandler = new TestHandler();
         Connection connection = reactor.connection(connectionHandler);
         assertNotNull(connection);
@@ -129,7 +179,6 @@ public class ReactorTest {
      */
     @Test
     public void acceptor() throws IOException {
-        Reactor reactor = Proton.reactor();
         final Acceptor acceptor = reactor.acceptor("127.0.0.1", 0);
         assertNotNull(acceptor);
         assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor));
@@ -176,8 +225,6 @@ public class ReactorTest {
      */
     @Test
     public void connect() throws IOException {
-        Reactor reactor = Proton.reactor();
-
         ServerHandler sh = new ServerHandler();
         Acceptor acceptor = reactor.acceptor("127.0.0.1",  0, sh);
         final int listeningPort = ((AcceptorImpl)acceptor).getPortNumber();
@@ -288,7 +335,7 @@ public class ReactorTest {
     }
 
     private void transfer(int count, int window) throws IOException {
-        Reactor reactor = Proton.reactor();
+        reactor = reactorFactory.newReactor();
         ServerHandler sh = new ServerHandler();
         Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh);
         sh.setAcceptor(acceptor);
@@ -305,6 +352,7 @@ public class ReactorTest {
         reactor.run();
         reactor.free();
         assertEquals("Did not receive the expected number of messages", count, snk.received);
+        checkForLeaks();
     }
 
     @Test
@@ -326,7 +374,6 @@ public class ReactorTest {
 
     @Test
     public void schedule() throws IOException {
-        Reactor reactor = Proton.reactor();
         TestHandler reactorHandler = new TestHandler();
         reactor.getHandler().add(reactorHandler);
         TestHandler taskHandler = new TestHandler();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
index 85750ea..9ac0538 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java
@@ -24,8 +24,6 @@ package org.apache.qpid.proton.reactor.impl;
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
 
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.reactor.Reactor;
 import org.apache.qpid.proton.reactor.ReactorChild;
 import org.apache.qpid.proton.reactor.Selectable.Callback;
 import org.junit.Test;
@@ -44,21 +42,18 @@ public class AcceptorImplTest {
         final SelectableImpl selectable = new SelectableImpl();
         selectable.onError(mockCallback);
         ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class);
-        Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable);
-        class MockAcceptorImpl extends AcceptorImpl {
-
-            protected MockAcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException {
-                super(reactor, host, port, handler);
-            }
-
+        class MockIO extends IOImpl {
             @Override
-            protected ServerSocketChannel openServerSocket() throws IOException {
+            public ServerSocketChannel serverSocketChannel() throws IOException {
                 ServerSocketChannel result = Mockito.mock(ServerSocketChannel.class);
                 Mockito.when(result.accept()).thenThrow(new IOException());
                 return result;
             }
         }
-        new MockAcceptorImpl(mockReactor, "host", 1234, null);
+        IO mockIO = new MockIO();
+        Mockito.when(mockReactor.getIO()).thenReturn(mockIO);
+        Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable);
+        new AcceptorImpl(mockReactor, "host", 1234, null);
         selectable.readable();
         Mockito.verify(mockCallback).run(selectable);
     }
@@ -75,21 +70,18 @@ public class AcceptorImplTest {
         final SelectableImpl selectable = new SelectableImpl();
         selectable.onError(mockCallback);
         ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class);
-        Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable);
-        class MockAcceptorImpl extends AcceptorImpl {
-
-            protected MockAcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException {
-                super(reactor, host, port, handler);
-            }
-
+        class MockIO extends IOImpl {
             @Override
-            protected ServerSocketChannel openServerSocket() throws IOException {
+            public ServerSocketChannel serverSocketChannel() throws IOException {
                 ServerSocketChannel result = Mockito.mock(ServerSocketChannel.class);
                 Mockito.when(result.accept()).thenReturn(null);
                 return result;
             }
         }
-        new MockAcceptorImpl(mockReactor, "host", 1234, null);
+        IO mockIO = new MockIO();
+        Mockito.when(mockReactor.getIO()).thenReturn(mockIO);
+        Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable);
+        new AcceptorImpl(mockReactor, "host", 1234, null);
         selectable.readable();
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java
new file mode 100644
index 0000000..d06ec0a
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
+import java.nio.channels.Pipe.SourceChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import junit.framework.AssertionFailedError;
+
+// Extends the Reactor to substitute a unit-test implementation of the
+// IO class.  This detects, and reports, situations where the reactor code
+// fails to close one of the Java I/O related resources that it has created.
+public class LeakTestReactor extends ReactorImpl {
+
+    private static class TestIO implements IO {
+
+        private final HashMap<Object, Exception> resources = new HashMap<Object, Exception>();
+
+        @Override
+        public Pipe pipe() throws IOException {
+            Pipe pipe = Pipe.open();
+            resources.put(pipe.source(), new Exception());
+            resources.put(pipe.sink(), new Exception());
+            return pipe;
+        }
+
+        @Override
+        public Selector selector() throws IOException {
+            Selector selector = Selector.open();
+            resources.put(selector, new Exception());
+            return selector;
+
+        }
+
+        @Override
+        public ServerSocketChannel serverSocketChannel() throws IOException {
+            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+            resources.put(serverSocketChannel, new Exception());
+            return serverSocketChannel;
+        }
+
+        @Override
+        public SocketChannel socketChannel() throws IOException {
+            SocketChannel socketChannel = SocketChannel.open();
+            resources.put(socketChannel, new Exception());
+            return socketChannel;
+        }
+
+        private boolean isOpen(Object resource) {
+            if (resource instanceof SourceChannel) {
+                return ((SourceChannel)resource).isOpen();
+            } else if (resource instanceof SinkChannel) {
+                return ((SinkChannel)resource).isOpen();
+            } else if (resource instanceof Selector) {
+                return ((Selector)resource).isOpen();
+            } else if (resource instanceof ServerSocketChannel) {
+                return ((ServerSocketChannel)resource).isOpen();
+            } else if (resource instanceof SocketChannel) {
+                return ((SocketChannel)resource).isOpen();
+            } else {
+                throw new AssertionFailedError("Don't know how to check if this type is open: " + resource.getClass());
+            }
+        }
+
+        protected void assertNoLeaks() throws AssertionFailedError {
+            boolean fail = false;
+            for (Entry<Object, Exception> entry : resources.entrySet()) {
+                if (isOpen(entry.getKey())) {
+                    System.out.println("Leaked an instance of '" + entry.getKey() + "' from:");
+                    entry.getValue().printStackTrace(System.out);
+                    fail = true;
+                }
+            }
+            if (fail) {
+                throw new AssertionFailedError("Resources leaked");
+            }
+            resources.clear();
+        }
+    }
+
+    private final TestIO testIO;
+
+    public LeakTestReactor() throws IOException {
+        super(new TestIO());
+        testIO = (TestIO)getIO();
+    }
+
+    public void assertNoLeaks() throws AssertionFailedError {
+        testIO.assertNoLeaks();
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message