commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r1144159 - in /commons/sandbox/runtime/trunk/src/main/native/os: unix/pollset.c win32/pollset.c win32/selectset.c
Date Fri, 08 Jul 2011 05:09:59 GMT
Author: mturk
Date: Fri Jul  8 05:09:58 2011
New Revision: 1144159

URL: http://svn.apache.org/viewvc?rev=1144159&view=rev
Log:
Implement native win32 legacy select based selector

Modified:
    commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c
    commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c
    commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c?rev=1144159&r1=1144158&r2=1144159&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c Fri Jul  8 05:09:58 2011
@@ -110,7 +110,7 @@ ACR_NET_EXPORT(jlong, PollSelector, crea
     if (ps->fdset == 0)
         return 0;
     ps->ooset    = ACR_MALLOC(acr_pollfd_t,  ps->size);
-    if (ps->fdset == 0) {
+    if (ps->ooset == 0) {
         AcrFree(ps->fdset);
         return 0;
     }

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c?rev=1144159&r1=1144158&r2=1144159&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/pollset.c Fri Jul  8 05:09:58 2011
@@ -131,7 +131,7 @@ ACR_NET_EXPORT(jlong, PollSelector, crea
     if (ps->fdset == 0)
         return 0;
     ps->ooset    = ACR_MALLOC(acr_pollfd_t,  ps->size);
-    if (ps->fdset == 0) {
+    if (ps->ooset == 0) {
         AcrFree(ps->fdset);
         return 0;
     }

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c?rev=1144159&r1=1144158&r2=1144159&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/selectset.c Fri Jul  8 05:09:58
2011
@@ -22,6 +22,43 @@
 #include "acr/iodefs.h"
 #include "acr/netapi.h"
 #include "arch_opts.h"
+#include "arch_sync.h"
+
+/* pollset operation states */
+#define PSS_DESTROY     1
+#define PSS_POLL        2
+#define PSS_WAIT        3
+#define PSS_WAKEUP      4
+
+#define WAKEUP_IF_POLL()                                \
+    if (AcrAtomic32Equ(&ps->state, PSS_POLL)) {         \
+        char ch = 1;                                    \
+        AcrAtomic32Set(&ps->state, PSS_WAKEUP);         \
+        send(ps->wpipe[1], &ch, 1, 0);                  \
+    } else (void)0
+
+typedef struct acr_pollfd_t {
+    SOCKET         sd;
+    short          ievents;
+    short          revents;
+    jobject        obj;
+    acr_time_t     ttl;
+    acr_time_t     exp;
+} acr_pollfd_t;
+
+typedef struct acr_pollset_t {
+    fd_set                      rdset;
+    fd_set                      wrset;
+    fd_set                      exset;
+    acr_pollfd_t               *ooset;
+    int                         used;
+    int                         size;
+    volatile acr_atomic32_t     state;
+    volatile acr_atomic32_t     waiters;
+    SOCKET                      wpipe[2];
+    CRITICAL_SECTION            mutex;
+    HANDLE                      wakeup;
+} acr_pollset_t;
 
 ACR_NET_EXPORT(jint, SocketSelectorFactory, type0)(JNI_STDARGS)
 {
@@ -48,3 +85,471 @@ ACR_NET_EXPORT(jint, LocalSelectorFactor
 {
     return MAXIMUM_WAIT_OBJECTS - 1;
 }
+
+static int wwait(acr_pollset_t *ps)
+{
+    DWORD ws;
+
+    AcrAtomic32Inc(&ps->waiters);
+    LeaveCriticalSection(&ps->mutex);
+    ws = WaitForSingleObject(ps->wakeup, INFINITE);
+    EnterCriticalSection(&ps->mutex);
+    if (AcrAtomic32Dec(&ps->waiters) == 0)
+        ResetEvent(ps->wakeup);
+    if (ws == WAIT_FAILED)
+        return GetLastError();
+    else
+        return 0;
+}
+
+ACR_NET_EXPORT(jlong, LegacySelector, create0)(JNI_STDARGS, jint size)
+{
+    int rc;
+    acr_pollset_t *ps;
+
+    if (!ACR_HAVE_LATE_DLL_FUNC(WSAPoll)) {
+        ACR_THROW_NET_ERROR(ACR_ENOTIMPL);
+        return 0;
+    }
+    ps = ACR_TALLOC(acr_pollset_t);
+    if (ps == 0)
+        return 0;
+    ps->wpipe[0] = -1;
+    ps->wpipe[1] = -1;
+    ps->size     = size + 1;
+    ps->used     = 1;
+
+    ps->ooset    = ACR_MALLOC(acr_pollfd_t,  ps->size);
+    if (ps->ooset == 0)
+        return 0;
+    if ((rc = AcrSocketPair(&ps->wpipe[0], &ps->wpipe[1], 0)) != 0) {
+        ACR_THROW_NET_ERROR(rc);
+        goto cleanup;
+    }
+    /* Add the wakeup pipe to the pset
+     */
+    ps->ooset[0].sd      = ps->wpipe[0];
+    ps->ooset[0].ievents = POLLIN;
+    ps->ooset[0].revents = 0;
+    ps->ooset[0].obj     = 0;
+    ps->ooset[0].ttl     = ACR_INFINITE;
+    ps->ooset[0].exp     = ACR_INFINITE;
+    /* Add wakeup pipe to the read set */
+    FD_SET(ps->ooset[0].sd, &ps->rdset);
+
+    if (!InitializeCriticalSectionAndSpinCount(&ps->mutex, 4000)) {
+        ACR_THROW_NET_ERRNO();
+        goto cleanup;
+    }
+    if ((ps->wakeup = CreateEvent(0, FALSE, FALSE, 0)) == 0) {
+        ACR_THROW_NET_ERRNO();
+        DeleteCriticalSection(&ps->mutex);
+        goto cleanup;
+    }
+    return P2J(ps);
+
+cleanup:
+    AcrFree(ps->ooset);
+    AcrFree(ps);
+    return 0;
+}
+
+ACR_NET_EXPORT(int, LegacySelector, destroy0)(JNI_STDARGS, jlong pollset)
+{
+    int i;
+    int rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    if (!AcrAtomic32Equ(&ps->state, 0)) {
+        int  state = AcrAtomic32Set(&ps->state, PSS_DESTROY);
+        if (state == PSS_POLL) {
+            char ch   = 1;
+            send(ps->wpipe[1], &ch, 1, 0);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if ((rc = wwait(ps)) != 0) {
+            LeaveCriticalSection(&ps->mutex);
+            return rc;
+        }
+    }
+    AcrAtomic32Set(&ps->state, PSS_DESTROY);
+    SetEvent(ps->wakeup);
+    for (i = 1; i < ps->used; i++) {
+        AcrSelectionKeyReset(env, ps->ooset[i].obj);
+        /* Invalidate the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 0;
+    LeaveCriticalSection(&ps->mutex);
+    closesocket(ps->wpipe[0]);
+    closesocket(ps->wpipe[1]);
+    CloseHandle(ps->wakeup);
+    DeleteCriticalSection(&ps->mutex);
+    AcrFree(ps->ooset);
+    AcrFree(ps);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, LegacySelector, clr0)(JNI_STDARGS, jlong pollset,
+                                           jobjectArray rs)
+{
+    int i;
+    int cnt = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    while (!AcrAtomic32Equ(&ps->state, 0)) {
+        if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+            /* Interrupted by destroy0 */
+            LeaveCriticalSection(&ps->mutex);
+            return 0;
+        }
+        WAKEUP_IF_POLL();
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if (wwait(ps) != 0) {
+            LeaveCriticalSection(&ps->mutex);
+            ACR_THROW(ACR_EX_EILLEGAL, 0);
+            return 0;
+        }
+    }
+    for (i = 1; i < ps->used; i++) {
+        (*env)->SetObjectArrayElement(env, rs, cnt++, ps->ooset[i].obj);
+        /* Unref the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 1;
+    LeaveCriticalSection(&ps->mutex);
+    return cnt;
+}
+
+ACR_NET_EXPORT(void, LegacySelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    WAKEUP_IF_POLL();
+    LeaveCriticalSection(&ps->mutex);
+}
+
+ACR_NET_EXPORT(jint, LegacySelector, size0)(JNI_STDARGS, jlong pollset)
+{
+    int rv;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    rv = ps->used - 1;
+    LeaveCriticalSection(&ps->mutex);
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, LegacySelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
+                                           jlong fp, jint events, jint ttlms)
+{
+    int i, rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+    acr_fd_t *fd      = J2P(fp, acr_fd_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    while (!AcrAtomic32Equ(&ps->state, 0)) {
+        if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+            rc = 0;
+            goto cleanup;
+        }
+        WAKEUP_IF_POLL();
+        if ((rc = wwait(ps)) != 0)
+            goto cleanup;
+    }
+    if (ps->used == ps->size) {
+        /* Overflow
+         */
+        rc = ACR_EOVERFLOW;
+        goto cleanup;
+    }
+    for (i = 1; i < ps->used; i++) {
+        if (ps->ooset[i].sd == fd->u.s) {
+            /* Duplicate descriptor
+             */
+            rc = ACR_EALREADY;
+            goto cleanup;
+        }
+    }
+    ps->ooset[ps->used].sd      = fd->u.s;
+    ps->ooset[ps->used].ievents = (short)events;
+    ps->ooset[ps->used].revents = 0;
+    ps->ooset[ps->used].obj     = (*env)->NewGlobalRef(env, fo);
+    if (ps->ooset[ps->used].obj == 0) {
+        /* In case the NewGlobalRef fails,
+         * OutOfMemoryError should be thrown already by the JVM.
+         */
+        rc = ACR_ENOMEM;
+        goto cleanup;
+    }
+    if (ttlms > 0) {
+        ps->ooset[ps->used].ttl = AcrTimeFromMsec(ttlms);
+        ps->ooset[ps->used].exp = AcrTimeNow() + ps->ooset[ps->used].ttl;
+    }
+    else {
+        ps->ooset[ps->used].ttl = ACR_INFINITE;
+        ps->ooset[ps->used].exp = ACR_INFINITE;
+    }
+    if (events & ACR_OP_INP) {
+        FD_SET(fd->u.s, &ps->rdset);
+    }
+    if (events & ACR_OP_OUT) {
+        FD_SET(fd->u.s, &ps->wrset);
+    }
+    if (events & ~(ACR_OP_INP | ACR_OP_OUT)) {
+        FD_SET(fd->u.s, &ps->exset);
+    }
+    ps->used++;
+cleanup:
+    LeaveCriticalSection(&ps->mutex);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, LegacySelector, del0)(JNI_STDARGS, jlong pollset,
+                                           jobject fo, jlong fp)
+{
+    int i, rc = ACR_EOF;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    if (ps->used < 2)
+        goto cleanup;
+    while (!AcrAtomic32Equ(&ps->state, 0)) {
+        if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+            rc = 0;
+            goto cleanup;
+        }
+        WAKEUP_IF_POLL();
+        if ((rc = wwait(ps)) != 0)
+            goto cleanup;
+    }
+
+    for (i = 1; i < ps->used; i++) {
+        if ((*env)->IsSameObject(env, ps->ooset[i].obj, fo) == JNI_TRUE) {
+            int dest = i;
+            int used = ps->used;
+            ps->used--;
+            /* Unref descriptor */
+            (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+            FD_CLR(ps->ooset[i].sd, &ps->rdset);
+            FD_CLR(ps->ooset[i].sd, &ps->wrset);
+            FD_CLR(ps->ooset[i].sd, &ps->exset);
+            for (++i; i < used; i++) {
+                ps->ooset[dest] = ps->ooset[i];
+                dest++;
+            }
+            rc = 0;
+        }
+    }
+
+cleanup:
+    LeaveCriticalSection(&ps->mutex);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, LegacySelector, wait0)(JNI_STDARGS, jlong pollset,
+                                            jobjectArray rs, jshortArray revents,
+                                            jint timeout, jboolean autocancel)
+{
+    int i, ns, rc = 0;
+    int rv = 0;
+    acr_time_t now = 0;
+    struct timeval  tv;
+    struct timeval *tp = 0;
+    jshort *pevents;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    EnterCriticalSection(&ps->mutex);
+    if (!AcrAtomic32Equ(&ps->state, 0)) {
+        /* Note that this should never happen if api is correctly used.
+         * wait cannot be run from multiple threads and cannot be run
+         * after destroy.
+         */
+        LeaveCriticalSection(&ps->mutex);
+        ACR_THROW(ACR_EX_EILLEGAL, 0);
+        return 0;
+    }
+    if (ps->used == 1) {
+        /* We only have the wakeup pipe in the pollset
+         * so there is no point to wait.
+         */
+        LeaveCriticalSection(&ps->mutex);
+        return 0;
+    }
+
+    AcrAtomic32Set(&ps->state, PSS_POLL);
+    LeaveCriticalSection(&ps->mutex);
+    if (timeout > 0) {
+        tp = &tv;
+        tp->tv_sec  = (long)(timeout / 1000);
+        tp->tv_usec = (long)(timeout % 1000) * 1000;
+    }
+    ns = select(ps->used, &ps->rdset, &ps->wrset, &ps->exset, tp);
+    if (ns == SOCKET_ERROR)
+        rc = ACR_GET_OS_ERROR();
+    EnterCriticalSection(&ps->mutex);
+    if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+        /* Interrupted by destroy0 */
+        if (!AcrAtomic32Equ(&ps->waiters, 0))
+            SetEvent(ps->wakeup);
+        LeaveCriticalSection(&ps->mutex);
+        return 0;
+    }
+    if (rc != 0) {
+        /* Error during poll */
+        if (rc == WSAENOTSOCK) {
+            /* Find the closed socket(s) */
+            int optval;
+            int optlen = ISIZEOF(optval);
+
+            AcrAtomic32Set(&ps->state, PSS_WAIT);
+            pevents = JARRAY_CRITICAL(jshort, revents);
+            for (i = 1; i < ps->used; i++) {
+                ps->ooset[i].revents = 0;
+                if (getsockopt(ps->ooset[i].sd, SOL_SOCKET, SO_TYPE, (char *)&optval,
&optlen) == -1) {
+                    if (WSAGetLastError() == WSAENOTSOCK) {
+                        ps->ooset[i].revents = ACR_OP_NVAL;
+                        pevents[i] = ACR_OP_ERROR | ACR_OP_NVAL;
+                        (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                    }
+                }
+            }
+            if (rv != 0)
+                goto cleanup;
+            /* Fallback to the error state.
+             * XXX: This should never happen if we got WSAENOTSOCK?
+             */
+            RELEASE_CRITICAL(revents, pevents);
+        }
+        AcrAtomic32Set(&ps->state, 0);
+        if (!AcrAtomic32Equ(&ps->waiters, 0))
+            SetEvent(ps->wakeup);
+        LeaveCriticalSection(&ps->mutex);
+        ACR_THROW_NET_ERROR(rc);
+        return 0;
+    }
+    if (ns == 0) {
+        /* Timeout on select operation
+         * Check for the expired sockets
+         */
+        AcrAtomic32Set(&ps->state, PSS_WAIT);
+        pevents   = JARRAY_CRITICAL(jshort, revents);
+        for (i = 1; i < ps->used; i++) {
+            ps->ooset[i].revents = 0;
+            if (ps->ooset[i].ttl > 0) {
+                if (now == 0)
+                    now = AcrTimeNow();
+                if (now > ps->ooset[i].exp) {
+                    /* Expired descriptor */
+                    ps->ooset[i].revents = ACR_OP_HANGUP;
+                    pevents[rv] = ACR_OP_TIMEOUT;
+                    (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                }
+            }
+        }
+        goto cleanup;
+    }
+    if (AcrAtomic32Equ(&ps->state, PSS_WAKEUP)) {
+        /* Drain the wakeup pipe.
+         * Wakeup pipe is always at index zero.
+         */
+        AcrDrainSocket(ps->wpipe[0]);
+        AcrAtomic32Set(&ps->state, 0);
+        if (!AcrAtomic32Equ(&ps->waiters, 0))
+            SetEvent(ps->wakeup);
+        LeaveCriticalSection(&ps->mutex);
+        return 0;
+    }
+    AcrAtomic32Set(&ps->state, PSS_WAIT);
+    pevents   = JARRAY_CRITICAL(jshort, revents);
+    /* Cycle trough the descriptors */
+    for (i = 0; i < ps->used; i++) {
+        /* Get selected events */
+        ps->ooset[i].revents = 0;
+        if (FD_ISSET(ps->ooset[i].sd, &ps->rdset))
+            ps->ooset[i].revents |= ACR_OP_INP;
+        if (FD_ISSET(ps->ooset[i].sd, &ps->wrset))
+            ps->ooset[i].revents |= ACR_OP_OUT;
+        if (FD_ISSET(ps->ooset[i].sd, &ps->exset))
+            ps->ooset[i].revents |= ACR_OP_ERROR;
+        if (ps->ooset[i].revents != 0) {
+            if (i == 0) {
+                /* Drain the wakeup pipe.
+                 * Wakeup pipe is always at index zero.
+                 */
+                AcrDrainSocket(ps->wpipe[0]);
+                continue;
+            }
+            else {
+                pevents[rv] = ps->ooset[i].revents;
+                (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                if (ps->ooset[i].ttl > 0) {
+                    /* Reset TTL
+                     */
+                    if (now == 0)
+                        now = AcrTimeNow();
+                    ps->ooset[i].exp = now + ps->ooset[i].ttl;
+                }
+            }
+        }
+        else {
+            /* Check for the expired descriptors.
+             */
+            if (ps->ooset[i].ttl > 0) {
+                if (now == 0)
+                    now = AcrTimeNow();
+                if (now > ps->ooset[i].exp) {
+                    /* Expired descriptor */
+                    ps->ooset[i].revents = ACR_OP_HANGUP;
+                    pevents[rv] = ACR_OP_TIMEOUT;
+                    (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                }
+            }
+        }
+    }
+cleanup:
+    RELEASE_CRITICAL(revents, pevents);
+    if (autocancel == JNI_TRUE && rv > 0) {
+        /* Remove all descriptors with revents set except
+         * the wakeup pipe at index zero.
+         */
+        for (i = 1; i < ps->used; i++) {
+            if (ps->ooset[i].revents != 0) {
+                int dest = i;
+                int used = ps->used;
+                ps->used--;
+                /* Unref descriptor */
+                (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+                FD_CLR(ps->ooset[i].sd, &ps->rdset);
+                FD_CLR(ps->ooset[i].sd, &ps->wrset);
+                FD_CLR(ps->ooset[i].sd, &ps->exset);
+                for (++i; i < used; i++) {
+                    if (ps->ooset[i].revents != 0) {
+                        /* Skip signaled descriptor */
+                        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+                        ps->used--;
+                    }
+                    else {
+                        ps->ooset[dest] = ps->ooset[i];
+                        dest++;
+                    }
+                }
+            }
+        }
+    }
+    AcrAtomic32Set(&ps->state, 0);
+    if (!AcrAtomic32Equ(&ps->waiters, 0))
+        SetEvent(ps->wakeup);
+    LeaveCriticalSection(&ps->mutex);
+    return rv;
+}
+



Mime
View raw message