commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r1129273 - in /commons/sandbox/runtime/trunk/src/main: java/org/apache/commons/runtime/net/ java/org/apache/commons/runtime/platform/unix/ native/ native/os/unix/ test/org/apache/commons/runtime/
Date Mon, 30 May 2011 17:49:33 GMT
Author: mturk
Date: Mon May 30 17:49:32 2011
New Revision: 1129273

URL: http://svn.apache.org/viewvc?rev=1129273&view=rev
Log:
Simplify selector class layout

Added:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java   (with props)
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java   (with props)
    commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c   (with props)
Removed:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalSocketSelectorFactory.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyFactory.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketSelectorFactory.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/SelectionKeyImpl.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/SocketSelectorImpl.java
Modified:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java
    commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in
    commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h
    commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c
    commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java Mon May 30 17:49:32 2011
@@ -32,8 +32,15 @@ import org.apache.commons.runtime.io.Des
 abstract class AbstractSocketDescriptor extends Descriptor
 {
 
+    /**
+     * The socket blocking mode.
+     */
     protected boolean blocking  = false;
-    
+
+    public volatile boolean selecting = false;
+    /**
+     * Create new object instance.
+     */
     protected AbstractSocketDescriptor()
     {
     }

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java Mon May 30 17:49:32 2011
@@ -32,7 +32,7 @@ import org.apache.commons.runtime.io.Des
 public abstract class Endpoint implements Closeable
 {
     /**
-     * Selection key.
+     * This endpoint's selection key.
      */
     protected SelectionKey      key;
 
@@ -84,7 +84,7 @@ public abstract class Endpoint implement
         if (selector == null)
             throw new NullPointerException(Local.sm.get("selector.NULL"));
         if (key == null)
-            key = SelectionKeyFactory.newInstance(selector, this);
+            key = new SelectionKeyImpl(selector, this);
         if (key.selector() != selector)
             throw new IllegalSelectorException();
         return key;

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java?rev=1129273&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java Mon May 30 17:49:32 2011
@@ -0,0 +1,159 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.runtime.net;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import org.apache.commons.runtime.io.ClosedDescriptorException;
+import org.apache.commons.runtime.io.Descriptor;
+import org.apache.commons.runtime.AlreadyExistsException;
+import org.apache.commons.runtime.InvalidArgumentException;
+import org.apache.commons.runtime.NoSuchObjectException;
+import org.apache.commons.runtime.OperationNotImplementedException;
+import org.apache.commons.runtime.OverflowException;
+import org.apache.commons.runtime.SystemException;
+import org.apache.commons.runtime.Errno;
+import org.apache.commons.runtime.Status;
+
+/**
+ * Socket Selector implementation class.
+ * <p>
+ * </p>
+ *
+ * @since Runtime 1.0
+ */
+final class PosixSelector extends Selector
+{
+
+    private short[]             revents;
+    private SelectionKeyImpl[]  keyset;
+    private long                pollset;
+
+    private static native long  create0(int size)
+        throws OutOfMemoryError,
+               SystemException;
+    private static native void  wakeup0(long pollset);
+    private static native int   destroy0(long pollset);
+    private static native int   size0(long pollset);
+    private static native int   add0(long pollset, SelectionKeyImpl key, int fd, int events, int ttl);
+    private static native int   del0(long pollset, SelectionKeyImpl key, int fd);
+    private static native int   clear0(long pollset, SelectionKeyImpl[] set);
+    private static native int   wait0(long pollset, SelectionKeyImpl[] set, short[] events,
+                                      int timeout, boolean autocancel);
+
+    /*
+     * Created from native
+     */
+    public PosixSelector(int size)
+    {
+        super(size);
+        pollset = create0(size);
+        revents = new short[size];
+        keyset  = new SelectionKeyImpl[size];
+    }
+
+    private void ensureValid()
+        throws ClosedSelectorException
+    {
+        if (pollset == 0L)
+            throw new ClosedSelectorException();
+    }
+
+    @Override
+    public int size()
+        throws ClosedSelectorException
+    {
+        ensureValid();
+        return size0(pollset);
+    }
+
+    @Override
+    public void interrupt()
+        throws ClosedSelectorException
+    {
+        ensureValid();
+        wakeup0(pollset);
+    }
+
+    @Override
+    public boolean register(SelectionKey key, int ops)
+        throws ClosedSelectorException,
+               IllegalSelectorException,
+               ClosedDescriptorException,
+               OverflowException,
+               IOException
+    {
+        ensureValid();
+        SelectionKeyImpl skey = (SelectionKeyImpl)key;
+        if (skey.selector() != this)
+            throw new IllegalSelectorException();
+        Descriptor sd = skey.endpoint().descriptor();
+        int fd  = sd.fd();
+        if (fd == -1)
+            throw new ClosedDescriptorException();
+        int rc = add0(pollset, skey, fd, ops, skey.timeout());
+        if (rc != 0) {
+            if (rc == Errno.EALREADY)
+                return false;
+            if (rc == Errno.EOVERFLOW)
+                throw new OverflowException();
+            else
+                throw new IOException(Status.describe(rc));
+        }
+        return true;
+    }
+
+    @Override
+    protected void cancel(SelectionKey key)
+        throws ClosedSelectorException,
+               IllegalSelectorException
+    {
+        ensureValid();
+        if (key.selector() != this)
+            throw new IllegalSelectorException();
+        SelectionKeyImpl skey = (SelectionKeyImpl)key;
+        Descriptor sd = skey.endpoint().descriptor();
+        int fd  = sd.fd();
+        if (fd != -1) {
+            // Remove selection key
+            del0(pollset, skey, fd);
+            skey.revents = 0;
+        }
+    }
+
+    @Override
+    public boolean isOpen()
+    {
+        return pollset != 0L;
+    }
+
+    @Override
+    public void close()
+        throws IOException
+    {
+        if (pollset != 0L) {
+            long ps = pollset;
+            pollset = 0L;
+            int rc  = destroy0(ps);
+            if (rc != 0)
+                throw new IOException(Status.describe(rc));
+        }
+    }
+
+}
+

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java Mon May 30 17:49:32 2011
@@ -26,8 +26,8 @@ import org.apache.commons.runtime.net.En
 public abstract class SelectionKey
 {
 
-    private Object          attachment;
     private Selector        selector;
+    private Object          attachment;
     private Endpoint        endpoint;
     private int             timeout;
 
@@ -134,4 +134,36 @@ public abstract class SelectionKey
         return prevtime;
     }
 
+    /**
+     * Register this selection key with the given selector.
+     *
+     * @throws ClosedSelectorException if this selector is closed.
+     * @throws IllegalSelectorException if this key was not created by the same
+     *          selector as the given selector.
+     */
+    public SelectionKey register(Selector selector)
+        throws IllegalSelectorException
+    {
+        if (this.selector == null || selector == null)
+            this.selector  = selector;
+        if (this.selector != selector)
+            throw new IllegalSelectorException();
+        if (this.selector == null) {
+            AbstractSocketDescriptor sd = (AbstractSocketDescriptor)endpoint.descriptor();
+            sd.selecting = false;
+        }
+        return this;
+    }
+    /**
+     * Unregister this selection key from its selector.
+     *
+     * @throws ClosedSelectorException if this selector is closed.
+     */
+    public void cancel()
+        throws ClosedSelectorException
+    {
+        selector.cancel(this);
+    }
+
+    
 }

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java?rev=1129273&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java Mon May 30 17:49:32 2011
@@ -0,0 +1,69 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.runtime.net;
+
+import org.apache.commons.runtime.AlreadyExistsException;
+import org.apache.commons.runtime.InvalidArgumentException;
+import org.apache.commons.runtime.NoSuchObjectException;
+import org.apache.commons.runtime.OperationNotImplementedException;
+import org.apache.commons.runtime.SystemException;
+import org.apache.commons.runtime.Errno;
+
+/**
+ * Selector implementation class.
+ * <p>
+ * </p>
+ *
+ * @since Runtime 1.0
+ */
+final class SelectionKeyImpl extends SelectionKey
+{
+
+    public  int             ievents;
+    public  int             revents;
+
+    public SelectionKeyImpl(Selector selector, Endpoint endpoint,
+                            int ievents)
+    {
+        super(selector, endpoint);
+        this.ievents    = ievents;
+        this.revents    = 0;
+    }
+
+    public SelectionKeyImpl(Selector selector, Endpoint endpoint)
+    {
+        this(selector, endpoint, 0);
+    }
+
+    @Override
+    public int events()
+    {
+        return revents;
+    }
+
+    @Override
+    public SelectionKey events(int set)
+    {
+        ievents = set;
+        return this;
+    }
+
+    public void     destroy()
+    {
+        register(null);
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java (original)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java Mon May 30 17:49:32 2011
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import org.apache.commons.runtime.io.ClosedDescriptorException;
 import org.apache.commons.runtime.io.InvalidDescriptorException;
+import org.apache.commons.runtime.InvalidRangeException;
 import org.apache.commons.runtime.OverflowException;
 
 /**
@@ -29,13 +30,63 @@ import org.apache.commons.runtime.Overfl
  */
 public abstract class Selector implements Closeable
 {
-    private int         capacity;
+    private int                         capacity;
+    private static final int            type;
+    /**
+     * Maximum size of the selector limited by the operating system
+     */
+    public  static final int            MAX_CAPACITY;
+    private static native int           init0();
+    private static native int           init1();
+    static {
+        type         = init0();
+        MAX_CAPACITY = init1();
+    }
 
     /**
      * Indicates the current auto-cancel mode for this {@code Selector} object.
      */
     protected boolean   autoCancel;
 
+    /**
+     * Creates a new {@code Selector} instance with the given size.
+     *
+     * @return the new socket selector.
+     *
+     * @throws InvalidRangeException if {@code size} is outside the
+     *          valid range.
+     * @throws OutOfMemoryError if the memory allocation fails.
+     */
+    public static Selector newInstance(int size)
+        throws InvalidRangeException, OutOfMemoryError
+    {
+        if (size < 1 || size > MAX_CAPACITY)
+            throw new InvalidRangeException(Local.sm.get("selector.ERANGE"));
+        switch (type) {
+            case 0:
+                return new PosixSelector(size);
+            default:
+                throw new RuntimeException("Unknown Selector type");
+        }
+    }
+
+    /**
+     * Creates a new {@code Selector} instance.
+     *
+     * @return the new socket selector.
+     *
+     * @throws RuntimeException if the selector capacity cannot be
+     *          determined.
+     * @throws OutOfMemoryError if the memory allocation fails.
+     */
+    public static Selector newInstance()
+        throws RuntimeException, OutOfMemoryError
+    {
+        if (MAX_CAPACITY < 1)
+            throw new RuntimeException();
+        return newInstance(MAX_CAPACITY);
+    }
+
     private Selector()
     {
         // No instance
@@ -122,6 +173,13 @@ public abstract class Selector implement
         throws IOException;
 
     /**
+     * Tells whether or not this selector is open.
+     *
+     * @return {@code true} if, and only if, this selector is open
+     */
+    public abstract boolean isOpen();
+
+    /**
      * Sets this selector's auto-cancel mode to the given state.
      * If the selector is in auto-cancel mode, then the keys are
      * canceled when signaled.

Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in (original)
+++ commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in Mon May 30 17:49:32 2011
@@ -71,6 +71,7 @@ UNIX_SOURCES=\
 	$(TOPDIR)/os/unix/procmutex.c \
 	$(TOPDIR)/os/unix/shmem.c \
 	$(TOPDIR)/os/unix/selectkey.c \
+	$(TOPDIR)/os/unix/selector.c \
 	$(TOPDIR)/os/unix/semaphore.c \
 	$(TOPDIR)/os/unix/time.c \
 	$(TOPDIR)/os/unix/usock.c \

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h Mon May 30 17:49:32 2011
@@ -134,10 +134,14 @@ typedef struct stat         struct_stat_
 
 #endif /* F_DUPFD */
 
+#define  PS_TYPE_POLL           0
+#define  PS_TYPE_EPOLL          1
+#define  PS_TYPE_DEVPOLL        2
+#define  PS_TYPE_KQUEUE         3
+
 #if 1
-# define POLLSET_USE_POLL       1
+# define PS_DEFAULT_TYPE        PS_TYPE_POLL
 #else
-# define POLLSET_USE_POLL       0
 #endif
 
 

Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c Mon May 30 17:49:32 2011
@@ -27,46 +27,6 @@
 # include <sys/resource.h>
 #endif
 
-#if POLLSET_USE_POLL
-/* pollset operation states */
-#define PSS_DESTROY     1
-#define PSS_POLL        2
-#define PSS_WAIT        3
-#define PSS_WAKEUP      4
-
-typedef struct acr_pollfd_t {
-    jobject        obj;
-    acr_time_t     ttl;
-    acr_time_t     exp;
-} acr_pollfd_t;
-
-typedef struct acr_pollset_t {
-    struct pollfd    *fdset;
-    acr_pollfd_t     *ooset;
-    int               used;
-    int               size;
-    volatile int      state;
-    int               wpipe[2];
-    pthread_mutex_t   mutex;
-    pthread_cond_t    wakeup;
-} acr_pollset_t;
-
-J_DECLARE_CLAZZ = {
-    INVALID_FIELD_OFFSET,
-    0,
-    0,
-    0,
-    ACR_UNX_CP "SocketSelectorImpl"
-};
-
-J_DECLARE_M_ID(0000) = {
-    0,
-    "<init>",
-    "(I)V"
-};
-
-#endif
-
 static short ieventt(int event)
 {
     short rv = 0;
@@ -105,479 +65,6 @@ static short reventt(short event)
     return rv;
 }
 
-#if POLLSET_USE_POLL
-
-ACR_NET_EXPORT(jobject, SocketSelectorFactory, new0)(JNI_STDARGS, jint size)
-{
-    if (_clazzn.u == 1)
-        return (*env)->NewObject(env, _clazzn.i, J4MID(0000), size);
-    else
-        return 0;
-}
-
-ACR_NET_EXPORT(jobject, LocalSocketSelectorFactory, new0)(JNI_STDARGS, jint size)
-{
-    if (_clazzn.u == 1)
-        return (*env)->NewObject(env, _clazzn.i, J4MID(0000), size);
-    else
-        return 0;
-}
-
-static int maxopendesc(void)
-{
-    int nm = 65536;
-#if HAVE_SYS_RESOURCE_H
-    struct rlimit rl;
-
-    if (getrlimit(RLIMIT_NOFILE, &rl) == 0 && rl.rlim_max != RLIM_INFINITY)
-        nm = (int)rl.rlim_max;
-    else
-#endif
-        nm = (int)sysconf(_SC_OPEN_MAX);
-    if (nm > 1)
-        --nm;
-    else
-        nm = 1023;
-    return nm;
-}
-
-ACR_NET_EXPORT(jint, SocketSelectorFactory, init0)(JNI_STDARGS)
-{
-    if (_clazzn.u == 0) {
-        if (AcrLoadClass(env, &_clazzn, 0) == JNI_FALSE)
-            return 0;
-        R_LOAD_METHOD(0000, 0);
-        _clazzn.u = 1;
-    }
-    return maxopendesc();
-}
-
-ACR_NET_EXPORT(jint, LocalSocketSelectorFactory, init0)(JNI_STDARGS)
-{
-    if (_clazzn.u == 0) {
-        if (AcrLoadClass(env, &_clazzn, 0) == JNI_FALSE)
-            return 0;
-        R_LOAD_METHOD(0000, 0);
-        _clazzn.u = 1;
-    }
-    return maxopendesc();
-}
-
-ACR_UNX_EXPORT(jlong, SocketSelectorImpl, create0)(JNI_STDARGS, jint size)
-{
-    int rc;
-    acr_pollset_t *ps;
-
-    ps = ACR_TALLOC(acr_pollset_t);
-    if (ps == 0)
-        return 0;
-    ps->wpipe[0] = -1;
-    ps->wpipe[1] = -1;
-    ps->size     = size + 1;
-    ps->used     = 1;
-
-    ps->fdset    = ACR_MALLOC(struct pollfd, ps->size);
-    if (ps->fdset == 0)
-        return 0;
-    ps->ooset    = ACR_MALLOC(acr_pollfd_t,  ps->size);
-    if (ps->fdset == 0) {
-        AcrFree(ps->fdset);
-        return 0;
-    }
-    if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) {
-        ACR_THROW_NET_ERROR(rc);
-        goto cleanup;
-    }
-    /* Add the wakeup pipe to the pset
-     */
-    ps->fdset[0].fd      = ps->wpipe[0];
-    ps->fdset[0].events  = POLLIN;
-    ps->fdset[0].revents = 0;
-    ps->ooset[0].obj     = 0;
-    ps->ooset[0].ttl     = ACR_INFINITE;
-    ps->ooset[0].exp     = ACR_INFINITE;
-
-    if (pthread_cond_init(&ps->wakeup, 0) != 0) {
-        ACR_THROW_NET_ERRNO();
-        goto cleanup;
-    }
-    if (pthread_mutex_init(&ps->mutex, 0) != 0) {
-        ACR_THROW_NET_ERRNO();
-        pthread_cond_destroy(&ps->wakeup);
-        goto cleanup;
-    }
-    return P2J(ps);
-
-cleanup:
-    AcrFree(ps->fdset);
-    AcrFree(ps->ooset);
-    AcrFree(ps);
-    return 0;
-}
-
-ACR_UNX_EXPORT(int, SocketSelectorImpl, destroy0)(JNI_STDARGS, jlong pollset)
-{
-    int i;
-    int rc = 0;
-    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
-
-    pthread_mutex_lock(&ps->mutex);
-    if (ps->state != 0) {
-        int  state = ps->state;
-        ps->state  = PSS_DESTROY;
-        if (state == PSS_POLL) {
-            char ch   = 1;
-            r_write(ps->wpipe[1], &ch, 1);
-        }
-        /* Wait until the wait0 call breaks.
-         * Since we set the state to DESTROY
-         * wait0 will return 0.
-         */
-        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) {
-            pthread_mutex_unlock(&ps->mutex);
-            return rc;
-        }
-    }
-    ps->state = PSS_DESTROY;
-    for (i = 1; i < ps->used; i++) {
-        /* Invalidate the container. */
-        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
-    }
-    ps->used = 0;
-    pthread_mutex_unlock(&ps->mutex);
-    r_close(ps->wpipe[0]);
-    r_close(ps->wpipe[1]);
-    pthread_cond_destroy(&ps->wakeup);
-    pthread_mutex_destroy(&ps->mutex);
-    AcrFree(ps->fdset);
-    AcrFree(ps->ooset);
-    AcrFree(ps);
-    return rc;
-}
-
-ACR_UNX_EXPORT(jint, SocketSelectorImpl, clear0)(JNI_STDARGS, jlong pollset,
-                                                 jobjectArray rs)
-{
-    int i;
-    int cnt = 0;
-    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
-
-    pthread_mutex_lock(&ps->mutex);
-    while (ps->state != 0) {
-        if (ps->state == PSS_DESTROY) {
-            /* Interrupted by destroy0 */
-            pthread_mutex_unlock(&ps->mutex);
-            return 0;
-        }
-        if (ps->state == PSS_POLL) {
-            char ch   = 1;
-            ps->state = PSS_WAKEUP;
-            r_write(ps->wpipe[1], &ch, 1);
-        }
-        /* Wait until the wait0 call breaks.
-         * Since we set the state to DESTROY
-         * wait0 will return 0.
-         */
-        if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) {
-            pthread_mutex_unlock(&ps->mutex);
-            ACR_THROW(ACR_EX_EILLEGAL, 0);
-            return 0;
-        }
-    }
-    for (i = 1; i < ps->used; i++) {
-        (*env)->SetObjectArrayElement(env, rs, cnt++, ps->ooset[i].obj);
-        /* Unref the container. */
-        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
-    }
-    ps->used = 1;
-    pthread_mutex_unlock(&ps->mutex);
-    return cnt;
-}
-
-ACR_UNX_EXPORT(void, SocketSelectorImpl, wakeup0)(JNI_STDARGS, jlong pollset)
-{
-    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
-
-    pthread_mutex_lock(&ps->mutex);
-    if (ps->state == PSS_POLL) {
-        char ch   = 1;
-        ps->state = PSS_WAKEUP;
-        r_write(ps->wpipe[1], &ch, 1);
-    }
-    pthread_mutex_unlock(&ps->mutex);
-}
-
-ACR_UNX_EXPORT(jint, SocketSelectorImpl, size0)(JNI_STDARGS, jlong pollset)
-{
-    int rv;
-    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
-
-    pthread_mutex_lock(&ps->mutex);
-    rv = ps->used - 1;
-    pthread_mutex_unlock(&ps->mutex);
-    return rv;
-}
-
-ACR_UNX_EXPORT(jint, SocketSelectorImpl, wait0)(JNI_STDARGS, jlong pollset,
-                                                jobjectArray rs, jshortArray revents,
-                                                jint timeout, jboolean autocancel)
-{
-    int i, ns, rc = 0;
-    int rv = 0;
-    jshort *pevents;
-    acr_time_t now = 0;
-    acr_time_t tmx = 0;
-    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
-
-    pthread_mutex_lock(&ps->mutex);
-    if (ps->state != 0) {
-        /* Note that this should never happen if api is correctly used.
-         * wait cannot be run from multiple threads and cannot be run
-         * after destroy.
-         */
-        pthread_mutex_unlock(&ps->mutex);
-        ACR_THROW(ACR_EX_EILLEGAL, 0);
-        return 0;
-    }
-    if (ps->used == 1) {
-        /* We only have the wakeup pipe in the pollset
-         * so there is no point to wait.
-         */
-        pthread_mutex_unlock(&ps->mutex);
-        return 0;
-    }
-
-    ps->state = PSS_POLL;
-    pthread_mutex_unlock(&ps->mutex);
-    if (timeout > 0)
-        tmx = AcrTimeMilliseconds() + timeout;
-    for (;;) {
-        ns = poll(ps->fdset, ps->used, timeout);
-        if (ns == -1 && errno == EINTR) {
-            if (timeout >= 0) {
-                timeout = tmx - AcrTimeMilliseconds();
-                if (timeout <= 0) {
-                    ns = 0;
-                    break;
-                }
-            }
-        }
-        else
-            break;
-    }
-
-    if (ns == -1)
-        rc = ACR_GET_OS_ERROR();
-    pthread_mutex_lock(&ps->mutex);
-    if (ps->state == PSS_DESTROY) {
-        /* Interrupted by destroy0 */
-        pthread_cond_broadcast(&ps->wakeup);
-        pthread_mutex_unlock(&ps->mutex);
-        return 0;
-    }
-    if (rc != 0) {
-        /* Error during poll */
-        ps->state = 0;
-        pthread_cond_broadcast(&ps->wakeup);
-        pthread_mutex_unlock(&ps->mutex);
-        ACR_THROW_NET_ERROR(rc);
-        return 0;
-    }
-    if (ns == 0) {
-        /* Timeout occured */
-        ps->state = 0;
-        pthread_cond_broadcast(&ps->wakeup);
-        pthread_mutex_unlock(&ps->mutex);
-        return 0;
-    }
-    if (ps->state == PSS_WAKEUP) {
-        /* Interrupted by wakeup0 */
-        if (ps->fdset[0].revents != 0) {
-            /* Drain the wakeup pipe.
-             * Wakeup pipe is always at index zero.
-             */
-            AcrDrainPipe(ps->wpipe[0]);
-        }
-        ps->state = 0;
-        pthread_cond_broadcast(&ps->wakeup);
-        pthread_mutex_unlock(&ps->mutex);
-        return 0;
-    }
-    ps->state = PSS_WAIT;
-    pevents   = JARRAY_CRITICAL(jshort, revents);
-    /* Cycle trough the descriptors */
-    for (i = 0; i < ps->used; i++) {
-        if (ps->fdset[i].revents != 0) {
-            if (i == 0) {
-                /* Drain the wakeup pipe.
-                 * Wakeup pipe is always at index zero.
-                 */
-                AcrDrainPipe(ps->wpipe[0]);
-                continue;
-            }
-            else {
-                pevents[rv] = reventt(ps->fdset[i].revents);
-                (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
-                if (ps->ooset[i].ttl > 0) {
-                    /* Reset TTL
-                     */
-                    if (now == 0)
-                        now = AcrTimeNow();
-                    ps->ooset[i].exp = now + ps->ooset[i].ttl;
-                }
-            }
-        }
-        else {
-            /* Check for the expired descriptors.
-             */
-            if (ps->ooset[i].ttl > 0) {
-                if (now == 0)
-                    now = AcrTimeNow();                
-                if (ps->ooset[i].exp > now) {
-                    /* Expired descriptor */
-                    ps->fdset[i].revents = POLLHUP;
-                    pevents[rv] = ACR_OP_TIMEOUT;
-                    (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
-                }
-            }
-        }
-    }
-    RELEASE_CRITICAL(revents, pevents);
-    if (autocancel == JNI_TRUE && rv > 0) {
-        /* Remove all descriptors with revents set except
-         * the wakeup pipe at index zero.
-         */
-        for (i = 1; i < ps->used; i++) {
-            if (ps->fdset[i].revents != 0) {
-                int dest = i;
-                int used = ps->used;
-                ps->used--;
-                /* Unref descriptor */
-                (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
-                for (++i; i < used; i++) {
-                    if (ps->fdset[i].revents != 0) {
-                        /* Skip signaled descriptor */
-                        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
-                        ps->used--;
-                    }
-                    else {
-                        ps->fdset[dest] = ps->fdset[i];
-                        ps->ooset[dest] = ps->ooset[i];
-                        dest++;
-                    }
-                }
-            }
-        }
-    }
-    ps->state = 0;
-    pthread_cond_broadcast(&ps->wakeup);
-    pthread_mutex_unlock(&ps->mutex);
-    return rv;
-}
-
-ACR_UNX_EXPORT(jint, SocketSelectorImpl, add0)(JNI_STDARGS, jlong pollset, jobject fo,
-                                               jint f, jint events, jint ttlms)
-{
-    int i, rc = 0;
-    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
-
-    pthread_mutex_lock(&ps->mutex);
-    while (ps->state != 0) {
-        if (ps->state == PSS_DESTROY) {
-            rc = 0;
-            goto cleanup;
-        }
-        if (ps->state == PSS_POLL) {
-            char ch   = 1;
-            ps->state = PSS_WAKEUP;
-            r_write(ps->wpipe[1], &ch, 1);
-        }
-        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0)
-            goto cleanup;
-    }
-    if (ps->used == ps->size) {
-        /* Overflow
-         */
-        rc = ACR_EOVERFLOW;
-        goto cleanup;
-    }
-    for (i = 1; i < ps->used; i++) {
-        if (ps->fdset[i].fd == f) {
-            /* Duplicate descriptor
-             */
-            rc = ACR_EALREADY;
-            goto cleanup;
-        }
-    }
-    ps->fdset[ps->used].fd      = f;
-    ps->fdset[ps->used].events  = ieventt(events);
-    ps->fdset[ps->used].revents = 0;
-    ps->ooset[ps->used].obj     = (*env)->NewGlobalRef(env, fo);
-    if (ps->ooset[ps->used].obj == 0) {
-        /* In case the NewGlobalRef fails,
-         * OutOfMemoryError should be thrown already by the JVM.
-         */
-        rc = ACR_ENOMEM;
-        goto cleanup;
-    }
-    if (ttlms > 0) {
-        ps->ooset[ps->used].ttl = AcrTimeFromMsec(ttlms);
-        ps->ooset[ps->used].exp = AcrTimeNow() + ps->ooset[ps->used].ttl;
-    }
-    else {
-        ps->ooset[ps->used].ttl = ACR_INFINITE;
-        ps->ooset[ps->used].exp = ACR_INFINITE;
-    }
-    ps->used++;
-cleanup:
-    pthread_mutex_unlock(&ps->mutex);
-    return rc;
-}
-
-ACR_UNX_EXPORT(jint, SocketSelectorImpl, del0)(JNI_STDARGS, jlong pollset,
-                                               jobject fo, jint f)
-{
-    int i, rc = ACR_EOF;
-    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
-
-    pthread_mutex_lock(&ps->mutex);
-    while (ps->state != 0) {
-        if (ps->state == PSS_DESTROY) {
-            rc = 0;
-            goto cleanup;
-        }
-        if (ps->state == PSS_POLL) {
-            char ch   = 1;
-            ps->state = PSS_WAKEUP;
-            r_write(ps->wpipe[1], &ch, 1);
-        }
-        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0)
-            goto cleanup;
-    }
-
-    for (i = 1; i < ps->used; i++) {
-        if ((*env)->IsSameObject(env, ps->ooset[i].obj, fo) == JNI_TRUE) {
-            int dest = i;
-            int used = ps->used;
-            ps->used--;
-            /* Unref descriptor */
-            (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
-            for (++i; i < used; i++) {
-                ps->fdset[dest] = ps->fdset[i];
-                ps->ooset[dest] = ps->ooset[i];
-                dest++;
-            }
-            rc = 0;
-        }
-    }
-
-cleanup:
-    pthread_mutex_unlock(&ps->mutex);
-    return rc;
-}
-
-#endif /* POLLSET_USE_POLL */
-
 ACR_NET_EXPORT(jint, Poll, wait0)(JNI_STDARGS, jintArray fdset,
                                   jshortArray events, jshortArray revents,
                                   jint nevents, jint timeout)

Added: commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c?rev=1129273&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c Mon May 30 17:49:32 2011
@@ -0,0 +1,525 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/clazz.h"
+#include "acr/memory.h"
+#include "acr/jniapi.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "acr/iodefs.h"
+#include "acr/misc.h"
+#include "arch_opts.h"
+#include <poll.h>
+#if HAVE_SYS_RESOURCE_H
+# include <sys/resource.h>
+#endif
+
+/* pollset operation states */
+#define PSS_DESTROY     1
+#define PSS_POLL        2
+#define PSS_WAIT        3
+#define PSS_WAKEUP      4
+
+typedef struct acr_pollfd_t {
+    jobject        obj;
+    acr_time_t     ttl;
+    acr_time_t     exp;
+} acr_pollfd_t;
+
+typedef struct acr_pollset_t {
+    struct pollfd    *fdset;
+    acr_pollfd_t     *ooset;
+    int               used;
+    int               size;
+    volatile int      state;
+    int               wpipe[2];
+    pthread_mutex_t   mutex;
+    pthread_cond_t    wakeup;
+} acr_pollset_t;
+
+static short ieventt(int event)
+{
+    short rv = 0;
+
+    if (event & ACR_OP_INP)
+        rv |= POLLIN;
+    if (event & ACR_OP_PRI)
+        rv |= POLLPRI;
+    if (event & ACR_OP_WRITE)
+        rv |= POLLOUT;
+    /* POLLERR, POLLHUP, and POLLNVAL aren't valid as requested events
+     */
+    return rv;
+}
+
+static short reventt(short event)
+{
+    short rv = 0;
+
+    if (event & POLLIN)
+        rv |= ACR_OP_INP;
+    if (event & POLLPRI)
+        rv |= ACR_OP_PRI;
+    if (event & POLLOUT)
+        rv |= ACR_OP_WRITE;
+    if (event & POLLERR)
+        rv |= ACR_OP_ERROR;
+    if (event & POLLHUP)
+        rv |= ACR_OP_HANGUP;
+#if defined(POLLRDHUP)
+    if (event & POLLRDHUP)
+        rv |= ACR_OP_RDHUP;
+#endif
+    if (event & POLLNVAL)
+        rv |= ACR_OP_NVAL;
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, Selector, init0)(JNI_STDARGS)
+{
+    return PS_DEFAULT_TYPE;
+}
+
+ACR_NET_EXPORT(jint, Selector, init1)(JNI_STDARGS)
+{
+    int nm = 65536;
+#if HAVE_SYS_RESOURCE_H
+    struct rlimit rl;
+
+    if (getrlimit(RLIMIT_NOFILE, &rl) == 0 && rl.rlim_max != RLIM_INFINITY)
+        nm = (int)rl.rlim_max;
+    else
+#endif
+        nm = (int)sysconf(_SC_OPEN_MAX);
+    if (nm > 1)
+        --nm;
+    else
+        nm = 1023;
+    return nm;
+}
+
+ACR_NET_EXPORT(jlong, PosixSelector, create0)(JNI_STDARGS, jint size)
+{
+    int rc;
+    acr_pollset_t *ps;
+
+    ps = ACR_TALLOC(acr_pollset_t);
+    if (ps == 0)
+        return 0;
+    ps->wpipe[0] = -1;
+    ps->wpipe[1] = -1;
+    ps->size     = size + 1;
+    ps->used     = 1;
+
+    ps->fdset    = ACR_MALLOC(struct pollfd, ps->size);
+    if (ps->fdset == 0)
+        return 0;
+    ps->ooset    = ACR_MALLOC(acr_pollfd_t,  ps->size);
+    if (ps->fdset == 0) {
+        AcrFree(ps->fdset);
+        return 0;
+    }
+    if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) {
+        ACR_THROW_NET_ERROR(rc);
+        goto cleanup;
+    }
+    /* Add the wakeup pipe to the pset
+     */
+    ps->fdset[0].fd      = ps->wpipe[0];
+    ps->fdset[0].events  = POLLIN;
+    ps->fdset[0].revents = 0;
+    ps->ooset[0].obj     = 0;
+    ps->ooset[0].ttl     = ACR_INFINITE;
+    ps->ooset[0].exp     = ACR_INFINITE;
+
+    if (pthread_cond_init(&ps->wakeup, 0) != 0) {
+        ACR_THROW_NET_ERRNO();
+        goto cleanup;
+    }
+    if (pthread_mutex_init(&ps->mutex, 0) != 0) {
+        ACR_THROW_NET_ERRNO();
+        pthread_cond_destroy(&ps->wakeup);
+        goto cleanup;
+    }
+    return P2J(ps);
+
+cleanup:
+    AcrFree(ps->fdset);
+    AcrFree(ps->ooset);
+    AcrFree(ps);
+    return 0;
+}
+
+ACR_NET_EXPORT(int, PosixSelector, destroy0)(JNI_STDARGS, jlong pollset)
+{
+    int i;
+    int rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state != 0) {
+        int  state = ps->state;
+        ps->state  = PSS_DESTROY;
+        if (state == PSS_POLL) {
+            char ch   = 1;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) {
+            pthread_mutex_unlock(&ps->mutex);
+            return rc;
+        }
+    }
+    ps->state = PSS_DESTROY;
+    for (i = 1; i < ps->used; i++) {
+        /* Invalidate the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 0;
+    pthread_mutex_unlock(&ps->mutex);
+    r_close(ps->wpipe[0]);
+    r_close(ps->wpipe[1]);
+    pthread_cond_destroy(&ps->wakeup);
+    pthread_mutex_destroy(&ps->mutex);
+    AcrFree(ps->fdset);
+    AcrFree(ps->ooset);
+    AcrFree(ps);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, PosixSelector, clear0)(JNI_STDARGS, jlong pollset,
+                                                 jobjectArray rs)
+{
+    int i;
+    int cnt = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    while (ps->state != 0) {
+        if (ps->state == PSS_DESTROY) {
+            /* Interrupted by destroy0 */
+            pthread_mutex_unlock(&ps->mutex);
+            return 0;
+        }
+        if (ps->state == PSS_POLL) {
+            char ch   = 1;
+            ps->state = PSS_WAKEUP;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) {
+            pthread_mutex_unlock(&ps->mutex);
+            ACR_THROW(ACR_EX_EILLEGAL, 0);
+            return 0;
+        }
+    }
+    for (i = 1; i < ps->used; i++) {
+        (*env)->SetObjectArrayElement(env, rs, cnt++, ps->ooset[i].obj);
+        /* Unref the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 1;
+    pthread_mutex_unlock(&ps->mutex);
+    return cnt;
+}
+
+ACR_NET_EXPORT(void, PosixSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state == PSS_POLL) {
+        char ch   = 1;
+        ps->state = PSS_WAKEUP;
+        r_write(ps->wpipe[1], &ch, 1);
+    }
+    pthread_mutex_unlock(&ps->mutex);
+}
+
+ACR_NET_EXPORT(jint, PosixSelector, size0)(JNI_STDARGS, jlong pollset)
+{
+    int rv;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    rv = ps->used - 1;
+    pthread_mutex_unlock(&ps->mutex);
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, PosixSelector, wait0)(JNI_STDARGS, jlong pollset,
+                                                jobjectArray rs, jshortArray revents,
+                                                jint timeout, jboolean autocancel)
+{
+    int i, ns, rc = 0;
+    int rv = 0;
+    jshort *pevents;
+    acr_time_t now = 0;
+    acr_time_t tmx = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state != 0) {
+        /* Note that this should never happen if api is correctly used.
+         * wait cannot be run from multiple threads and cannot be run
+         * after destroy.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        ACR_THROW(ACR_EX_EILLEGAL, 0);
+        return 0;
+    }
+    if (ps->used == 1) {
+        /* We only have the wakeup pipe in the pollset
+         * so there is no point to wait.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+
+    ps->state = PSS_POLL;
+    pthread_mutex_unlock(&ps->mutex);
+    if (timeout > 0)
+        tmx = AcrTimeMilliseconds() + timeout;
+    for (;;) {
+        ns = poll(ps->fdset, ps->used, timeout);
+        if (ns == -1 && errno == EINTR) {
+            if (timeout >= 0) {
+                timeout = tmx - AcrTimeMilliseconds();
+                if (timeout <= 0) {
+                    ns = 0;
+                    break;
+                }
+            }
+        }
+        else
+            break;
+    }
+
+    if (ns == -1)
+        rc = ACR_GET_OS_ERROR();
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state == PSS_DESTROY) {
+        /* Interrupted by destroy0 */
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+    if (rc != 0) {
+        /* Error during poll */
+        ps->state = 0;
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        ACR_THROW_NET_ERROR(rc);
+        return 0;
+    }
+    if (ns == 0) {
+        /* Timeout occured */
+        ps->state = 0;
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+    if (ps->state == PSS_WAKEUP) {
+        /* Interrupted by wakeup0 */
+        if (ps->fdset[0].revents != 0) {
+            /* Drain the wakeup pipe.
+             * Wakeup pipe is always at index zero.
+             */
+            AcrDrainPipe(ps->wpipe[0]);
+        }
+        ps->state = 0;
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+    ps->state = PSS_WAIT;
+    pevents   = JARRAY_CRITICAL(jshort, revents);
+    /* Cycle trough the descriptors */
+    for (i = 0; i < ps->used; i++) {
+        if (ps->fdset[i].revents != 0) {
+            if (i == 0) {
+                /* Drain the wakeup pipe.
+                 * Wakeup pipe is always at index zero.
+                 */
+                AcrDrainPipe(ps->wpipe[0]);
+                continue;
+            }
+            else {
+                pevents[rv] = reventt(ps->fdset[i].revents);
+                (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                if (ps->ooset[i].ttl > 0) {
+                    /* Reset TTL
+                     */
+                    if (now == 0)
+                        now = AcrTimeNow();
+                    ps->ooset[i].exp = now + ps->ooset[i].ttl;
+                }
+            }
+        }
+        else {
+            /* Check for the expired descriptors.
+             */
+            if (ps->ooset[i].ttl > 0) {
+                if (now == 0)
+                    now = AcrTimeNow();                
+                if (ps->ooset[i].exp > now) {
+                    /* Expired descriptor */
+                    ps->fdset[i].revents = POLLHUP;
+                    pevents[rv] = ACR_OP_TIMEOUT;
+                    (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj);
+                }
+            }
+        }
+    }
+    RELEASE_CRITICAL(revents, pevents);
+    if (autocancel == JNI_TRUE && rv > 0) {
+        /* Remove all descriptors with revents set except
+         * the wakeup pipe at index zero.
+         */
+        for (i = 1; i < ps->used; i++) {
+            if (ps->fdset[i].revents != 0) {
+                int dest = i;
+                int used = ps->used;
+                ps->used--;
+                /* Unref descriptor */
+                (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+                for (++i; i < used; i++) {
+                    if (ps->fdset[i].revents != 0) {
+                        /* Skip signaled descriptor */
+                        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+                        ps->used--;
+                    }
+                    else {
+                        ps->fdset[dest] = ps->fdset[i];
+                        ps->ooset[dest] = ps->ooset[i];
+                        dest++;
+                    }
+                }
+            }
+        }
+    }
+    ps->state = 0;
+    pthread_cond_broadcast(&ps->wakeup);
+    pthread_mutex_unlock(&ps->mutex);
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, PosixSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
+                                               jint f, jint events, jint ttlms)
+{
+    int i, rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    while (ps->state != 0) {
+        if (ps->state == PSS_DESTROY) {
+            rc = 0;
+            goto cleanup;
+        }
+        if (ps->state == PSS_POLL) {
+            char ch   = 1;
+            ps->state = PSS_WAKEUP;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0)
+            goto cleanup;
+    }
+    if (ps->used == ps->size) {
+        /* Overflow
+         */
+        rc = ACR_EOVERFLOW;
+        goto cleanup;
+    }
+    for (i = 1; i < ps->used; i++) {
+        if (ps->fdset[i].fd == f) {
+            /* Duplicate descriptor
+             */
+            rc = ACR_EALREADY;
+            goto cleanup;
+        }
+    }
+    ps->fdset[ps->used].fd      = f;
+    ps->fdset[ps->used].events  = ieventt(events);
+    ps->fdset[ps->used].revents = 0;
+    ps->ooset[ps->used].obj     = (*env)->NewGlobalRef(env, fo);
+    if (ps->ooset[ps->used].obj == 0) {
+        /* In case the NewGlobalRef fails,
+         * OutOfMemoryError should be thrown already by the JVM.
+         */
+        rc = ACR_ENOMEM;
+        goto cleanup;
+    }
+    if (ttlms > 0) {
+        ps->ooset[ps->used].ttl = AcrTimeFromMsec(ttlms);
+        ps->ooset[ps->used].exp = AcrTimeNow() + ps->ooset[ps->used].ttl;
+    }
+    else {
+        ps->ooset[ps->used].ttl = ACR_INFINITE;
+        ps->ooset[ps->used].exp = ACR_INFINITE;
+    }
+    ps->used++;
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, PosixSelector, del0)(JNI_STDARGS, jlong pollset,
+                                               jobject fo, jint f)
+{
+    int i, rc = ACR_EOF;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    while (ps->state != 0) {
+        if (ps->state == PSS_DESTROY) {
+            rc = 0;
+            goto cleanup;
+        }
+        if (ps->state == PSS_POLL) {
+            char ch   = 1;
+            ps->state = PSS_WAKEUP;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0)
+            goto cleanup;
+    }
+
+    for (i = 1; i < ps->used; i++) {
+        if ((*env)->IsSameObject(env, ps->ooset[i].obj, fo) == JNI_TRUE) {
+            int dest = i;
+            int used = ps->used;
+            ps->used--;
+            /* Unref descriptor */
+            (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+            for (++i; i < used; i++) {
+                ps->fdset[dest] = ps->fdset[i];
+                ps->ooset[dest] = ps->ooset[i];
+                dest++;
+            }
+            rc = 0;
+        }
+    }
+
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}

Propchange: commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java (original)
+++ commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java Mon May 30 17:49:32 2011
@@ -31,7 +31,7 @@ public class TestSelectionKey extends As
         // Create socket bound to the first free port
         //
         SocketEndpoint ep = new SocketEndpoint();
-        Selector ss = SocketSelectorFactory.newInstance();
+        Selector ss = Selector.newInstance();
         assertNotNull(ss);
         System.out.println("SocketSelector capacity=" + ss.capacity());
         SelectionKey skey = ep.key(ss);



Mime
View raw message