hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r889848 - in /hadoop/zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java src/java/test/org/apache/zookeeper/test/MaxCnxnsTest.java
Date Fri, 11 Dec 2009 23:03:34 GMT
Author: mahadev
Date: Fri Dec 11 23:03:33 2009
New Revision: 889848

URL: http://svn.apache.org/viewvc?rev=889848&view=rev
Log:
ZOOKEEPER-614. Improper synchronisation in getClientCnxnCount (henry via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/MaxCnxnsTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=889848&r1=889847&r2=889848&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Dec 11 23:03:33 2009
@@ -166,6 +166,9 @@
   ZOOKEEPER-587.  client should log timeout negotiated with server (phunt via
   mahadev)
 
+  ZOOKEEPER-614. Improper synchronisation in getClientCnxnCount (henry via
+  mahadev) 
+
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
   "socket reuse" and failure to close client (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=889848&r1=889847&r2=889848&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Fri
Dec 11 23:03:33 2009
@@ -201,10 +201,15 @@
             return new NIOServerCnxn(zks, sock, sk, this);
         }
 
-        private int getClientCnxnCount( InetAddress cl) {
-            Set<NIOServerCnxn> s = ipMap.get(cl);
-            if (s == null) return 0;
-            return s.size();
+        private int getClientCnxnCount(InetAddress cl) {
+            // The ipMap lock covers both the map, and its contents
+            // (that is, the cnxn sets shouldn't be modified outside of
+            // this lock)
+            synchronized (ipMap) {
+                Set<NIOServerCnxn> s = ipMap.get(cl);
+                if (s == null) return 0;
+                return s.size();
+            }
         }
 
         public void run() {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/MaxCnxnsTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/MaxCnxnsTest.java?rev=889848&r1=889847&r2=889848&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/MaxCnxnsTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/MaxCnxnsTest.java Fri Dec
11 23:03:33 2009
@@ -23,81 +23,103 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.zookeeper.proto.ConnectRequest;
 
 public class MaxCnxnsTest extends ClientBase {
-
-    final private int numCnxns = 5;
+    final private int numCnxns = 30;
+    AtomicInteger numConnected = new AtomicInteger(0); 
+    String host;
+    int port;
     
     protected void setUp() throws Exception {
         maxCnxns = numCnxns;
         super.setUp();
     }
     
-    /**
-     * Verify the ability to limit the number of concurrent connections. 
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public void testMaxCnxns() throws IOException, InterruptedException{
-        SocketChannel[] sockets = new SocketChannel[numCnxns+5];
-        String split[] = hostPort.split(":");
-        String host = split[0];
-        int port = Integer.parseInt(split[1]);
-        int numConnected = 0;
-
-        /*
-         * For future unwary socket programmers: although connect 'blocks' it 
-         * does not require an accept on the server side to return. Therefore
-         * you can not assume that all the sockets are connected at the end of
-         * this for loop.
-         */
-        for (int i=0;i<sockets.length;++i) {            
-          SocketChannel sChannel = SocketChannel.open();
-          sChannel.connect(new InetSocketAddress(host,port));          
-          sockets[i] = sChannel;
+    class CnxnThread extends Thread {
+        int i;
+        SocketChannel socket;
+        public CnxnThread(int i) {
+            super("CnxnThread-"+i);
+            this.i = i;
         }
-        // Construct a connection request
-        ConnectRequest conReq = new ConnectRequest(0, 0,
-                10000, 0, "password".getBytes());
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-        boa.writeInt(-1, "len");
-        conReq.serialize(boa, "connect");
-        baos.close();
-        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
-        bb.putInt(bb.capacity() - 4);
         
-        /* Send a connect request. Any socket that has been closed (or at least
-         * not added to the cnxn list on the server) will not have any bytes to
-         * read and get an eof.
-         * 
-         *  The trick here was finding a call that caused the server to put
-         *  bytes in the input stream without closing the cnxn. None of
-         *  the four letter commands do that, so we actually try to create
-         *  a session which should send us something back, while maintaining
-         *  the connection.
-         */
-        for (int i=0;i<sockets.length;++i) {
+        public void run() {
             try {
+                /*
+                 * For future unwary socket programmers: although connect 'blocks' it 
+                 * does not require an accept on the server side to return. Therefore
+                 * you can not assume that all the sockets are connected at the end of
+                 * this for loop.
+                 */                
+                SocketChannel sChannel = SocketChannel.open();
+                sChannel.connect(new InetSocketAddress(host,port));                     
    
+                // Construct a connection request
+                ConnectRequest conReq = new ConnectRequest(0, 0,
+                        10000, 0, "password".getBytes());
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeInt(-1, "len");
+                conReq.serialize(boa, "connect");
+                baos.close();
+                ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+                bb.putInt(bb.capacity() - 4);                
                 bb.rewind();
-                int eof = sockets[i].write(bb);
+                
+                /* Send a connect request. Any socket that has been closed (or at least
+                 * not added to the cnxn list on the server) will not have any bytes to
+                 * read and get an eof.
+                 * 
+                 *  The trick here was finding a call that caused the server to put
+                 *  bytes in the input stream without closing the cnxn. None of
+                 *  the four letter commands do that, so we actually try to create
+                 *  a session which should send us something back, while maintaining
+                 *  the connection.
+                 */
+                
+                int eof = sChannel.write(bb);
                 // If the socket times out, we count that as failed - 
                 // the server should respond within 10s
-                sockets[i].socket().setSoTimeout(10000);                
-                if (!sockets[i].socket().isClosed()){
-                    eof = sockets[i].socket().getInputStream().read(); 
+                sChannel.socket().setSoTimeout(10000);                
+                if (!sChannel.socket().isClosed()){
+                    eof = sChannel.socket().getInputStream().read(); 
                     if (eof != -1) {
-                        numConnected++;
+                        numConnected.incrementAndGet();
                     }
-                }
-            }            
+                }                   
+            }
             catch (IOException io) {
                 // "Connection reset by peer"
             }
         }
-        assertSame(numCnxns,numConnected);
+    }
+    
+    /**
+     * Verify the ability to limit the number of concurrent connections. 
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void testMaxCnxns() throws IOException, InterruptedException{        
+        String split[] = hostPort.split(":");
+        host = split[0];
+        port = Integer.parseInt(split[1]);
+        int numThreads = numCnxns + 5;
+        CnxnThread[] threads = new CnxnThread[numThreads];
+
+        for (int i=0;i<numCnxns;++i) {            
+          threads[i] = new CnxnThread(i);
+        }
+        
+        for (int i=0;i<numCnxns;++i) {
+            threads[i].start();
+        }
+                        
+        for (int i=0;i<numCnxns;++i) {
+            threads[i].join();
+        }
+        assertSame(numCnxns,numConnected.get());
     }
 }



Mime
View raw message