commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r1146842 - in /commons/sandbox/runtime/trunk/src/main: java/org/apache/commons/runtime/net/ native/os/unix/ native/os/win32/
Date Thu, 14 Jul 2011 18:47:20 GMT
Author: mturk
Date: Thu Jul 14 18:47:20 2011
New Revision: 1146842

URL: http://svn.apache.org/viewvc?rev=1146842&view=rev
Log:
Retain socket for long operations

Modified:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Connection.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalEndpoint.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketEndpoint.java
    commons/sandbox/runtime/trunk/src/main/native/os/unix/inetsock.c
    commons/sandbox/runtime/trunk/src/main/native/os/unix/localsock.c
    commons/sandbox/runtime/trunk/src/main/native/os/win32/localsock.c

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Connection.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Connection.java?rev=1146842&r1=1146841&r2=1146842&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Connection.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Connection.java Thu Jul 14 18:47:20 2011
@@ -38,6 +38,45 @@ public abstract class Connection extends
     }
 
     /**
+     * Connects this endpoint to the given remote host address specified
+     * by the EndpointAddress {@code endpoint} with the specified timeout.
+     * The connecting method will block until the connection is established
+     * or an error occurred.
+     *
+     * @param endpoint
+     *          the address of the remote host to connect to.
+     * @param timeout
+     *          the timeout value in milliseconds or {@code -1} for an
+     *          infinite timeout. If value is {@code 0} the endpoint
+     *          timeout is used.
+     * @throws IllegalArgumentException
+     *          if the given EndpointAddress is invalid or not supported.
+     * @throws IOException
+     *          if the socket is already connected or an error
+     *          occurs while connecting.
+     */
+    public abstract void connect(EndpointAddress endpoint, int timeout)
+        throws IllegalArgumentException, IOException;
+
+    /**
+     * Connects this endpoint to the given remote host address specified
+     * by the EndpointAddress {@code endpoint}.
+     *
+     * @param endpoint
+     *          the address of the remote host to connect to.
+     * @throws IllegalArgumentException
+     *          if the given EndpointAddress is invalid or not supported.
+     * @throws IOException
+     *          if the socket is already connected or an error
+     *          occurs while connecting.
+     */
+    public synchronized final void connect(EndpointAddress endpoint)
+        throws IllegalArgumentException, IOException
+    {
+        connect(endpoint, -1);
+    }
+        
+    /**
      * Shut down part of a full-duplex connection.
      * Any further data read or sent to this
      * socket will be discarded depending on the how flag.

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalEndpoint.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalEndpoint.java?rev=1146842&r1=1146841&r2=1146842&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalEndpoint.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalEndpoint.java Thu Jul 14 18:47:20 2011
@@ -46,8 +46,6 @@ public class LocalEndpoint extends Conne
     private EndpointAddress             ea;
     private SelectionKeyImpl            key;
     private boolean                     connected = false;
-    private boolean                     canread   = true;
-    private boolean                     canwrite  = true;
     private SocketStream                stream = null;
     private static native int           connect0(long fd, byte[] sa, int timeout);
 
@@ -74,6 +72,7 @@ public class LocalEndpoint extends Conne
         connected = true;
     }
 
+    @Override
     public void connect(EndpointAddress endpoint, int timeout)
         throws IOException
     {
@@ -97,12 +96,6 @@ public class LocalEndpoint extends Conne
         connected = true;
     }
 
-    public final void connect(EndpointAddress endpoint)
-        throws IOException
-    {
-        connect(endpoint, -1);
-    }
-
     @Override
     public Descriptor descriptor()
     {
@@ -195,10 +188,8 @@ public class LocalEndpoint extends Conne
         throws IOException
     {
         sd.shutdown(how);
-        if (how == ShutdownHow.RDWR || how == ShutdownHow.READ)
-            canread  = false;
-        if (how == ShutdownHow.RDWR || how == ShutdownHow.WRITE)
-            canwrite = false;
+        if (how == ShutdownHow.RDWR)
+            connected  = false;
     }
 
     @Override
@@ -211,7 +202,7 @@ public class LocalEndpoint extends Conne
     public final Stream getStream()
         throws IOException
     {
-        if (canread || canwrite) {
+        if (connected) {
             if (stream != null)
                 return stream;
             stream = new SocketStream(sd);

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketEndpoint.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketEndpoint.java?rev=1146842&r1=1146841&r2=1146842&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketEndpoint.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketEndpoint.java Thu Jul 14 18:47:20 2011
@@ -64,22 +64,20 @@ public class SocketEndpoint extends Conn
         connected = true;
     }
 
-    public void connect(SocketAddress endpoint, int timeout)
+    @Override
+    public void connect(EndpointAddress endpoint, int timeout)
         throws IOException
     {
         if (connected)
             throw new IOException(Local.sm.get("endpoint.ECONNECTED"));
+        if (!(endpoint instanceof SocketAddress))
+            throw new IllegalArgumentException();
+        SocketAddress sockaddr = (SocketAddress)endpoint;
         if (sd.closed())
-            sd.create(endpoint.getFamily(), SocketType.STREAM);
-        
-        ea = endpoint;
-        connected = true;
-    }
+            sd.create(sockaddr.getFamily(), SocketType.STREAM);
 
-    public synchronized final void connect(SocketAddress endpoint)
-        throws IOException
-    {
-        connect(endpoint, -1);
+        ea = sockaddr;
+        connected = true;
     }
 
     @Override
@@ -178,16 +176,27 @@ public class SocketEndpoint extends Conn
         throws IOException
     {
         sd.shutdown(how);
+        if (how == ShutdownHow.RDWR)
+            connected  = false;
     }
 
     @Override
     public final Stream getStream()
         throws IOException
     {
-        if (stream != null)
+        if (connected) {
+            if (stream != null)
+                return stream;
+            stream = new SocketStream(sd);
             return stream;
-        stream = new SocketStream(sd);
-        return stream;
+        }
+        else {
+            /* We only provide bidirectional streams
+             * unlike Java Socket api which has input and
+             * output streams separated.
+             */
+            throw new IOException(Local.sm.get("endpoint.SHUTRDWR"));
+        }
     }
 
     @Override

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/inetsock.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/inetsock.c?rev=1146842&r1=1146841&r2=1146842&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/inetsock.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/inetsock.c Thu Jul 14 18:47:20 2011
@@ -25,6 +25,24 @@
 #include <poll.h>
 #include <sys/un.h>
 
