Return-Path: X-Original-To: apmail-commons-commits-archive@minotaur.apache.org Delivered-To: apmail-commons-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0370F6FBF for ; Mon, 30 May 2011 17:49:57 +0000 (UTC) Received: (qmail 33113 invoked by uid 500); 30 May 2011 17:49:56 -0000 Delivered-To: apmail-commons-commits-archive@commons.apache.org Received: (qmail 33061 invoked by uid 500); 30 May 2011 17:49:56 -0000 Mailing-List: contact commits-help@commons.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@commons.apache.org Delivered-To: mailing list commits@commons.apache.org Received: (qmail 33054 invoked by uid 99); 30 May 2011 17:49:56 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 May 2011 17:49:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 May 2011 17:49:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5AA442388A2C; Mon, 30 May 2011 17:49:33 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1129273 - in /commons/sandbox/runtime/trunk/src/main: java/org/apache/commons/runtime/net/ java/org/apache/commons/runtime/platform/unix/ native/ native/os/unix/ test/org/apache/commons/runtime/ Date: Mon, 30 May 2011 17:49:33 -0000 To: commits@commons.apache.org From: mturk@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110530174933.5AA442388A2C@eris.apache.org> Author: mturk Date: Mon May 30 17:49:32 2011 New Revision: 1129273 URL: http://svn.apache.org/viewvc?rev=1129273&view=rev Log: Simplify selector class layout Added: 
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java (with props) commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java (with props) commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c (with props) Removed: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/LocalSocketSelectorFactory.java commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyFactory.java commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SocketSelectorFactory.java commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/SelectionKeyImpl.java commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/SocketSelectorImpl.java Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java?rev=1129273&r1=1129272&r2=1129273&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java (original) +++ 
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/AbstractSocketDescriptor.java Mon May 30 17:49:32 2011 @@ -32,8 +32,15 @@ import org.apache.commons.runtime.io.Des abstract class AbstractSocketDescriptor extends Descriptor { + /** + * The socket blocking mode. + */ protected boolean blocking = false; - + + public volatile boolean selecting = false; + /** + * Create new object instance. + */ protected AbstractSocketDescriptor() { } Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java?rev=1129273&r1=1129272&r2=1129273&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java (original) +++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Endpoint.java Mon May 30 17:49:32 2011 @@ -32,7 +32,7 @@ import org.apache.commons.runtime.io.Des public abstract class Endpoint implements Closeable { /** - * Selection key. + * This endpoint's selection key. 
*/ protected SelectionKey key; @@ -84,7 +84,7 @@ public abstract class Endpoint implement if (selector == null) throw new NullPointerException(Local.sm.get("selector.NULL")); if (key == null) - key = SelectionKeyFactory.newInstance(selector, this); + key = new SelectionKeyImpl(selector, this); if (key.selector() != selector) throw new IllegalSelectorException(); return key; Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java?rev=1129273&view=auto ============================================================================== --- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java (added) +++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java Mon May 30 17:49:32 2011 @@ -0,0 +1,159 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.commons.runtime.net; + +import java.io.IOException; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.HashSet; +import org.apache.commons.runtime.io.ClosedDescriptorException; +import org.apache.commons.runtime.io.Descriptor; +import org.apache.commons.runtime.AlreadyExistsException; +import org.apache.commons.runtime.InvalidArgumentException; +import org.apache.commons.runtime.NoSuchObjectException; +import org.apache.commons.runtime.OperationNotImplementedException; +import org.apache.commons.runtime.OverflowException; +import org.apache.commons.runtime.SystemException; +import org.apache.commons.runtime.Errno; +import org.apache.commons.runtime.Status; + +/** + * Socket Selector implementation class. + *

+ *

+ * + * @since Runtime 1.0 + */ +final class PosixSelector extends Selector +{ + + private short[] revents; + private SelectionKeyImpl[] keyset; + private long pollset; + + private static native long create0(int size) + throws OutOfMemoryError, + SystemException; + private static native void wakeup0(long pollset); + private static native int destroy0(long pollset); + private static native int size0(long pollset); + private static native int add0(long pollset, SelectionKeyImpl key, int fd, int events, int ttl); + private static native int del0(long pollset, SelectionKeyImpl key, int fd); + private static native int clear0(long pollset, SelectionKeyImpl[] set); + private static native int wait0(long pollset, SelectionKeyImpl[] set, short[] events, + int timeout, boolean autocancel); + + /* + * Created from native + */ + public PosixSelector(int size) + { + super(size); + pollset = create0(size); + revents = new short[size]; + keyset = new SelectionKeyImpl[size]; + } + + private void ensureValid() + throws ClosedSelectorException + { + if (pollset == 0L) + throw new ClosedSelectorException(); + } + + @Override + public int size() + throws ClosedSelectorException + { + ensureValid(); + return size0(pollset); + } + + @Override + public void interrupt() + throws ClosedSelectorException + { + ensureValid(); + wakeup0(pollset); + } + + @Override + public boolean register(SelectionKey key, int ops) + throws ClosedSelectorException, + IllegalSelectorException, + ClosedDescriptorException, + OverflowException, + IOException + { + ensureValid(); + SelectionKeyImpl skey = (SelectionKeyImpl)key; + if (skey.selector() != this) + throw new IllegalSelectorException(); + Descriptor sd = skey.endpoint().descriptor(); + int fd = sd.fd(); + if (fd == -1) + throw new ClosedDescriptorException(); + int rc = add0(pollset, skey, fd, ops, skey.timeout()); + if (rc != 0) { + if (rc == Errno.EALREADY) + return false; + if (rc == Errno.EOVERFLOW) + throw new OverflowException(); + else + throw 
new IOException(Status.describe(rc)); + } + return true; + } + + @Override + protected void cancel(SelectionKey key) + throws ClosedSelectorException, + IllegalSelectorException + { + ensureValid(); + if (key.selector() != this) + throw new IllegalSelectorException(); + SelectionKeyImpl skey = (SelectionKeyImpl)key; + Descriptor sd = skey.endpoint().descriptor(); + int fd = sd.fd(); + if (fd != -1) { + // Remove selection key + del0(pollset, skey, fd); + skey.revents = 0; + } + } + + @Override + public boolean isOpen() + { + return pollset != 0L; + } + + @Override + public void close() + throws IOException + { + if (pollset != 0L) { + long ps = pollset; + pollset = 0L; + int rc = destroy0(ps); + if (rc != 0) + throw new IOException(Status.describe(rc)); + } + } + +} + Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PosixSelector.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java?rev=1129273&r1=1129272&r2=1129273&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java (original) +++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKey.java Mon May 30 17:49:32 2011 @@ -26,8 +26,8 @@ import org.apache.commons.runtime.net.En public abstract class SelectionKey { - private Object attachment; private Selector selector; + private Object attachment; private Endpoint endpoint; private int timeout; @@ -134,4 +134,36 @@ public abstract class SelectionKey return prevtime; } + /** + * Register this selection key with the given selector. 
+ * + * @throws ClosedSelectorException if this selector is closed. + * @throws IllegalSelectorException if this key was not created by the same + * selector as the given selector. + */ + public SelectionKey register(Selector selector) + throws IllegalSelectorException + { + if (this.selector == null || selector == null) + this.selector = selector; + if (this.selector != selector) + throw new IllegalSelectorException(); + if (this.selector == null) { + AbstractSocketDescriptor sd = (AbstractSocketDescriptor)endpoint.descriptor(); + sd.selecting = false; + } + return this; + } + /** + * Unregister this selection key from its selector. + * + * @throws ClosedSelectorException if this selector is closed. + */ + public void cancel() + throws ClosedSelectorException + { + selector.cancel(this); + } + + } Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java?rev=1129273&view=auto ============================================================================== --- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java (added) +++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java Mon May 30 17:49:32 2011 @@ -0,0 +1,69 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.runtime.net; + +import org.apache.commons.runtime.AlreadyExistsException; +import org.apache.commons.runtime.InvalidArgumentException; +import org.apache.commons.runtime.NoSuchObjectException; +import org.apache.commons.runtime.OperationNotImplementedException; +import org.apache.commons.runtime.SystemException; +import org.apache.commons.runtime.Errno; + +/** + * Selector implementation class. + *

+ *

+ * + * @since Runtime 1.0 + */ +final class SelectionKeyImpl extends SelectionKey +{ + + public int ievents; + public int revents; + + public SelectionKeyImpl(Selector selector, Endpoint endpoint, + int ievents) + { + super(selector, endpoint); + this.ievents = ievents; + this.revents = 0; + } + + public SelectionKeyImpl(Selector selector, Endpoint endpoint) + { + this(selector, endpoint, 0); + } + + @Override + public int events() + { + return revents; + } + + @Override + public SelectionKey events(int set) + { + ievents = set; + return this; + } + + public void destroy() + { + register(null); + } + +} Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/SelectionKeyImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java?rev=1129273&r1=1129272&r2=1129273&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java (original) +++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/Selector.java Mon May 30 17:49:32 2011 @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import org.apache.commons.runtime.io.ClosedDescriptorException; import org.apache.commons.runtime.io.InvalidDescriptorException; +import org.apache.commons.runtime.InvalidRangeException; import org.apache.commons.runtime.OverflowException; /** @@ -29,13 +30,63 @@ import org.apache.commons.runtime.Overfl */ public abstract class Selector implements Closeable { - private int capacity; + private int capacity; + private static final int type; + /** + * Maximum size of the selector limited by the operating system + */ + public static 
final int MAX_CAPACITY; + private static native int init0(); + private static native int init1(); + static { + type = init0(); + MAX_CAPACITY = init1(); + } /** * Indicates the current auto-cancel mode for this {@code Selector} object. */ protected boolean autoCancel; + /** + * Creates a new {@code Selector} instance with the given size. + * + * @return the new socket selector. + * + * @throws InvalidRangeException if {@code size} is outside the + * valid range. + * @throws OutOfMemoryError if the memory allocation fails. + */ + public static Selector newInstance(int size) + throws InvalidRangeException, OutOfMemoryError + { + if (size < 1 || size > MAX_CAPACITY) + throw new InvalidRangeException(Local.sm.get("selector.ERANGE")); + switch (type) { + case 0: + return new PosixSelector(size); + default: + throw new RuntimeException("Unknown Selector type"); + } + } + + /** + * Creates a new {@code Selector} instance. + * + * @return the new socket selector. + * + * @throws RuntimeException if the selector capacity cannot be + * determined. + * @throws OutOfMemoryError if the memory allocation fails. + */ + public static Selector newInstance() + throws RuntimeException, OutOfMemoryError + { + if (MAX_CAPACITY < 1) + throw new RuntimeException(); + return newInstance(MAX_CAPACITY); + } + private Selector() { // No instance @@ -122,6 +173,13 @@ public abstract class Selector implement throws IOException; /** + * Tells whether or not this selector is open. + * + * @return {@code true} if, and only if, this selector is open + */ + public abstract boolean isOpen(); + + /** * Sets this selector's auto-cancel mode to the given state. * If the selector is in auto-cancel mode, then the keys are * canceled when signaled. 
Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in?rev=1129273&r1=1129272&r2=1129273&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in (original) +++ commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in Mon May 30 17:49:32 2011 @@ -71,6 +71,7 @@ UNIX_SOURCES=\ $(TOPDIR)/os/unix/procmutex.c \ $(TOPDIR)/os/unix/shmem.c \ $(TOPDIR)/os/unix/selectkey.c \ + $(TOPDIR)/os/unix/selector.c \ $(TOPDIR)/os/unix/semaphore.c \ $(TOPDIR)/os/unix/time.c \ $(TOPDIR)/os/unix/usock.c \ Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h?rev=1129273&r1=1129272&r2=1129273&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h (original) +++ commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h Mon May 30 17:49:32 2011 @@ -134,10 +134,14 @@ typedef struct stat struct_stat_ #endif /* F_DUPFD */ +#define PS_TYPE_POLL 0 +#define PS_TYPE_EPOLL 1 +#define PS_TYPE_DEVPOLL 2 +#define PS_TYPE_KQUEUE 3 + #if 1 -# define POLLSET_USE_POLL 1 +# define PS_DEFAULT_TYPE PS_TYPE_POLL #else -# define POLLSET_USE_POLL 0 #endif Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c?rev=1129273&r1=1129272&r2=1129273&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c (original) +++ commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c Mon May 30 17:49:32 2011 @@ -27,46 +27,6 @@ # include #endif -#if POLLSET_USE_POLL -/* pollset 
operation states */ -#define PSS_DESTROY 1 -#define PSS_POLL 2 -#define PSS_WAIT 3 -#define PSS_WAKEUP 4 - -typedef struct acr_pollfd_t { - jobject obj; - acr_time_t ttl; - acr_time_t exp; -} acr_pollfd_t; - -typedef struct acr_pollset_t { - struct pollfd *fdset; - acr_pollfd_t *ooset; - int used; - int size; - volatile int state; - int wpipe[2]; - pthread_mutex_t mutex; - pthread_cond_t wakeup; -} acr_pollset_t; - -J_DECLARE_CLAZZ = { - INVALID_FIELD_OFFSET, - 0, - 0, - 0, - ACR_UNX_CP "SocketSelectorImpl" -}; - -J_DECLARE_M_ID(0000) = { - 0, - "<init>", - "(I)V" -}; - -#endif - static short ieventt(int event) { short rv = 0; @@ -105,479 +65,6 @@ static short reventt(short event) return rv; } -#if POLLSET_USE_POLL - -ACR_NET_EXPORT(jobject, SocketSelectorFactory, new0)(JNI_STDARGS, jint size) -{ - if (_clazzn.u == 1) - return (*env)->NewObject(env, _clazzn.i, J4MID(0000), size); - else - return 0; -} - -ACR_NET_EXPORT(jobject, LocalSocketSelectorFactory, new0)(JNI_STDARGS, jint size) -{ - if (_clazzn.u == 1) - return (*env)->NewObject(env, _clazzn.i, J4MID(0000), size); - else - return 0; -} - -static int maxopendesc(void) -{ - int nm = 65536; -#if HAVE_SYS_RESOURCE_H - struct rlimit rl; - - if (getrlimit(RLIMIT_NOFILE, &rl) == 0 && rl.rlim_max != RLIM_INFINITY) - nm = (int)rl.rlim_max; - else -#endif - nm = (int)sysconf(_SC_OPEN_MAX); - if (nm > 1) - --nm; - else - nm = 1023; - return nm; -} - -ACR_NET_EXPORT(jint, SocketSelectorFactory, init0)(JNI_STDARGS) -{ - if (_clazzn.u == 0) { - if (AcrLoadClass(env, &_clazzn, 0) == JNI_FALSE) - return 0; - R_LOAD_METHOD(0000, 0); - _clazzn.u = 1; - } - return maxopendesc(); -} - -ACR_NET_EXPORT(jint, LocalSocketSelectorFactory, init0)(JNI_STDARGS) -{ - if (_clazzn.u == 0) { - if (AcrLoadClass(env, &_clazzn, 0) == JNI_FALSE) - return 0; - R_LOAD_METHOD(0000, 0); - _clazzn.u = 1; - } - return maxopendesc(); -} - -ACR_UNX_EXPORT(jlong, SocketSelectorImpl, create0)(JNI_STDARGS, jint size) -{ - int rc; - acr_pollset_t *ps; - - ps =
ACR_TALLOC(acr_pollset_t); - if (ps == 0) - return 0; - ps->wpipe[0] = -1; - ps->wpipe[1] = -1; - ps->size = size + 1; - ps->used = 1; - - ps->fdset = ACR_MALLOC(struct pollfd, ps->size); - if (ps->fdset == 0) - return 0; - ps->ooset = ACR_MALLOC(acr_pollfd_t, ps->size); - if (ps->fdset == 0) { - AcrFree(ps->fdset); - return 0; - } - if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) { - ACR_THROW_NET_ERROR(rc); - goto cleanup; - } - /* Add the wakeup pipe to the pset - */ - ps->fdset[0].fd = ps->wpipe[0]; - ps->fdset[0].events = POLLIN; - ps->fdset[0].revents = 0; - ps->ooset[0].obj = 0; - ps->ooset[0].ttl = ACR_INFINITE; - ps->ooset[0].exp = ACR_INFINITE; - - if (pthread_cond_init(&ps->wakeup, 0) != 0) { - ACR_THROW_NET_ERRNO(); - goto cleanup; - } - if (pthread_mutex_init(&ps->mutex, 0) != 0) { - ACR_THROW_NET_ERRNO(); - pthread_cond_destroy(&ps->wakeup); - goto cleanup; - } - return P2J(ps); - -cleanup: - AcrFree(ps->fdset); - AcrFree(ps->ooset); - AcrFree(ps); - return 0; -} - -ACR_UNX_EXPORT(int, SocketSelectorImpl, destroy0)(JNI_STDARGS, jlong pollset) -{ - int i; - int rc = 0; - acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); - - pthread_mutex_lock(&ps->mutex); - if (ps->state != 0) { - int state = ps->state; - ps->state = PSS_DESTROY; - if (state == PSS_POLL) { - char ch = 1; - r_write(ps->wpipe[1], &ch, 1); - } - /* Wait until the wait0 call breaks. - * Since we set the state to DESTROY - * wait0 will return 0. - */ - if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) { - pthread_mutex_unlock(&ps->mutex); - return rc; - } - } - ps->state = PSS_DESTROY; - for (i = 1; i < ps->used; i++) { - /* Invalidate the container. 
*/ - (*env)->DeleteGlobalRef(env, ps->ooset[i].obj); - } - ps->used = 0; - pthread_mutex_unlock(&ps->mutex); - r_close(ps->wpipe[0]); - r_close(ps->wpipe[1]); - pthread_cond_destroy(&ps->wakeup); - pthread_mutex_destroy(&ps->mutex); - AcrFree(ps->fdset); - AcrFree(ps->ooset); - AcrFree(ps); - return rc; -} - -ACR_UNX_EXPORT(jint, SocketSelectorImpl, clear0)(JNI_STDARGS, jlong pollset, - jobjectArray rs) -{ - int i; - int cnt = 0; - acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); - - pthread_mutex_lock(&ps->mutex); - while (ps->state != 0) { - if (ps->state == PSS_DESTROY) { - /* Interrupted by destroy0 */ - pthread_mutex_unlock(&ps->mutex); - return 0; - } - if (ps->state == PSS_POLL) { - char ch = 1; - ps->state = PSS_WAKEUP; - r_write(ps->wpipe[1], &ch, 1); - } - /* Wait until the wait0 call breaks. - * Since we set the state to DESTROY - * wait0 will return 0. - */ - if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) { - pthread_mutex_unlock(&ps->mutex); - ACR_THROW(ACR_EX_EILLEGAL, 0); - return 0; - } - } - for (i = 1; i < ps->used; i++) { - (*env)->SetObjectArrayElement(env, rs, cnt++, ps->ooset[i].obj); - /* Unref the container. 
*/ - (*env)->DeleteGlobalRef(env, ps->ooset[i].obj); - } - ps->used = 1; - pthread_mutex_unlock(&ps->mutex); - return cnt; -} - -ACR_UNX_EXPORT(void, SocketSelectorImpl, wakeup0)(JNI_STDARGS, jlong pollset) -{ - acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); - - pthread_mutex_lock(&ps->mutex); - if (ps->state == PSS_POLL) { - char ch = 1; - ps->state = PSS_WAKEUP; - r_write(ps->wpipe[1], &ch, 1); - } - pthread_mutex_unlock(&ps->mutex); -} - -ACR_UNX_EXPORT(jint, SocketSelectorImpl, size0)(JNI_STDARGS, jlong pollset) -{ - int rv; - acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); - - pthread_mutex_lock(&ps->mutex); - rv = ps->used - 1; - pthread_mutex_unlock(&ps->mutex); - return rv; -} - -ACR_UNX_EXPORT(jint, SocketSelectorImpl, wait0)(JNI_STDARGS, jlong pollset, - jobjectArray rs, jshortArray revents, - jint timeout, jboolean autocancel) -{ - int i, ns, rc = 0; - int rv = 0; - jshort *pevents; - acr_time_t now = 0; - acr_time_t tmx = 0; - acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); - - pthread_mutex_lock(&ps->mutex); - if (ps->state != 0) { - /* Note that this should never happen if api is correctly used. - * wait cannot be run from multiple threads and cannot be run - * after destroy. - */ - pthread_mutex_unlock(&ps->mutex); - ACR_THROW(ACR_EX_EILLEGAL, 0); - return 0; - } - if (ps->used == 1) { - /* We only have the wakeup pipe in the pollset - * so there is no point to wait. 
- */ - pthread_mutex_unlock(&ps->mutex); - return 0; - } - - ps->state = PSS_POLL; - pthread_mutex_unlock(&ps->mutex); - if (timeout > 0) - tmx = AcrTimeMilliseconds() + timeout; - for (;;) { - ns = poll(ps->fdset, ps->used, timeout); - if (ns == -1 && errno == EINTR) { - if (timeout >= 0) { - timeout = tmx - AcrTimeMilliseconds(); - if (timeout <= 0) { - ns = 0; - break; - } - } - } - else - break; - } - - if (ns == -1) - rc = ACR_GET_OS_ERROR(); - pthread_mutex_lock(&ps->mutex); - if (ps->state == PSS_DESTROY) { - /* Interrupted by destroy0 */ - pthread_cond_broadcast(&ps->wakeup); - pthread_mutex_unlock(&ps->mutex); - return 0; - } - if (rc != 0) { - /* Error during poll */ - ps->state = 0; - pthread_cond_broadcast(&ps->wakeup); - pthread_mutex_unlock(&ps->mutex); - ACR_THROW_NET_ERROR(rc); - return 0; - } - if (ns == 0) { - /* Timeout occured */ - ps->state = 0; - pthread_cond_broadcast(&ps->wakeup); - pthread_mutex_unlock(&ps->mutex); - return 0; - } - if (ps->state == PSS_WAKEUP) { - /* Interrupted by wakeup0 */ - if (ps->fdset[0].revents != 0) { - /* Drain the wakeup pipe. - * Wakeup pipe is always at index zero. - */ - AcrDrainPipe(ps->wpipe[0]); - } - ps->state = 0; - pthread_cond_broadcast(&ps->wakeup); - pthread_mutex_unlock(&ps->mutex); - return 0; - } - ps->state = PSS_WAIT; - pevents = JARRAY_CRITICAL(jshort, revents); - /* Cycle trough the descriptors */ - for (i = 0; i < ps->used; i++) { - if (ps->fdset[i].revents != 0) { - if (i == 0) { - /* Drain the wakeup pipe. - * Wakeup pipe is always at index zero. - */ - AcrDrainPipe(ps->wpipe[0]); - continue; - } - else { - pevents[rv] = reventt(ps->fdset[i].revents); - (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj); - if (ps->ooset[i].ttl > 0) { - /* Reset TTL - */ - if (now == 0) - now = AcrTimeNow(); - ps->ooset[i].exp = now + ps->ooset[i].ttl; - } - } - } - else { - /* Check for the expired descriptors. 
- */ - if (ps->ooset[i].ttl > 0) { - if (now == 0) - now = AcrTimeNow(); - if (ps->ooset[i].exp > now) { - /* Expired descriptor */ - ps->fdset[i].revents = POLLHUP; - pevents[rv] = ACR_OP_TIMEOUT; - (*env)->SetObjectArrayElement(env, rs, rv++, ps->ooset[i].obj); - } - } - } - } - RELEASE_CRITICAL(revents, pevents); - if (autocancel == JNI_TRUE && rv > 0) { - /* Remove all descriptors with revents set except - * the wakeup pipe at index zero. - */ - for (i = 1; i < ps->used; i++) { - if (ps->fdset[i].revents != 0) { - int dest = i; - int used = ps->used; - ps->used--; - /* Unref descriptor */ - (*env)->DeleteGlobalRef(env, ps->ooset[i].obj); - for (++i; i < used; i++) { - if (ps->fdset[i].revents != 0) { - /* Skip signaled descriptor */ - (*env)->DeleteGlobalRef(env, ps->ooset[i].obj); - ps->used--; - } - else { - ps->fdset[dest] = ps->fdset[i]; - ps->ooset[dest] = ps->ooset[i]; - dest++; - } - } - } - } - } - ps->state = 0; - pthread_cond_broadcast(&ps->wakeup); - pthread_mutex_unlock(&ps->mutex); - return rv; -} - -ACR_UNX_EXPORT(jint, SocketSelectorImpl, add0)(JNI_STDARGS, jlong pollset, jobject fo, - jint f, jint events, jint ttlms) -{ - int i, rc = 0; - acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); - - pthread_mutex_lock(&ps->mutex); - while (ps->state != 0) { - if (ps->state == PSS_DESTROY) { - rc = 0; - goto cleanup; - } - if (ps->state == PSS_POLL) { - char ch = 1; - ps->state = PSS_WAKEUP; - r_write(ps->wpipe[1], &ch, 1); - } - if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) - goto cleanup; - } - if (ps->used == ps->size) { - /* Overflow - */ - rc = ACR_EOVERFLOW; - goto cleanup; - } - for (i = 1; i < ps->used; i++) { - if (ps->fdset[i].fd == f) { - /* Duplicate descriptor - */ - rc = ACR_EALREADY; - goto cleanup; - } - } - ps->fdset[ps->used].fd = f; - ps->fdset[ps->used].events = ieventt(events); - ps->fdset[ps->used].revents = 0; - ps->ooset[ps->used].obj = (*env)->NewGlobalRef(env, fo); - if (ps->ooset[ps->used].obj == 0) { - /* In 
case the NewGlobalRef fails, - * OutOfMemoryError should be thrown already by the JVM. - */ - rc = ACR_ENOMEM; - goto cleanup; - } - if (ttlms > 0) { - ps->ooset[ps->used].ttl = AcrTimeFromMsec(ttlms); - ps->ooset[ps->used].exp = AcrTimeNow() + ps->ooset[ps->used].ttl; - } - else { - ps->ooset[ps->used].ttl = ACR_INFINITE; - ps->ooset[ps->used].exp = ACR_INFINITE; - } - ps->used++; -cleanup: - pthread_mutex_unlock(&ps->mutex); - return rc; -} - -ACR_UNX_EXPORT(jint, SocketSelectorImpl, del0)(JNI_STDARGS, jlong pollset, - jobject fo, jint f) -{ - int i, rc = ACR_EOF; - acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); - - pthread_mutex_lock(&ps->mutex); - while (ps->state != 0) { - if (ps->state == PSS_DESTROY) { - rc = 0; - goto cleanup; - } - if (ps->state == PSS_POLL) { - char ch = 1; - ps->state = PSS_WAKEUP; - r_write(ps->wpipe[1], &ch, 1); - } - if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) - goto cleanup; - } - - for (i = 1; i < ps->used; i++) { - if ((*env)->IsSameObject(env, ps->ooset[i].obj, fo) == JNI_TRUE) { - int dest = i; - int used = ps->used; - ps->used--; - /* Unref descriptor */ - (*env)->DeleteGlobalRef(env, ps->ooset[i].obj); - for (++i; i < used; i++) { - ps->fdset[dest] = ps->fdset[i]; - ps->ooset[dest] = ps->ooset[i]; - dest++; - } - rc = 0; - } - } - -cleanup: - pthread_mutex_unlock(&ps->mutex); - return rc; -} - -#endif /* POLLSET_USE_POLL */ - ACR_NET_EXPORT(jint, Poll, wait0)(JNI_STDARGS, jintArray fdset, jshortArray events, jshortArray revents, jint nevents, jint timeout) Added: commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c?rev=1129273&view=auto ============================================================================== --- commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c (added) +++ commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c Mon May 30 17:49:32 2011 @@ -0,0 +1,525 @@ 
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/clazz.h"
+#include "acr/memory.h"
+#include "acr/jniapi.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "acr/iodefs.h"
+#include "acr/misc.h"
+#include "arch_opts.h"
+#include <poll.h>
+#if HAVE_SYS_RESOURCE_H
+# include <sys/resource.h>
+#endif
+
+/* pollset operation states.
+ * A state of zero means the pollset is idle. Every transition is made
+ * while holding acr_pollset_t.mutex.
+ */
+#define PSS_DESTROY     1   /* destroy0 has been requested             */
+#define PSS_POLL        2   /* wait0 is (about to be) inside poll()     */
+#define PSS_WAIT        3   /* wait0 is collecting the poll() results   */
+#define PSS_WAKEUP      4   /* another thread asked wait0 to return     */
+
+/* Per descriptor bookkeeping kept in parallel with the pollfd array. */
+typedef struct acr_pollfd_t {
+    jobject     obj;        /* global reference to the registered object */
+    acr_time_t  ttl;        /* time to live, or ACR_INFINITE             */
+    acr_time_t  exp;        /* absolute expiration time, or ACR_INFINITE */
+} acr_pollfd_t;
+
+/* Native pollset. Slot zero of both arrays is reserved for the read
+ * end of the wakeup pipe, so user descriptors occupy [1, used).
+ */
+typedef struct acr_pollset_t {
+    struct pollfd  *fdset;      /* descriptors handed to poll()          */
+    acr_pollfd_t   *ooset;      /* objects and TTL data, same indexing   */
+    int             used;       /* number of slots in use, wakeup incl.  */
+    int             size;       /* number of allocated slots             */
+    volatile int    state;      /* one of PSS_* or zero                  */
+    int             wpipe[2];   /* wakeup pipe: [0] is polled, [1] written */
+    pthread_mutex_t mutex;
+    pthread_cond_t  wakeup;     /* broadcast whenever wait0 finishes     */
+} acr_pollset_t;
+
+/* Translate ACR_OP_* interest flags to poll(2) request flags. */
+static short ieventt(int event)
+{
+    short rv = 0;
+
+    if (event & ACR_OP_INP)
+        rv |= POLLIN;
+    if (event & ACR_OP_PRI)
+        rv |= POLLPRI;
+    if (event & ACR_OP_WRITE)
+        rv |= POLLOUT;
+    /* POLLERR, POLLHUP, and POLLNVAL aren't valid as requested events
+     */
+    return rv;
+}
+
+/* Translate poll(2) result flags to ACR_OP_* flags. */
+static short reventt(short event)
+{
+    short rv = 0;
+
+    if (event & POLLIN)
+        rv |= ACR_OP_INP;
+    if (event & POLLPRI)
+        rv |= ACR_OP_PRI;
+    if (event & POLLOUT)
+        rv |= ACR_OP_WRITE;
+    if (event & POLLERR)
+        rv |= ACR_OP_ERROR;
+    if (event & POLLHUP)
+        rv |= ACR_OP_HANGUP;
+#if defined(POLLRDHUP)
+    if (event & POLLRDHUP)
+        rv |= ACR_OP_RDHUP;
+#endif
+    if (event & POLLNVAL)
+        rv |= ACR_OP_NVAL;
+    return rv;
+}
+
+/* Ask a thread that is inside poll() to return by writing one byte to
+ * the wakeup pipe. Does nothing unless the state is PSS_POLL, so the
+ * byte is written at most once per wait0 call.
+ * The caller must hold ps->mutex.
+ */
+static void ps_interrupt_poll(acr_pollset_t *ps)
+{
+    if (ps->state == PSS_POLL) {
+        char ch = 1;
+        ps->state = PSS_WAKEUP;
+        r_write(ps->wpipe[1], &ch, 1);
+    }
+}
+
+/* Block until the pollset is idle (state zero), interrupting a running
+ * wait0 if needed. The caller must hold ps->mutex.
+ *
+ * Returns 1 when the pollset is idle and may be modified.
+ * Returns 0 when the caller must give up: *rc is zero if the pollset
+ * is being destroyed, or the pthread_cond_wait error code otherwise.
+ */
+static int ps_wait_idle(acr_pollset_t *ps, int *rc)
+{
+    *rc = 0;
+    while (ps->state != 0) {
+        if (ps->state == PSS_DESTROY) {
+            /* Interrupted by destroy0 */
+            return 0;
+        }
+        ps_interrupt_poll(ps);
+        /* Wait until the wait0 call breaks and resets the state. */
+        if ((*rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0)
+            return 0;
+    }
+    return 1;
+}
+
+/* Finish a wait0 call: mark the pollset idle, wake every thread that
+ * is waiting for that and drop the mutex held by the caller.
+ */
+static void ps_release(acr_pollset_t *ps)
+{
+    ps->state = 0;
+    pthread_cond_broadcast(&ps->wakeup);
+    pthread_mutex_unlock(&ps->mutex);
+}
+
+/* Returns the default selector type (PS_DEFAULT_TYPE). */
+ACR_NET_EXPORT(jint, Selector, init0)(JNI_STDARGS)
+{
+    return PS_DEFAULT_TYPE;
+}
+
+/* Returns the maximum number of descriptors a selector can hold.
+ * This is the RLIMIT_NOFILE hard limit when it is available and
+ * finite, otherwise sysconf(_SC_OPEN_MAX), minus one. Falls back
+ * to 1023 when neither source gives a value larger than one.
+ */
+ACR_NET_EXPORT(jint, Selector, init1)(JNI_STDARGS)
+{
+    int nm;
+#if HAVE_SYS_RESOURCE_H
+    struct rlimit rl;
+
+    if (getrlimit(RLIMIT_NOFILE, &rl) == 0 && rl.rlim_max != RLIM_INFINITY)
+        nm = (int)rl.rlim_max;
+    else
+#endif
+        nm = (int)sysconf(_SC_OPEN_MAX);
+    if (nm > 1)
+        --nm;
+    else
+        nm = 1023;
+    return nm;
+}
+
+/* Allocates a pollset able to hold 'size' user descriptors plus the
+ * wakeup pipe and returns it as an opaque handle.
+ * Returns 0 on failure. Pipe and pthread failures also throw through
+ * ACR_THROW_NET_ERROR / ACR_THROW_NET_ERRNO; allocation failures just
+ * return 0, as before.
+ */
+ACR_NET_EXPORT(jlong, PosixSelector, create0)(JNI_STDARGS, jint size)
+{
+    int rc;
+    acr_pollset_t *ps;
+
+    ps = ACR_TALLOC(acr_pollset_t);
+    if (ps == 0)
+        return 0;
+    /* Initialize everything the cleanup path looks at, so it does not
+     * depend on the allocator returning zeroed memory.
+     */
+    ps->fdset    = 0;
+    ps->ooset    = 0;
+    ps->state    = 0;
+    ps->wpipe[0] = -1;
+    ps->wpipe[1] = -1;
+    ps->size     = size + 1;
+    ps->used     = 1;
+
+    ps->fdset = ACR_MALLOC(struct pollfd, ps->size);
+    if (ps->fdset == 0)
+        goto cleanup;
+    ps->ooset = ACR_MALLOC(acr_pollfd_t, ps->size);
+    if (ps->ooset == 0)
+        goto cleanup;
+    if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) {
+        ACR_THROW_NET_ERROR(rc);
+        /* NOTE(review): presumably AcrPipePair closes whatever it opened
+         * before failing - confirm. Do not close possibly stale values.
+         */
+        ps->wpipe[0] = -1;
+        ps->wpipe[1] = -1;
+        goto cleanup;
+    }
+    /* Add the wakeup pipe to the pset
+     */
+    ps->fdset[0].fd      = ps->wpipe[0];
+    ps->fdset[0].events  = POLLIN;
+    ps->fdset[0].revents = 0;
+    ps->ooset[0].obj     = 0;
+    ps->ooset[0].ttl     = ACR_INFINITE;
+    ps->ooset[0].exp     = ACR_INFINITE;
+
+    if (pthread_cond_init(&ps->wakeup, 0) != 0) {
+        ACR_THROW_NET_ERRNO();
+        goto cleanup;
+    }
+    if (pthread_mutex_init(&ps->mutex, 0) != 0) {
+        ACR_THROW_NET_ERRNO();
+        pthread_cond_destroy(&ps->wakeup);
+        goto cleanup;
+    }
+    return P2J(ps);
+
+cleanup:
+    if (ps->wpipe[0] != -1)
+        r_close(ps->wpipe[0]);
+    if (ps->wpipe[1] != -1)
+        r_close(ps->wpipe[1]);
+    if (ps->fdset != 0)
+        AcrFree(ps->fdset);
+    if (ps->ooset != 0)
+        AcrFree(ps->ooset);
+    AcrFree(ps);
+    return 0;
+}
+
+/* Destroys the pollset: interrupts a running wait0, drops the global
+ * references of all registered objects, closes the wakeup pipe and
+ * frees the memory. The handle must not be used afterwards.
+ * Returns 0 on success or the pthread_cond_wait error code, in which
+ * case nothing has been released.
+ */
+ACR_NET_EXPORT(jint, PosixSelector, destroy0)(JNI_STDARGS, jlong pollset)
+{
+    int i;
+    int rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state != 0) {
+        int state = ps->state;
+        ps->state = PSS_DESTROY;
+        if (state == PSS_POLL) {
+            char ch = 1;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         * NOTE(review): wait0 leaves the state at PSS_DESTROY, so a
+         * spurious wakeup cannot be told apart from the real one here.
+         */
+        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) {
+            pthread_mutex_unlock(&ps->mutex);
+            return rc;
+        }
+    }
+    ps->state = PSS_DESTROY;
+    for (i = 1; i < ps->used; i++) {
+        /* Invalidate the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 0;
+    pthread_mutex_unlock(&ps->mutex);
+    r_close(ps->wpipe[0]);
+    r_close(ps->wpipe[1]);
+    pthread_cond_destroy(&ps->wakeup);
+    pthread_mutex_destroy(&ps->mutex);
+    AcrFree(ps->fdset);
+    AcrFree(ps->ooset);
+    AcrFree(ps);
+    return rc;
+}
+
+/* Removes every registered object from the pollset. The objects are
+ * stored in 'rs' starting at index zero and their global references
+ * are released. Returns the number of objects stored, or 0 when the
+ * pollset is being destroyed. Throws ACR_EX_EILLEGAL and returns 0
+ * when waiting for a running wait0 fails.
+ */
+ACR_NET_EXPORT(jint, PosixSelector, clear0)(JNI_STDARGS, jlong pollset,
+                                            jobjectArray rs)
+{
+    int i, rc;
+    int cnt = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    /* A running wait0 is switched to WAKEUP, which makes it return 0
+     * and reset the state, before the set is touched.
+     */
+    if (!ps_wait_idle(ps, &rc)) {
+        pthread_mutex_unlock(&ps->mutex);
+        if (rc != 0)
+            ACR_THROW(ACR_EX_EILLEGAL, 0);
+        return 0;
+    }
+    for (i = 1; i < ps->used; i++) {
+        (*env)->SetObjectArrayElement(env, rs, cnt++, ps->ooset[i].obj);
+        /* Unref the container. */
+        (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+    }
+    ps->used = 1;
+    pthread_mutex_unlock(&ps->mutex);
+    return cnt;
+}
+
+/* Makes a wait0 call that is inside poll() return. No effect when
+ * no wait0 is in the PSS_POLL state.
+ */
+ACR_NET_EXPORT(void, PosixSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    ps_interrupt_poll(ps);
+    pthread_mutex_unlock(&ps->mutex);
+}
+
+/* Returns the number of registered descriptors, not counting the
+ * wakeup pipe.
+ */
+ACR_NET_EXPORT(jint, PosixSelector, size0)(JNI_STDARGS, jlong pollset)
+{
+    int rv;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    rv = ps->used - 1;
+    pthread_mutex_unlock(&ps->mutex);
+    return rv;
+}
+
+/* Polls the registered descriptors.
+ *
+ * rs         receives the objects that were signaled or have expired
+ * revents    receives, at the same index, the ACR_OP_* flags of each
+ * timeout    passed to poll(); recomputed after EINTR when not negative
+ * autocancel when JNI_TRUE every reported descriptor is removed from
+ *            the pollset and its global reference released
+ *
+ * Returns the number of entries stored in rs/revents. Returns 0 on
+ * timeout, when interrupted by wakeup0, destroy0 or a modifying call,
+ * when the pollset is empty, and after throwing for a poll() error or
+ * for a call made while another operation is in progress.
+ */
+ACR_NET_EXPORT(jint, PosixSelector, wait0)(JNI_STDARGS, jlong pollset,
+                                           jobjectArray rs, jshortArray revents,
+                                           jint timeout, jboolean autocancel)
+{
+    int i, ns, rc = 0;
+    int rv = 0;
+    int failed = 0;
+    acr_time_t now = 0;
+    acr_time_t tmx = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state != 0) {
+        /* Note that this should never happen if api is correctly used.
+         * wait cannot be run from multiple threads and cannot be run
+         * after destroy.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        ACR_THROW(ACR_EX_EILLEGAL, 0);
+        return 0;
+    }
+    if (ps->used == 1) {
+        /* We only have the wakeup pipe in the pollset
+         * so there is no point to wait.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+
+    /* While the state is PSS_POLL the other entry points do not touch
+     * fdset/used; they interrupt us and wait on the condition instead.
+     */
+    ps->state = PSS_POLL;
+    pthread_mutex_unlock(&ps->mutex);
+    if (timeout > 0)
+        tmx = AcrTimeMilliseconds() + timeout;
+    for (;;) {
+        ns = poll(ps->fdset, ps->used, timeout);
+        if (ns != -1 || errno != EINTR)
+            break;
+        if (timeout >= 0) {
+            /* Interrupted by a signal: poll again for the remainder */
+            timeout = (jint)(tmx - AcrTimeMilliseconds());
+            if (timeout <= 0) {
+                ns = 0;
+                break;
+            }
+        }
+    }
+
+    if (ns == -1)
+        rc = ACR_GET_OS_ERROR();
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state == PSS_DESTROY) {
+        /* Interrupted by destroy0. The state is left as is. */
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+    if (rc != 0) {
+        /* Error during poll */
+        ps_release(ps);
+        ACR_THROW_NET_ERROR(rc);
+        return 0;
+    }
+    if (ns == 0) {
+        /* Timeout occurred */
+        ps_release(ps);
+        return 0;
+    }
+    if (ps->state == PSS_WAKEUP) {
+        /* Interrupted by wakeup0 */
+        if (ps->fdset[0].revents != 0) {
+            /* Drain the wakeup pipe.
+             * Wakeup pipe is always at index zero.
+             */
+            AcrDrainPipe(ps->wpipe[0]);
+        }
+        ps_release(ps);
+        return 0;
+    }
+    ps->state = PSS_WAIT;
+    if (ps->fdset[0].revents != 0) {
+        /* Drain the wakeup pipe.
+         * Wakeup pipe is always at index zero.
+         */
+        AcrDrainPipe(ps->wpipe[0]);
+    }
+    /* Cycle through the descriptors.
+     * The results are stored with SetShortArrayRegion instead of a
+     * critical array pointer, because other JNI functions must not be
+     * called inside a critical region.
+     */
+    for (i = 1; i < ps->used; i++) {
+        jshort ev;
+
+        if (ps->fdset[i].revents != 0) {
+            ev = (jshort)reventt(ps->fdset[i].revents);
+            if (ps->ooset[i].ttl > 0) {
+                /* Reset TTL
+                 */
+                if (now == 0)
+                    now = AcrTimeNow();
+                ps->ooset[i].exp = now + ps->ooset[i].ttl;
+            }
+        }
+        else if (ps->ooset[i].ttl > 0) {
+            /* Check for the expired descriptors.
+             */
+            if (now == 0)
+                now = AcrTimeNow();
+            if (now < ps->ooset[i].exp) {
+                /* Expiration time is still in the future */
+                continue;
+            }
+            /* Expired descriptor. Setting revents lets the
+             * autocancel pass below remove it as well.
+             */
+            ps->fdset[i].revents = POLLHUP;
+            ev = (jshort)ACR_OP_TIMEOUT;
+        }
+        else {
+            continue;
+        }
+        (*env)->SetShortArrayRegion(env, revents, rv, 1, &ev);
+        if ((*env)->ExceptionCheck(env)) {
+            failed = 1;
+            break;
+        }
+        (*env)->SetObjectArrayElement(env, rs, rv, ps->ooset[i].obj);
+        if ((*env)->ExceptionCheck(env)) {
+            failed = 1;
+            break;
+        }
+        rv++;
+    }
+    if (autocancel == JNI_TRUE && rv > 0 && !failed) {
+        /* Remove all descriptors with revents set except
+         * the wakeup pipe at index zero, keeping the order
+         * of the remaining ones.
+         */
+        int dest = 1;
+        for (i = 1; i < ps->used; i++) {
+            if (ps->fdset[i].revents != 0) {
+                /* Unref signaled descriptor */
+                (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+            }
+            else {
+                if (dest != i) {
+                    ps->fdset[dest] = ps->fdset[i];
+                    ps->ooset[dest] = ps->ooset[i];
+                }
+                dest++;
+            }
+        }
+        ps->used = dest;
+    }
+    ps_release(ps);
+    return rv;
+}
+
+/* Registers descriptor 'f', represented by object 'fo', for 'events'
+ * (ACR_OP_* flags). A positive 'ttlms' is the time to live in
+ * milliseconds; otherwise the descriptor never expires.
+ * A running wait0 is interrupted first.
+ *
+ * Returns 0 on success and also when the pollset is being destroyed,
+ * ACR_EOVERFLOW when the pollset is full, ACR_EALREADY when 'f' is
+ * already registered, ACR_ENOMEM when no global reference could be
+ * created, or the pthread_cond_wait error code.
+ */
+ACR_NET_EXPORT(jint, PosixSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo,
+                                          jint f, jint events, jint ttlms)
+{
+    int i, rc = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (!ps_wait_idle(ps, &rc))
+        goto cleanup;
+    if (ps->used == ps->size) {
+        /* Overflow
+         */
+        rc = ACR_EOVERFLOW;
+        goto cleanup;
+    }
+    for (i = 1; i < ps->used; i++) {
+        if (ps->fdset[i].fd == f) {
+            /* Duplicate descriptor
+             */
+            rc = ACR_EALREADY;
+            goto cleanup;
+        }
+    }
+    ps->fdset[ps->used].fd      = f;
+    ps->fdset[ps->used].events  = ieventt(events);
+    ps->fdset[ps->used].revents = 0;
+    ps->ooset[ps->used].obj     = (*env)->NewGlobalRef(env, fo);
+    if (ps->ooset[ps->used].obj == 0) {
+        /* In case the NewGlobalRef fails,
+         * OutOfMemoryError should be thrown already by the JVM.
+         */
+        rc = ACR_ENOMEM;
+        goto cleanup;
+    }
+    if (ttlms > 0) {
+        ps->ooset[ps->used].ttl = AcrTimeFromMsec(ttlms);
+        ps->ooset[ps->used].exp = AcrTimeNow() + ps->ooset[ps->used].ttl;
+    }
+    else {
+        ps->ooset[ps->used].ttl = ACR_INFINITE;
+        ps->ooset[ps->used].exp = ACR_INFINITE;
+    }
+    ps->used++;
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}
+
+/* Unregisters the entry whose object is the same as 'fo' and releases
+ * its global reference. The descriptor 'f' is not used for the lookup.
+ * A running wait0 is interrupted first.
+ *
+ * Returns 0 when the entry was removed and also when the pollset is
+ * being destroyed, ACR_EOF when 'fo' is not registered, or the
+ * pthread_cond_wait error code.
+ */
+ACR_NET_EXPORT(jint, PosixSelector, del0)(JNI_STDARGS, jlong pollset,
+                                          jobject fo, jint f)
+{
+    int i, rc;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (!ps_wait_idle(ps, &rc))
+        goto cleanup;
+
+    rc = ACR_EOF;
+    for (i = 1; i < ps->used; i++) {
+        if ((*env)->IsSameObject(env, ps->ooset[i].obj, fo) == JNI_TRUE) {
+            /* Unref descriptor */
+            (*env)->DeleteGlobalRef(env, ps->ooset[i].obj);
+            /* Close the gap by shifting the tail down one slot */
+            for (++i; i < ps->used; i++) {
+                ps->fdset[i - 1] = ps->fdset[i];
+                ps->ooset[i - 1] = ps->ooset[i];
+            }
+            ps->used--;
+            rc = 0;
+            break;
+        }
+    }
+
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}

Propchange: commons/sandbox/runtime/trunk/src/main/native/os/unix/selector.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java?rev=1129273&r1=1129272&r2=1129273&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java (original)
+++ commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestSelectionKey.java Mon May 30 17:49:32 2011 @@ -31,7 +31,7 @@ public class TestSelectionKey extends As // Create socket bound to the first free port // SocketEndpoint ep = new SocketEndpoint(); - Selector ss = SocketSelectorFactory.newInstance(); + Selector ss = Selector.newInstance(); assertNotNull(ss); System.out.println("SocketSelector capacity=" + ss.capacity()); SelectionKey skey = ep.key(ss);