hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1432339 - in /hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src: main/java/org/apache/hadoop/net/unix/DomainSocket.java test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
Date Sat, 12 Jan 2013 00:12:30 GMT
Author: todd
Date: Sat Jan 12 00:12:30 2013
New Revision: 1432339

URL: http://svn.apache.org/viewvc?rev=1432339&view=rev
Log:
HDFS-4388. DomainSocket should throw AsynchronousCloseException when appropriate. Contributed
by Colin Patrick McCabe.

Modified:
    hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
    hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java

Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java?rev=1432339&r1=1432338&r2=1432339&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
(original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
Sat Jan 12 00:12:30 2013
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.SocketException;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -207,10 +209,13 @@ public class DomainSocket implements Clo
    */
   public DomainSocket accept() throws IOException {
     fdRef();
+    boolean exc = true;
     try {
-      return new DomainSocket(path, accept0(fd));
+      DomainSocket ret = new DomainSocket(path, accept0(fd));
+      exc = false;
+      return ret;
     } finally {
-      fdUnref();
+      fdUnref(exc);
     }
   }
 
@@ -235,20 +240,23 @@ public class DomainSocket implements Clo
    *
    * @throws SocketException  If the file descriptor is closed.
    */
-  private void fdRef() throws SocketException {
+  private void fdRef() throws ClosedChannelException {
     int bits = status.incrementAndGet();
     if ((bits & STATUS_CLOSED_MASK) != 0) {
       status.decrementAndGet();
-      throw new SocketException("Socket is closed.");
+      throw new ClosedChannelException();
     }
   }
 
   /**
    * Decrement the reference count of the underlying file descriptor.
    */
-  private void fdUnref() {
+  private void fdUnref(boolean checkClosed) throws AsynchronousCloseException {
     int newCount = status.decrementAndGet();
-    assert newCount >= 0;
+    assert (newCount & ~STATUS_CLOSED_MASK) >= 0;
+    if (checkClosed & ((newCount & STATUS_CLOSED_MASK) != 0)) {
+      throw new AsynchronousCloseException();
+    }
   }
 
   /**
@@ -298,10 +306,12 @@ public class DomainSocket implements Clo
 
   public void setAttribute(int type, int size) throws IOException {
     fdRef();
+    boolean exc = true;
     try {
       setAttribute0(fd, type, size);
+      exc = false;
     } finally {
-      fdUnref();
+      fdUnref(exc);
     }
   }
 
@@ -309,10 +319,14 @@ public class DomainSocket implements Clo
 
   public int getAttribute(int type) throws IOException {
     fdRef();
+    int attribute;
+    boolean exc = true;
     try {
-      return getAttribute0(fd, type);
+      attribute = getAttribute0(fd, type);
+      exc = false;
+      return attribute;
     } finally {
-      fdUnref();
+      fdUnref(exc);
     }
   }
 
@@ -396,10 +410,12 @@ public class DomainSocket implements Clo
   public void sendFileDescriptors(FileDescriptor jfds[],
       byte jbuf[], int offset, int length) throws IOException {
     fdRef();
+    boolean exc = true;
     try {
       sendFileDescriptors0(fd, jfds, jbuf, offset, length);
+      exc = false;
     } finally {
-      fdUnref();
+      fdUnref(exc);
     }
   }
 
@@ -430,10 +446,13 @@ public class DomainSocket implements Clo
   public int receiveFileDescriptors(FileDescriptor[] jfds,
       byte jbuf[], int offset, int length) throws IOException {
     fdRef();
+    boolean exc = true;
     try {
-      return receiveFileDescriptors0(fd, jfds, jbuf, offset, length);
+      int nBytes = receiveFileDescriptors0(fd, jfds, jbuf, offset, length);
+      exc = false;
+      return nBytes;
     } finally {
-      fdUnref();
+      fdUnref(exc);
     }
   }
 
@@ -462,7 +481,6 @@ public class DomainSocket implements Clo
       success = true;
       return ret;
     } finally {
-      fdUnref();
       if (!success) {
         for (int i = 0; i < fds.length; i++) {
           if (fds[i] != null) {
@@ -481,6 +499,7 @@ public class DomainSocket implements Clo
           }
         }
       }
+      fdUnref(!success);
     }
   }
 
@@ -505,32 +524,40 @@ public class DomainSocket implements Clo
     @Override
     public int read() throws IOException {
       fdRef();
+      boolean exc = true;
       try {
         byte b[] = new byte[1];
         int ret = DomainSocket.readArray0(DomainSocket.this.fd, b, 0, 1);
+        exc = false;
         return (ret >= 0) ? b[0] : -1;
       } finally {
-        fdUnref();
+        fdUnref(exc);
       }
     }
     
     @Override
     public int read(byte b[], int off, int len) throws IOException {
       fdRef();
+      boolean exc = true;
       try {
-        return DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
+        int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
+        exc = false;
+        return nRead;
       } finally {
-        fdUnref();
+        fdUnref(exc);
       }
     }
 
     @Override
     public int available() throws IOException {
       fdRef();
+      boolean exc = true;
       try {
-        return DomainSocket.available0(DomainSocket.this.fd);
+        int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
+        exc = false;
+        return nAvailable;
       } finally {
-        fdUnref();
+        fdUnref(exc);
       }
     }
 
@@ -553,22 +580,26 @@ public class DomainSocket implements Clo
     @Override
     public void write(int val) throws IOException {
       fdRef();
+      boolean exc = true;
       try {
         byte b[] = new byte[1];
         b[0] = (byte)val;
         DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
+        exc = false;
       } finally {
-        fdUnref();
+        fdUnref(exc);
       }
     }
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
       fdRef();
+        boolean exc = true;
       try {
         DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
+        exc = false;
       } finally {
-        fdUnref();
+        fdUnref(exc);
       }
     }
   }
@@ -588,6 +619,7 @@ public class DomainSocket implements Clo
     @Override
     public int read(ByteBuffer dst) throws IOException {
       fdRef();
+      boolean exc = true;
       try {
         int nread = 0;
         if (dst.isDirect()) {
@@ -605,9 +637,10 @@ public class DomainSocket implements Clo
         if (nread > 0) {
           dst.position(dst.position() + nread);
         }
+        exc = false;
         return nread;
       } finally {
-        fdUnref();
+        fdUnref(exc);
       }
     }
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java?rev=1432339&r1=1432338&r2=1432339&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
(original)
+++ hadoop/common/branches/HDFS-347/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
Sat Jan 12 00:12:30 2013
@@ -25,6 +25,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
@@ -96,8 +98,47 @@ public class TestDomainSocket {
   }
 
   /**
-   * Test that if one thread is blocking in accept(), another thread
-   * can close the socket and stop the accept.
+   * Test that we get a read result of -1 on EOF.
+   *
+   * @throws IOException
+   */
+  @Test(timeout=180000)
+  public void testSocketReadEof() throws Exception {
+    final String TEST_PATH = new File(sockDir.getDir(),
+        "testSocketReadEof").getAbsolutePath();
+    final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    ExecutorService exeServ = Executors.newSingleThreadExecutor();
+    Callable<Void> callable = new Callable<Void>() {
+      public Void call(){
+        DomainSocket conn;
+        try {
+          conn = serv.accept();
+        } catch (IOException e) {
+          throw new RuntimeException("unexpected IOException", e);
+        }
+        byte buf[] = new byte[100];
+        for (int i = 0; i < buf.length; i++) {
+          buf[i] = 0;
+        }
+        try {
+          Assert.assertEquals(-1, conn.getInputStream().read());
+        } catch (IOException e) {
+          throw new RuntimeException("unexpected IOException", e);
+        }
+        return null;
+      }
+    };
+    Future<Void> future = exeServ.submit(callable);
+    DomainSocket conn = DomainSocket.connect(serv.getPath());
+    Thread.sleep(50);
+    conn.close();
+    serv.close();
+    future.get(2, TimeUnit.MINUTES);
+  }
+
+  /**
+   * Test that if one thread is blocking in a read or write operation, another
+   * thread can close the socket and stop the accept.
    *
    * @throws IOException
    */
@@ -113,8 +154,10 @@ public class TestDomainSocket {
           serv.accept();
           throw new RuntimeException("expected the accept() to be " +
               "interrupted and fail");
-        } catch (IOException e) {
+        } catch (AsynchronousCloseException e) {
           return null;
+        } catch (IOException e) {
+          throw new RuntimeException("unexpected IOException", e);
         }
       }
     };
@@ -125,6 +168,96 @@ public class TestDomainSocket {
   }
 
   /**
+   * Test that we get an AsynchronousCloseException when the DomainSocket
+   * we're using is closed during a read or write operation.
+   *
+   * @throws IOException
+   */
+  private void testAsyncCloseDuringIO(final boolean closeDuringWrite)
+      throws Exception {
+    final String TEST_PATH = new File(sockDir.getDir(),
+        "testAsyncCloseDuringIO(" + closeDuringWrite + ")").getAbsolutePath();
+    final DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
+    ExecutorService exeServ = Executors.newFixedThreadPool(2);
+    Callable<Void> serverCallable = new Callable<Void>() {
+      public Void call() {
+        DomainSocket serverConn = null;
+        try {
+          serverConn = serv.accept();
+          byte buf[] = new byte[100];
+          for (int i = 0; i < buf.length; i++) {
+            buf[i] = 0;
+          }
+          // The server just continues either writing or reading until someone
+          // asynchronously closes the client's socket.  At that point, all our
+          // reads return EOF, and writes get a socket error.
+          if (closeDuringWrite) {
+            try {
+              while (true) {
+                serverConn.getOutputStream().write(buf);
+              }
+            } catch (IOException e) {
+            }
+          } else {
+            do { ; } while 
+              (serverConn.getInputStream().read(buf, 0, buf.length) != -1);
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("unexpected IOException", e);
+        } finally {
+          IOUtils.cleanup(DomainSocket.LOG, serverConn);
+        }
+        return null;
+      }
+    };
+    Future<Void> serverFuture = exeServ.submit(serverCallable);
+    final DomainSocket clientConn = DomainSocket.connect(serv.getPath());
+    Callable<Void> clientCallable = new Callable<Void>() {
+      public Void call(){
+        // The client writes or reads until another thread
+        // asynchronously closes the socket.  At that point, we should
+        // get ClosedChannelException, or possibly its subclass
+        // AsynchronousCloseException.
+        byte buf[] = new byte[100];
+        for (int i = 0; i < buf.length; i++) {
+          buf[i] = 0;
+        }
+        try {
+          if (closeDuringWrite) {
+            while (true) {
+              clientConn.getOutputStream().write(buf);
+            }
+          } else {
+            while (true) {
+              clientConn.getInputStream().read(buf, 0, buf.length);
+            }
+          }
+        } catch (ClosedChannelException e) {
+          return null;
+        } catch (IOException e) {
+          throw new RuntimeException("unexpected IOException", e);
+        }
+      }
+    };
+    Future<Void> clientFuture = exeServ.submit(clientCallable);
+    Thread.sleep(500);
+    clientConn.close();
+    serv.close();
+    clientFuture.get(2, TimeUnit.MINUTES);
+    serverFuture.get(2, TimeUnit.MINUTES);
+  }
+  
+  @Test(timeout=180000)
+  public void testAsyncCloseDuringWrite() throws Exception {
+    testAsyncCloseDuringIO(true);
+  }
+  
+  @Test(timeout=180000)
+  public void testAsyncCloseDuringRead() throws Exception {
+    testAsyncCloseDuringIO(false);
+  }
+  
+  /**
    * Test that attempting to connect to an invalid path doesn't work.
    *
    * @throws IOException



Mime
View raw message