+ACR_INLINE(int) _retain_sd(acr_fd_t *sd)
+{
+    AcrAtomic32Inc(&sd->refs);
+    return sd->u.s;
+}
+
+ACR_INLINE(void) _release_sd(acr_fd_t *sd)
+{
+    if (AcrAtomic32Dec(&sd->refs) == 0) {
+        /* Socket was closed while we were
+         * executing the native method.
+         * Since Socket didn't free the fd
+         * we have to do it here.
+         */
+        AcrFree(sd);
+    }
+}
+
 ACR_NET_EXPORT(jlong, SocketDescriptor, socket0)(JNI_STDARGS, jint saf,
                                                  jint stype, jboolean block)
 {
@@ -99,15 +117,16 @@ ACR_NET_EXPORT(jlong, SocketDescriptor, 
 
 ACR_NET_EXPORT(jint, SocketDescriptor, close0)(JNI_STDARGS, jlong fp)
 {
-    int rc = 0;
+    int sd, rc = 0;
     acr_fd_t *fd  = J2P(fp, acr_fd_t *);
 
     if (fd == 0)
         return ACR_EBADF;
-    if (fd->u.s != -1) {
-        if (r_close(fd->u.s) == -1)
-            rc = ACR_GET_OS_ERROR();
+    sd = fd->u.s;
+    if (sd != -1) {
         fd->u.s = -1;
+        if (r_close(sd) == -1)
+            rc = ACR_GET_OS_ERROR();
     }
     if (AcrAtomic32Dec(&fd->refs) == 0)
         AcrFree(fd);
@@ -129,6 +148,7 @@ ACR_NET_EXPORT(jint, SocketDescriptor, s
                                                   jint how)
 {
     int rc = 0;
+    int sd;
     acr_fd_t *fd = J2P(fp, acr_fd_t *);
 
     if (fd == 0)
@@ -139,8 +159,10 @@ ACR_NET_EXPORT(jint, SocketDescriptor, s
         how = SHUT_WR;
     else
         how = SHUT_RDWR;
+    sd = _retain_sd(fd);
     if (shutdown(fd->u.s, how) == -1)
         rc = ACR_GET_NETOS_ERROR();
+    _release_sd(fd);
     if (how != 1)
         fd->flags |= ACR_DT_HITEOF;
     return rc;

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/localsock.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/localsock.c?rev=1146842&r1=1146841&r2=1146842&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/localsock.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/localsock.c Thu Jul 14 18:47:20 2011
@@ -34,17 +34,36 @@
 #define SOCKADDR_RELEASE(BA, SA) \
     AcrReleaseArrayCritical(env, (BA), (SA))
 
+ACR_INLINE(int) _retain_sd(acr_fd_t *sd)
+{
+    AcrAtomic32Inc(&sd->refs);
+    return sd->u.s;
+}
+
+ACR_INLINE(void) _release_sd(acr_fd_t *sd)
+{
+    if (AcrAtomic32Dec(&sd->refs) == 0) {
+        /* Socket was closed while we were
+         * executing the native method.
+         * Since Socket didn't free the fd
+         * we have to do it here.
+         */
+        AcrFree(sd);
+    }
+}
+
 ACR_NET_EXPORT(jint, LocalDescriptor, close0)(JNI_STDARGS, jlong fp)
 {
-    int rc = 0;
+    int sd, rc = 0;
     acr_fd_t *fd = J2P(fp, acr_fd_t *);
 
     if (fd == 0)
         return ACR_EBADF;
-    if (fd->u.s != -1) {
-        if (r_close(fd->u.s) == -1)
-            rc = ACR_GET_OS_ERROR();
+    sd = fd->u.s; 
+    if (sd != -1) {
         fd->u.s = -1;
+        if (r_close(sd) == -1)
+            rc = ACR_GET_OS_ERROR();
     }
     if (AcrAtomic32Dec(&fd->refs) == 0)
         AcrFree(fd);
@@ -55,6 +74,7 @@ ACR_NET_EXPORT(jint, LocalDescriptor, sh
                                                  jint how)
 {
     int rc = 0;
+    int sd;
     acr_fd_t *fd = J2P(fp, acr_fd_t *);
 
     if (fd == 0)
@@ -65,10 +85,12 @@ ACR_NET_EXPORT(jint, LocalDescriptor, sh
         how = SHUT_WR;
     else
         how = SHUT_RDWR;
-    if (shutdown(fd->u.s, how) == -1)
+    sd = _retain_sd(fd);
+    if (shutdown(sd, how) == -1)
         rc = ACR_GET_NETOS_ERROR();
     if (how != 1)
         fd->flags |= ACR_DT_HITEOF;
+    _release_sd(fd);
     return rc;
 }
 
@@ -258,20 +280,22 @@ ACR_NET_EXPORT(jint, LocalEndpoint, conn
                                               jbyteArray cb, jint timeout)
 {
     int rc;
+    int sd;
     acr_sockaddr_t *ca = SOCKADDR_CAST(cb);
     acr_fd_t *fd       = J2P(fp, acr_fd_t *);
 
+    sd = _retain_sd(fd);
     if (timeout == 0)
         timeout = fd->timeout;
     if (timeout > 0 && (fd->flags & ACR_DT_NONBLOCK) == 0) {
         /* Turn the socket to non-blocking mode
          */
-        if ((rc = AcrNonblock(fd->u.s, 1)) != 0)
-            return rc;
+        if ((rc = AcrNonblock(sd, 1)) != 0)
+            goto finally;
     }
     do {
         /* Restartable connect */
-        rc = connect(fd->u.s, (const struct sockaddr *)&ca->sa.sin, ca->salen);
+        rc = connect(sd, (const struct sockaddr *)&ca->sa.sin, ca->salen);
     } while (rc == -1 && errno == EINTR);
 
     if (rc == -1)
@@ -285,7 +309,7 @@ ACR_NET_EXPORT(jint, LocalEndpoint, conn
                 if (rc == 0) {
                     int       err;
                     socklen_t len = sizeof(err);
-                    if (getsockopt(fd->u.s, SOL_SOCKET, SO_ERROR, (char *)&err, &len) == -1);
+                    if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (char *)&err, &len) == -1);
                         rc = errno;
                     if (err != 0)
                         rc = err;
@@ -296,6 +320,8 @@ ACR_NET_EXPORT(jint, LocalEndpoint, conn
                  AcrNonblock(fd->u.s, 0);
         }
     }
+finally:
+    _release_sd(fd);
     return rc;
 }
 
@@ -331,6 +357,7 @@ ACR_NET_EXPORT(jlong, LocalServerEndpoin
                                                     jboolean block)
 {
     int sd;
+    int ad;
     acr_sockaddr_t  aa;
     socklen_t       aalen;
     acr_fd_t *fd  = J2P(fp, acr_fd_t *);
@@ -343,17 +370,17 @@ ACR_NET_EXPORT(jlong, LocalServerEndpoin
         flags |= SOCK_NONBLOCK;
 # endif
 #endif
-
+    ad = _retain_sd(fd);
     memset(&aa, 0, sizeof(aa));
     aalen = ISIZEOF(struct sockaddr_un);
     do {
 #if HAVE_ACCEPT4
-        sd = accept4(fd->u.s, (struct sockaddr *)&aa.sa, &aalen, flags);
+        sd = accept4(ad, (struct sockaddr *)&aa.sa, &aalen, flags);
 #else
-        sd = accept(fd->u.s,  (struct sockaddr *)&aa.sa, &aalen);
+        sd = accept(ad,  (struct sockaddr *)&aa.sa, &aalen);
 #endif
     } while (sd == -1 && errno == EINTR);
-
+    _release_sd(fd);
     if (sd == -1) {
         ACR_THROW_NET_ERRNO();
         return 0;

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/localsock.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/localsock.c?rev=1146842&r1=1146841&r2=1146842&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/localsock.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/localsock.c Thu Jul 14 18:47:20 2011
@@ -46,17 +46,38 @@ typedef struct wls_fd_t {
     HANDLE   fh;
 } wls_fd_t;
 
+ACR_INLINE(SOCKET) _retain_sd(acr_fd_t *sd)
+{
+    AcrAtomic32Inc(&sd->refs);
+    return sd->u.s;
+}
+
+ACR_INLINE(void) _release_sd(acr_fd_t *sd)
+{
+    if (AcrAtomic32Dec(&sd->refs) == 0) {
+        /* Socket was closed while we were
+         * executing the native method.
+         * Since Socket didn't free the fd
+         * we have to do it here.
+         */
+        AcrFree(sd);
+    }
+}
+
 ACR_NET_EXPORT(jint, LocalDescriptor, close0)(JNI_STDARGS, jlong fp)
 {
     int rc = 0;
+    SOCKET sd;
     wls_fd_t *wd = J2P(fp, wls_fd_t *);
 
     if (wd == 0)
         return ACR_EBADF;
-    if (wd->fd.u.s != INVALID_SOCKET) {
+    sd = wd->fd.u.s;
+    if (sd != INVALID_SOCKET) {
+        wd->fd.u.s = INVALID_SOCKET;
         SAFE_CLOSE_HANDLE(wd->fh);
         wd->fh = 0;
-        if (closesocket(wd->fd.u.s) == SOCKET_ERROR)
+        if (closesocket(sd) == SOCKET_ERROR)
             rc = ACR_GET_NETOS_ERROR();
         wd->fd.u.s = INVALID_SOCKET;
     }
@@ -69,6 +90,7 @@ ACR_NET_EXPORT(jint, LocalDescriptor, sh
                                                  jint how)
 {
     int rc = 0;
+    SOCKET sd;
     acr_fd_t *fd = J2P(fp, acr_fd_t *);
 
     if (fd == 0)
@@ -79,8 +101,10 @@ ACR_NET_EXPORT(jint, LocalDescriptor, sh
         how = SD_SEND;
     else
         how = SD_BOTH;
+    sd = _retain_sd(fd);
     if (shutdown(fd->u.s, how) == SOCKET_ERROR)
         rc = ACR_GET_NETOS_ERROR();
+    _release_sd(fd);
     return rc;
 }
 
@@ -231,6 +255,7 @@ ACR_NET_EXPORT(jint, LocalEndpoint, conn
 {
     int rc;
     char   ssbuf[32];
+    SOCKET sd;
     HANDLE sfp;
     DWORD  pid, port;
     struct sockaddr_in sa;
@@ -266,34 +291,36 @@ ACR_NET_EXPORT(jint, LocalEndpoint, conn
     sa.sin_addr.s_addr = inet_addr("127.0.0.1");
     if (timeout == 0)
         timeout = wd->fd.timeout;
+    sd = _retain_sd((acr_fd_t *)wd);
     if (timeout > 0 && (wd->fd.flags & ACR_DT_NONBLOCK) == 0) {
         /* Turn the socket to non-blocking mode
          * for the duration of the connect call.
          */
-        if ((rc = AcrNonblock(wd->fd.u.s, 1)) != 0)
-            return rc;
+        if ((rc = AcrNonblock(sd, 1)) != 0)
+            goto finally;;
     }
-
-    rc = connect(wd->fd.u.s, (SOCKADDR *)&sa, sas);
+    rc = connect(sd, (SOCKADDR *)&sa, sas);
     if (rc == SOCKET_ERROR)
         rc = ACR_GET_NETOS_ERROR();
     if (rc != 0) {
         if (timeout > 0) {
             if (rc == WSAEINPROGRESS || rc == WSAEALREADY || rc == WSAEWOULDBLOCK) {
-                rc = AcrWaitIO(wd->fd.u.s, timeout, FD_WRITE);
+                rc = AcrWaitIO(sd, timeout, FD_WRITE);
                 if (rc == 0) {
                     int       err;
                     socklen_t len = sizeof(err);
-                    if (getsockopt(wd->fd.u.s, SOL_SOCKET, SO_ERROR, (char *)&err, &len) == -1);
+                    if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (char *)&err, &len) == -1);
                         rc = ACR_GET_NETOS_ERROR();
                     if (err != 0)
                         rc = err;
                 }
             }
             if ((wd->fd.type & ACR_DT_NONBLOCK) == 0)
-                 AcrNonblock(wd->fd.u.s, 0);            
+                 AcrNonblock(sd, 0);
         }
     }
+finally:
+    _release_sd((acr_fd_t)wd);
     return rc;
 }
 
@@ -378,6 +405,7 @@ ACR_NET_EXPORT(jlong, LocalServerEndpoin
                                                     jboolean block)
 {
     SOCKET sd;
+    SOCKET ad;
     acr_sockaddr_t    *aa;
     struct sockaddr_in sa;
     int sas = ISIZEOF(sa);
@@ -385,8 +413,9 @@ ACR_NET_EXPORT(jlong, LocalServerEndpoin
     wls_fd_t *wd  = J2P(fp, wls_fd_t *);
 
     memset(&sa, 0, sizeof(sa));
-    sd = accept(wd->fd.u.s,  0, 0);
-
+    ad = _retain_sd((acr_fd_t *)wd);
+    sd = accept(ad,  0, 0);
+    _release_sd((acr_fd_t *)wd);
     if (sd == INVALID_SOCKET) {
         ACR_THROW_NET_ERRNO();
         return 0;



Mime
View raw message