commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r1147948 - in /commons/sandbox/runtime/trunk/src/main/native: Makefile.unx.in os/darwin/kqueue.c os/unix/arch_defs.h
Date Mon, 18 Jul 2011 16:15:06 GMT
Author: mturk
Date: Mon Jul 18 16:15:05 2011
New Revision: 1147948

URL: http://svn.apache.org/viewvc?rev=1147948&view=rev
Log:
Add mac/bsd kqueue selector. Needs some optimization and fixes

Added:
    commons/sandbox/runtime/trunk/src/main/native/os/darwin/kqueue.c   (with props)
Modified:
    commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in
    commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h

Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in?rev=1147948&r1=1147947&r2=1147948&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in (original)
+++ commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in Mon Jul 18 16:15:05 2011
@@ -80,8 +80,10 @@ UNIX_SOURCES=\
 	$(TOPDIR)/os/unix/util.c
 
 BSDX_SOURCES=\
+	$(TOPDIR)/os/darwin/kqueue.c \
 	$(TOPDIR)/os/bsdx/os.c
 DARWIN_SOURCES=\
+	$(TOPDIR)/os/darwin/kqueue.c \
 	$(TOPDIR)/os/darwin/os.c
 HPUX_SOURCES=\
 	$(TOPDIR)/os/hpux/os.c

Added: commons/sandbox/runtime/trunk/src/main/native/os/darwin/kqueue.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/darwin/kqueue.c?rev=1147948&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/darwin/kqueue.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/os/darwin/kqueue.c Mon Jul 18 16:15:05 2011
@@ -0,0 +1,636 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/clazz.h"
+#include "acr/memory.h"
+#include "acr/jniapi.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "acr/iodefs.h"
+#include "acr/netapi.h"
+#include "acr/ring.h"
+#include "acr/misc.h"
+#include "arch_opts.h"
+#include "arch_sync.h"
+#include <poll.h>
+#include <sys/event.h>
+
+/* pollset operation states */
+#define PSS_DESTROY     1   /* set by destroy0; no further operations allowed */
+#define PSS_POLL        2   /* wait0 is about to block, or is blocked, in kevent() */
+#define PSS_WAIT        3   /* wait0 is processing the returned events */
+#define PSS_WAKEUP      4   /* a wakeup byte was written while in PSS_POLL */
+/* Max events for a single run
+ * Chosen to reflect the 4k page size
+ * PSS_SIZE * sizeof(struct kevent) ~= 4096
+ * NOTE(review): this estimate looks inherited from the epoll based
+ * implementation - confirm against the real sizeof(struct kevent).
+ */
+#define PSS_SIZE        256
+
+typedef struct pfd_elem_t pfd_elem_t; /* One descriptor tracked by a pollset */
+struct pfd_elem_t {
+    ACR_RING_ENTRY(pfd_elem_t) link;  /* Linkage for the eset/free/dead rings */
+    int            fd;                /* Descriptor registered with the kqueue */
+    short          efilter;           /* kqueue filter used for registration */
+    short          revents;           /* Last ACR_OP_* mask reported for this element */
+    jobject        obj;               /* JNI global reference; 0 for the wakeup pipe */
+    acr_time_t     ttl;               /* Time to live; ACR_INFINITE when no timeout was given */
+    acr_time_t     exp;               /* Expiry time computed as now + ttl */
+};
+
+typedef struct acr_pollset_t { /* State of one kqueue backed selector */
+    /* A ring containing all of the pfd_elem_t that are active
+     */
+    ACR_RING_HEAD(pfd_eset_ring_t, pfd_elem_t) eset_ring;
+    /* A ring of pfd_elem_t that have been used, and then deleted
+     */
+    ACR_RING_HEAD(pfd_free_ring_t, pfd_elem_t) free_ring;
+    /* A ring of pfd_elem_t that have been deleted by del0 but
+     * might still be referenced by a kevent() call in progress.
+     * wait0 moves them to the free ring when it finishes.
+     */
+    ACR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring;
+    struct kevent               kev;      /* Scratch change record for EV_ADD/EV_DELETE */
+    struct kevent              *keset;    /* Result array holding PSS_SIZE events */
+    int                         kqfd;     /* kqueue descriptor */
+    int                         used;     /* Registered descriptors, wakeup pipe included */
+    volatile acr_atomic32_t     state;    /* 0 when idle, otherwise one of PSS_* */
+    int                         wpipe[2]; /* Wakeup pair: [0] is polled, [1] is written */
+    pthread_mutex_t             mutex;    /* Held while the rings, used or state are changed */
+    pthread_cond_t              wakeup;   /* Broadcast by wait0 when it leaves the poll */
+} acr_pollset_t;
+
+J_DECLARE_CLAZZ = { /* Class descriptor for the Java UnixSelector class.
+                     * NOTE(review): presumably this declares _clazzn, which
+                     * init0 fills in - confirm against acr/clazz.h.
+                     */
+    INVALID_FIELD_OFFSET,
+    0,
+    0,
+    0,
+    ACR_NET_CP "UnixSelector"
+};
+
+J_DECLARE_M_ID(0000) = { /* Java method addSelected(SelectionKeyImpl, short)
+                          * returning void; loaded by init0 and invoked
+                          * through CALL_VMETHOD2(0000, ...).
+                          */
+    0,
+    "addSelected",
+    "(L" ACR_NET_CP "SelectionKeyImpl;S)V"
+};
+
+/* Map an ACR_OP_* interest mask to a single kqueue filter.
+ * Returns EVFILT_READ when ACR_OP_INP is set and EVFILT_WRITE otherwise.
+ */
+static short iefilter(short event)
+{
+    return (event & ACR_OP_INP) ? EVFILT_READ : EVFILT_WRITE;
+}
+
+/* Translate the filter and flags of a returned kevent into an ACR_OP_* mask.
+ *
+ * event  the kevent filter (EVFILT_READ or EVFILT_WRITE)
+ * flags  the kevent flags; EV_EOF adds ACR_OP_HANGUP
+ *
+ * Returns the combined ACR_OP_* mask, or 0 when nothing matched.
+ */
+static short reventt(short event, short flags)
+{
+    short rv = 0;
+
+    if (event == EVFILT_READ)
+        rv |= ACR_OP_INP;
+    else if (event == EVFILT_WRITE)
+        rv |= ACR_OP_OUT;
+    /* The parameter was declared as 'flag' while the body tested 'flags',
+     * which did not compile.
+     */
+    if (flags & EV_EOF)
+        rv |= ACR_OP_HANGUP;
+    /*
+     * TODO: See if EV_ERROR + certain system errors in the returned data field
+     * should map to ACR_OP_NVAL.
+     */
+    return rv;
+}
+
+/* Keep a global reference to the calling class in _clazzn.i, load the
+ * addSelected method id and finally set _clazzn.u to 1.
+ * Nothing else is done when the global reference cannot be created.
+ */
+ACR_NET_EXPORT(void, UnixSelector, init0)(JNI_STDARGS)
+{
+    _clazzn.i = (jclass)(*env)->NewGlobalRef(env, obj);
+    if (_clazzn.i != 0) {
+        V_LOAD_METHOD(0000);
+        _clazzn.u = 1;
+    }
+}
+
+/* Create a new kqueue backed pollset.
+ *
+ * Allocates the pollset and its PSS_SIZE result array, creates the wakeup
+ * socket pair and the kqueue descriptor, registers the read end of the
+ * wakeup pair with the kqueue and initializes the condition variable and
+ * the mutex.
+ *
+ * Returns the pollset pointer converted with P2J, or 0 on failure.
+ * Every failure path after the pollset allocation goes through the common
+ * cleanup so nothing acquired so far is leaked.
+ */
+ACR_NET_EXPORT(jlong, UnixSelector, create0)(JNI_STDARGS)
+{
+    int rc;
+    acr_pollset_t *ps;
+    pfd_elem_t    *pe = 0;
+
+    ps = ACR_TALLOC(acr_pollset_t);
+    if (ps == 0)
+        return 0;
+    ps->wpipe[0] = -1;
+    ps->wpipe[1] = -1;
+    ps->kqfd     = -1;
+    ps->used     = 1;
+    /* Create the kevent result set.
+     */
+    ps->keset    = ACR_MALLOC(struct kevent, PSS_SIZE);
+    if (ps->keset == 0) {
+        /* Previously returned directly and leaked ps.
+         */
+        goto cleanup;
+    }
+    ACR_RING_INIT(&ps->eset_ring, pfd_elem_t, link);
+    ACR_RING_INIT(&ps->free_ring, pfd_elem_t, link);
+    ACR_RING_INIT(&ps->dead_ring, pfd_elem_t, link);
+
+    if ((rc = AcrSocketPair(ps->wpipe, 0)) != 0) {
+        ACR_THROW_NET_ERROR(rc);
+        goto cleanup;
+    }
+    ps->kqfd = kqueue();
+    if (ps->kqfd == -1) {
+        ACR_THROW_NET_ERRNO();
+        goto cleanup;
+    }
+    if ((rc = AcrCloseOnExec(ps->kqfd, 1)) != 0) {
+        ACR_THROW_NET_ERROR(rc);
+        goto cleanup;
+    }
+    pe = ACR_TALLOC(pfd_elem_t);
+    if (pe == 0) {
+        goto cleanup;
+    }
+    ACR_RING_ELEM_INIT(pe, link);
+
+    /* Add the wakeup pipe to the pset
+     */
+    pe->fd      = ps->wpipe[0];
+    pe->obj     = 0;
+    pe->ttl     = ACR_INFINITE;
+    pe->exp     = ACR_INFINITE;
+    pe->efilter = EVFILT_READ;
+    pe->revents = 0;
+
+    ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link);
+    EV_SET(&ps->kev, pe->fd, pe->efilter, EV_ADD, 0, 0, pe);
+
+    if (kevent(ps->kqfd, &ps->kev, 1, 0, 0, 0) == -1) {
+        /* Failed adding pipe to the pollset
+         */
+        ACR_THROW_NET_ERRNO();
+        goto cleanup;
+    }
+    if (pthread_cond_init(&ps->wakeup, 0) != 0) {
+        ACR_THROW_NET_ERRNO();
+        goto cleanup;
+    }
+    if (pthread_mutex_init(&ps->mutex, 0) != 0) {
+        ACR_THROW_NET_ERRNO();
+        pthread_cond_destroy(&ps->wakeup);
+        goto cleanup;
+    }
+    return P2J(ps);
+
+cleanup:
+    /* Descriptors still at -1 and pointers still at 0 are passed through
+     * unchanged, exactly as the pre-existing failure paths already did.
+     */
+    r_close(ps->wpipe[0]);
+    r_close(ps->wpipe[1]);
+    r_close(ps->kqfd);
+    AcrFree(ps->keset);
+    AcrFree(ps);
+    AcrFree(pe);
+    return 0;
+}
+
+ACR_NET_EXPORT(void, UnixSelector, clr0)(JNI_STDARGS, jlong pollset,
+                                         jobject sset)
+{   /* Remove every descriptor except the wakeup pipe from the pollset.
+     * Each removed element is passed to method 0000 (addSelected) with an
+     * event mask of 0, its global reference is deleted and the element is
+     * moved to the free ring.  Returns without clearing anything when the
+     * pollset state is PSS_DESTROY.
+     */
+    pfd_elem_t *np, *pe;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    while (!AcrAtomic32Equ(&ps->state, 0)) {
+        if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+            /* Interrupted by destroy0 */
+            pthread_mutex_unlock(&ps->mutex);
+            return;
+        }
+        if (AcrAtomic32Equ(&ps->state, PSS_POLL)) {
+            char ch = 1;
+            AcrAtomic32Set(&ps->state, PSS_WAKEUP);
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        /* Wait until the wait0 call breaks.
+         * wait0 broadcasts the condition after it has reset
+         * the state, and the loop then re-checks that state.
+         */
+        if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) {
+            pthread_mutex_unlock(&ps->mutex);
+            ACR_THROW(ACR_EX_EILLEGAL, 0);
+            return;
+        }
+    }
+    if (ps->used == 1) {
+        /* Only the wakeup pipe is registered; nothing to clear */
+        pthread_mutex_unlock(&ps->mutex);
+        return;
+    }
+    /* Make sure we have enough storage */
+    AcrArrayListEnsureCapacity(env, sset, ps->used - 1);    
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
+        if (ps->wpipe[0] != pe->fd) {
+            /* The kevent result is ignored: removal is best effort */
+            EV_SET(&ps->kev, pe->fd, pe->efilter, EV_DELETE, 0, 0, 0);
+            kevent(ps->kqfd, &ps->kev, 1, 0, 0, 0);
+            CALL_VMETHOD2(0000, obj, pe->obj, 0);
+            /* Unref the container. */
+            (*env)->DeleteGlobalRef(env, pe->obj);
+            ACR_RING_REMOVE(pe, link);
+            ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+        }
+    }
+    
+    ps->used = 1;
+    pthread_mutex_unlock(&ps->mutex);
+}
+
+/* Interrupt a wait0 call that has entered the PSS_POLL state.
+ * The state is switched to PSS_WAKEUP and one byte is written to the
+ * wakeup pipe.  Nothing is done in any other state.
+ */
+ACR_NET_EXPORT(void, UnixSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+    acr_pollset_t *pset = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&pset->mutex);
+    if (AcrAtomic32Equ(&pset->state, PSS_POLL)) {
+        char token = 1;
+
+        AcrAtomic32Set(&pset->state, PSS_WAKEUP);
+        r_write(pset->wpipe[1], &token, 1);
+    }
+    pthread_mutex_unlock(&pset->mutex);
+}
+
+/* Return the number of registered descriptors.
+ * The wakeup pipe is counted in 'used', so it is subtracted here.
+ */
+ACR_NET_EXPORT(jint, UnixSelector, size0)(JNI_STDARGS, jlong pollset)
+{
+    acr_pollset_t *pset = J2P(pollset, acr_pollset_t *);
+    int count;
+
+    pthread_mutex_lock(&pset->mutex);
+    count = pset->used - 1;
+    pthread_mutex_unlock(&pset->mutex);
+    return count;
+}
+
+/* Register a descriptor with the pollset.
+ *
+ * pollset  pollset pointer created by create0
+ * fo       Java object reported back through addSelected; a global
+ *          reference to it is held while the descriptor is registered
+ * fp       pointer to the acr_sd_t whose descriptor is added
+ * events   ACR_OP_* interest mask, reduced to one kqueue filter
+ * ttlms    time to live in milliseconds; values <= 0 mean no timeout
+ *
+ * Returns 0 on success (and also when the pollset is already destroyed),
+ * ACR_EALREADY for a duplicate descriptor, ACR_ENOMEM when the element or
+ * the global reference cannot be allocated, ACR_EOVERFLOW when the OS
+ * reports ENOSPC, or the OS error of the failed kevent() call.
+ */
+ACR_NET_EXPORT(jint, UnixSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
+                                          jlong fp, jint events, jint ttlms)
+{
+    int rc = 0;
+    pfd_elem_t    *pe;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+    acr_sd_t *fd      = J2P(fp, acr_sd_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+        /* Already destroyed */
+        goto cleanup;
+    }
+    ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+        if (fd->s == pe->fd) {
+            /* Duplicate descriptor
+             */
+            rc = ACR_EALREADY;
+            goto cleanup;
+        }
+    }
+
+    /* Recycle an element from the free ring before allocating a new one */
+    if (!ACR_RING_EMPTY(&ps->free_ring, pfd_elem_t, link)) {
+        pe = ACR_RING_FIRST(&ps->free_ring);
+        ACR_RING_REMOVE(pe, link);
+    }
+    else {
+        pe = ACR_TALLOC(pfd_elem_t);
+        if (pe == 0) {
+            rc = ACR_ENOMEM;
+            goto cleanup;
+        }
+        ACR_RING_ELEM_INIT(pe, link);
+    }
+
+    pe->fd      = fd->s;
+    pe->efilter = iefilter(events);
+    pe->revents = 0;
+    pe->obj     = (*env)->NewGlobalRef(env, fo);
+    if (pe->obj == 0) {
+        /* In case the NewGlobalRef fails,
+         * OutOfMemoryError should be thrown already by the JVM.
+         */
+        ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+        rc = ACR_ENOMEM;
+        goto cleanup;
+    }
+    if (ttlms > 0) {
+        pe->ttl = AcrTimeFromMsec(ttlms);
+        pe->exp = AcrTimeNow() + pe->ttl;
+    }
+    else {
+        pe->ttl = ACR_INFINITE;
+        pe->exp = ACR_INFINITE;
+    }
+    EV_SET(&ps->kev, pe->fd, pe->efilter, EV_ADD, 0, 0, pe);
+    if (kevent(ps->kqfd, &ps->kev, 1, 0, 0, 0) == 0) {
+        ps->used++;
+        ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link);
+    }
+    else {
+        /* Read the OS error before any further call can change it */
+        rc = ACR_GET_OS_ERROR();
+        if (ACR_STATUS_IS_ENOSPC(rc))
+            rc = ACR_EOVERFLOW;
+        /* Registration failed, so nobody will ever unref this element:
+         * drop the global reference here instead of leaking it.
+         */
+        (*env)->DeleteGlobalRef(env, pe->obj);
+        pe->obj = 0;
+        ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+    }
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}
+
+/* Remove a descriptor from the pollset.
+ *
+ * pollset  pollset pointer created by create0
+ * fo       unused here
+ * fp       pointer to the acr_sd_t whose descriptor is removed
+ *
+ * Returns 0 when the descriptor was found and removed, ACR_EOF when the
+ * pollset is destroyed, holds only the wakeup pipe, or does not contain
+ * the descriptor.  The removed element goes to the dead ring because a
+ * kevent() result may still point at it.
+ */
+ACR_NET_EXPORT(jint, UnixSelector, del0)(JNI_STDARGS, jlong pollset,
+                                          jobject fo, jlong fp)
+{
+    int rc = ACR_EOF;
+    pfd_elem_t    *pe;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+    acr_sd_t *fd      = J2P(fp, acr_sd_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (AcrAtomic32Equ(&ps->state, PSS_DESTROY) || ps->used < 2) {
+        /* Already destroyed */
+        goto cleanup;
+    }
+    ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+        if (fd->s == pe->fd) {
+            /* The EV_DELETE used to be issued before this loop, where
+             * pe was still a null pointer.  It needs the matched element
+             * for the descriptor and the filter it was registered with.
+             *
+             * We don't care about the kevent errors.
+             * They usually mean the fd was not registered with this
+             * kevent instance or already closed.
+             */
+            EV_SET(&ps->kev, pe->fd, pe->efilter, EV_DELETE, 0, 0, 0);
+            kevent(ps->kqfd, &ps->kev, 1, 0, 0, 0);
+            /* Unref descriptor */
+            (*env)->DeleteGlobalRef(env, pe->obj);
+            ACR_RING_REMOVE(pe, link);
+            ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link);
+            ps->used--;
+            rc = 0;
+            break;
+        }
+    }
+
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}
+
+ACR_NET_EXPORT(int, UnixSelector, destroy0)(JNI_STDARGS, jlong pollset)
+{   /* Tear down the pollset: interrupt a running wait0, close the wakeup
+     * pair and the kqueue, destroy the condition variable and the mutex,
+     * then release every element on the three rings and the pollset itself.
+     * Returns 0, or the pthread_cond_wait error, in which case nothing
+     * has been released.
+     */
+    int rc = 0;
+    pfd_elem_t *np, *pe = 0;
+    acr_pollset_t   *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (!AcrAtomic32Equ(&ps->state, 0)) {
+        int  state = AcrAtomic32Set(&ps->state, PSS_DESTROY);
+        if (state == PSS_POLL) {
+            char ch   = 1;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY wait0 broadcasts
+         * the condition and returns without reporting descriptors.
+         * NOTE(review): this is a single wait with no predicate loop,
+         * so a spurious wakeup would let the teardown continue early -
+         * confirm whether that matters to callers.
+         */
+        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) {
+            pthread_mutex_unlock(&ps->mutex);
+            return rc;
+        }
+    }
+    AcrAtomic32Set(&ps->state, PSS_DESTROY);
+    ps->used = 0;
+    pthread_mutex_unlock(&ps->mutex);
+    r_close(ps->wpipe[0]);
+    r_close(ps->wpipe[1]);
+    r_close(ps->kqfd);
+    pthread_cond_destroy(&ps->wakeup);
+    pthread_mutex_destroy(&ps->mutex);
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
+        /* The wakeup pipe element has obj == 0 and is only freed */
+        if (pe->obj != 0) {
+            AcrSelectionKeyReset(env, pe->obj);
+            /* Unref descriptor */
+            (*env)->DeleteGlobalRef(env, pe->obj);
+        }
+        ACR_RING_REMOVE(pe, link);
+        AcrFree(pe);
+    }
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->free_ring, pfd_elem_t, link) {
+        ACR_RING_REMOVE(pe, link);
+        AcrFree(pe);
+    }
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->dead_ring, pfd_elem_t, link) {
+        ACR_RING_REMOVE(pe, link);
+        AcrFree(pe);
+    }
+    AcrFree(ps->keset);
+    AcrFree(ps);
+    return rc;
+}
+
+/* Fetch up to max pending events from a kqueue descriptor.
+ *
+ * The previous body was Solaris event port code (port_getn(), portev_user,
+ * ETIME) which does not exist for struct kevent on BSD and Darwin.  The
+ * name and signature are kept, but the work is now done by kevent().
+ *
+ * port     kqueue descriptor
+ * list     array receiving the events
+ * max      capacity of list
+ * nget     receives the number of events stored in list
+ * timeout  milliseconds to wait; a negative value waits indefinitely
+ *
+ * Returns 0 when at least one event was stored, ACR_TIMEUP when the call
+ * returned without events, or the OS error of the failed kevent() call
+ * (with *nget set to 0).
+ */
+static int call_port_getn(int port, struct kevent list[],
+                          unsigned int max, unsigned int *nget,
+                          int timeout)
+{
+    struct timespec tv, *tp = 0;
+    int ret;
+    int rv = 0;
+
+    if (timeout >= 0) {
+        tp = &tv;
+        tp->tv_sec  = (long)(timeout / 1000);
+        tp->tv_nsec = (long)(timeout % 1000) * 1000000;
+    }
+
+    ret = kevent(port, 0, 0, list, (int)max, tp);
+    if (ret == -1) {
+        rv = ACR_GET_NETOS_ERROR();
+        *nget = 0;
+    }
+    else {
+        *nget = (unsigned int)ret;
+        if (ret == 0)
+            rv = ACR_TIMEUP;
+    }
+    return rv;
+}
+
+/* Wait for events on the registered descriptors.
+ *
+ * pollset     pollset pointer created by create0
+ * sset        Java list whose capacity is grown to hold the results
+ * timeout     milliseconds to wait; a negative value waits indefinitely
+ * autocancel  when JNI_TRUE, reported descriptors are removed from the set
+ *
+ * Every signaled descriptor is reported through method 0000 (addSelected)
+ * with its ACR_OP_* mask.  Descriptors whose TTL has expired are reported
+ * with ACR_OP_TIMEOUT.  Throws ACR_EX_EILLEGAL when the pollset is not
+ * idle on entry and a net error when kevent() fails.  Returns right away
+ * when only the wakeup pipe is registered.
+ */
+ACR_NET_EXPORT(void, UnixSelector, wait0)(JNI_STDARGS, jlong pollset,
+                                          jobject sset,
+                                          jint timeout, jboolean autocancel)
+{
+    int i, rc = 0;
+    /* kevent() returns -1 on error, so the count has to be signed.
+     * It was unsigned, which made the error check below unreachable.
+     */
+    int ns;
+    pfd_elem_t *np, *pe = 0;
+    acr_time_t now = 0;
+    struct timespec  tv;
+    struct timespec *tp = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (!AcrAtomic32Equ(&ps->state, 0)) {
+        /* Note that this should never happen if api is correctly used.
+         * wait cannot be run from multiple threads and cannot be run
+         * after destroy.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        ACR_THROW(ACR_EX_EILLEGAL, 0);
+        return;
+    }
+    if (ps->used == 1) {
+        /* We only have the wakeup pipe in the pollset
+         * so there is no point to wait.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        return;
+    }
+
+    AcrAtomic32Set(&ps->state, PSS_POLL);
+    pthread_mutex_unlock(&ps->mutex);
+
+    if (timeout >= 0) {
+        tp = &tv;
+        tp->tv_sec  = (long)(timeout / 1000);
+        tp->tv_nsec = (long)(timeout % 1000) * 1000000;
+    }
+    ns = kevent(ps->kqfd, 0, 0, ps->keset, PSS_SIZE, tp);
+    if (ns < 0) {
+        /* Save the error now; it was never stored before and the
+         * exception below was thrown with rc == 0.
+         */
+        rc = ACR_GET_NETOS_ERROR();
+    }
+    pthread_mutex_lock(&ps->mutex);
+    if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
+        /* Interrupted by destroy0 */
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return;
+    }
+    if (ns < 0) {
+        /* Error during poll */
+        AcrAtomic32Set(&ps->state, 0);
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        ACR_THROW_NET_ERROR(rc);
+        return;
+    }
+    if (ns == 0) {
+        /* Timeout occurred */
+        AcrAtomic32Set(&ps->state, PSS_WAIT);
+        goto cleanup;
+    }
+    if (AcrAtomic32Equ(&ps->state, PSS_WAKEUP)) {
+        /* Interrupted by wakeup0.
+         * Only drain the wakeup pipe without returning any descriptors.
+         */
+        AcrDrainPipe(ps->wpipe[0]);
+        AcrAtomic32Set(&ps->state, 0);
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return;
+    }
+    AcrAtomic32Set(&ps->state, PSS_WAIT);
+    /* A failed follow-up kevent() returns -1 and simply ends this loop */
+    while (ns > 0) {
+        AcrArrayListEnsureCapacity(env, sset, ns);
+        /* Cycle through the descriptors */
+        for (i = 0; i < ns; i++) {
+            pe = (pfd_elem_t*)ps->keset[i].udata;
+            if (ps->keset[i].filter != 0) {
+                if (pe->fd == ps->wpipe[0]) {
+                    /* Drain the wakeup pipe.
+                     */
+                    AcrDrainPipe(ps->wpipe[0]);
+                }
+                else {
+                    pe->revents = reventt(ps->keset[i].filter, ps->keset[i].flags);
+                    CALL_VMETHOD2(0000, obj, pe->obj, pe->revents);
+                    if (autocancel == JNI_TRUE) {
+                        EV_SET(&ps->kev, pe->fd, pe->efilter, EV_DELETE, 0, 0, 0);
+                        kevent(ps->kqfd, &ps->kev, 1, 0, 0, 0);
+                        ps->used--;
+                        /* Unref descriptor */
+                        (*env)->DeleteGlobalRef(env, pe->obj);
+                        ACR_RING_REMOVE(pe, link);
+                        ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+                    }
+                    else if (pe->ttl > 0) {
+                        /* Reset TTL
+                         */
+                        if (now == 0)
+                            now = AcrTimeNow();
+                        pe->exp = now + pe->ttl;
+                    }
+                }
+            }
+        }
+        if (i == PSS_SIZE) {
+            /* Maximum number of descriptors selected.
+             * Try another wait with 0 timeout which should
+             * return immediately the signaled descriptors if any
+             */
+            tp = &tv;
+            tp->tv_sec  = 0;
+            tp->tv_nsec = 0;
+            ns = kevent(ps->kqfd, 0, 0, ps->keset, PSS_SIZE, tp);
+        }
+        else {
+            /* No more chunks */
+            break;
+        }
+    }
+
+cleanup:
+    /* Remove expired descriptors */
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
+        if (pe->ttl > 0 && pe->revents == 0) {
+            if (now == 0)
+                now = AcrTimeNow();
+            if (now > pe->exp) {
+                /* Expired descriptor */
+                pe->revents = ACR_OP_TIMEOUT;
+                CALL_VMETHOD2(0000, obj, pe->obj, pe->revents);
+                if (autocancel == JNI_TRUE) {
+                    /* Unref descriptor */
+                    EV_SET(&ps->kev, pe->fd, pe->efilter, EV_DELETE, 0, 0, 0);
+                    kevent(ps->kqfd, &ps->kev, 1, 0, 0, 0);
+                    (*env)->DeleteGlobalRef(env, pe->obj);
+                    ACR_RING_REMOVE(pe, link);
+                    ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+                    ps->used--;
+                }
+            }
+        }
+    }
+    /* Shift all PFDs in the Dead Ring to the Free Ring */
+    ACR_RING_CONCAT(&ps->free_ring, &ps->dead_ring, pfd_elem_t, link);
+    AcrAtomic32Set(&ps->state, 0);
+    pthread_cond_broadcast(&ps->wakeup);
+    pthread_mutex_unlock(&ps->mutex);
+}
+

Propchange: commons/sandbox/runtime/trunk/src/main/native/os/darwin/kqueue.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h?rev=1147948&r1=1147947&r2=1147948&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h Mon Jul 18 16:15:05
2011
@@ -137,7 +137,9 @@ typedef struct stat         struct_stat_
 
 #endif /* F_DUPFD */
 
-#if defined(LINUX) || defined(SOLARIS)
+/* The following check is basically for everything
+ */
+#if defined(LINUX) || defined(SOLARIS) || defined(BSDX) || defined(DARWIN)
 # define PS_DEFAULT_TYPE        ACR_PS_TYPE_UNIX
 #else
 # define PS_DEFAULT_TYPE        ACR_PS_TYPE_POLL



Mime
View raw message