commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r1154855 [1/2] - in /commons/sandbox/runtime/trunk/src/main: java/org/apache/commons/runtime/net/ native/ native/include/acr/ native/os/win32/ native/shared/
Date Mon, 08 Aug 2011 06:57:16 GMT
Author: mturk
Date: Mon Aug  8 06:57:15 2011
New Revision: 1154855

URL: http://svn.apache.org/viewvc?rev=1154855&view=rev
Log:
Add java part of IPC endpoint

Added:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcDescriptor.java   (with props)
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpoint.java   (with props)
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpointAddress.java   (with props)
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcServerEndpoint.java   (with props)
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcStream.java   (with props)
Modified:
    commons/sandbox/runtime/trunk/src/main/native/configure.bat
    commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h
    commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h
    commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
    commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c
    commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c
    commons/sandbox/runtime/trunk/src/main/native/shared/netaddr.c

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcDescriptor.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcDescriptor.java?rev=1154855&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcDescriptor.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcDescriptor.java Mon Aug  8 06:57:15 2011
@@ -0,0 +1,228 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.runtime.net;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.net.SocketException;
+import org.apache.commons.runtime.Status;
+import org.apache.commons.runtime.io.ClosedDescriptorException;
+import org.apache.commons.runtime.io.InvalidDescriptorException;
+import org.apache.commons.runtime.io.Descriptor;
+
+/**
+ * Package private Socket Descriptor
+ * @since Runtime 1.0
+ */
+final class IpcDescriptor extends Descriptor
+{
+
+    private static native int     close0(long fd);
+    private static native long    create0(int flags)
+        throws IOException;
+    private static native long    socket0(int type, boolean blocking)
+        throws IOException;
+    private static native int     flush0(long fd);
+    private static native int     sync0(long fd);
+    private static native int     block0(long fd, boolean block);
+    private static native boolean optget0(long fd, int opt)
+        throws IOException;
+    private static native int     optset0(long fd, int opt, int val);
+    private static native int     optset1(long fd, int opt, boolean val);
+    private static native int     optset2(long fd, int opt, boolean val);
+    private static native int     shutdown0(long fd, int how);
+    private static native int     getlocal0(long fd, byte[] sa);
+    private static native int     getremote0(long fd, byte[] sa);
+
+    public IpcDescriptor()
+    {
+    }
+
+    public IpcDescriptor(long fd)
+    {
+        this.fd = fd;
+        closed  = false;
+    }
+
+    public void create(AddressFamily af, SocketType type)
+        throws IOException
+    {
+        create(af, type, true);
+    }
+
+    public void create(AddressFamily af, SocketType type, boolean blocking)
+        throws IOException
+    {
+        if (valid())
+            close0(fd);
+        fd = socket0(type.valueOf(), blocking);
+        closed = false;
+    }
+
+    public void create(SocketType type)
+        throws IOException
+    {
+        create(type, false);
+    }
+
+    public void create(SocketType type, boolean blocking)
+        throws IOException
+    {
+        this.fd = create0(0);
+        closed  = false;
+    }
+
+    public IpcDescriptor configureBlocking(boolean block)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = block0(fd, block);
+        if (rc != 0)
+            throw new IOException(Status.describe(rc));
+        return this;
+    }
+
+    @Override
+    public void close()
+        throws IOException
+    {
+        // Closing an empty socket is not an error
+        if (valid()) {
+            closed = true;
+            int rc = close0(fd);
+            fd     = 0L;
+            if (rc != 0)
+                throw new SocketException(Status.describe(rc));
+        }
+    }
+
+    @Override
+    public void sync()
+        throws SyncFailedException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = sync0(fd);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+    }
+
+    @Override
+    public void flush()
+        throws SyncFailedException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = flush0(fd);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+    }
+
+    public byte[] getLocalAddress()
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        byte[] sa = Address.alloc();
+        int rc = getlocal0(fd, sa);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+        return sa;
+    }
+
+    public byte[] getRemoteAddress()
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        byte[] sa = Address.alloc();
+        int rc = getremote0(fd, sa);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+        return sa;
+    }
+
+    public void setTimeout(int timeout)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = optset0(fd, 9 /* SocketOption.TIMEOUT.valueOf() */, timeout);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+    }
+
+    public void shutdown(ShutdownHow how)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = shutdown0(fd, how.valueOf());
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+    }
+
+    public void setOption(SocketOption key, int val)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = optset0(fd, key.valueOf(), val);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+    }
+
+    public void setOption(SocketOption key, boolean val)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = optset1(fd, key.valueOf(), val);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+    }
+
+    public void setOption(TcpOption key, boolean val)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rc = optset2(fd, key.valueOf(), val);
+        if (rc != 0)
+            throw new SocketException(Status.describe(rc));
+    }
+
+    public boolean hasOption(SocketOption key)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return optget0(fd, key.valueOf());
+    }
+
+    public boolean hasOption(TcpOption key)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return optget0(fd, key.valueOf());
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcDescriptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpoint.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpoint.java?rev=1154855&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpoint.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpoint.java Mon Aug  8 06:57:15 2011
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.runtime.net;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.net.SocketException;
+import org.apache.commons.runtime.io.ClosedDescriptorException;
+import org.apache.commons.runtime.io.Descriptor;
+import org.apache.commons.runtime.io.OperationInProgressException;
+import org.apache.commons.runtime.io.OperationWouldBlockException;
+import org.apache.commons.runtime.io.Stream;
+import org.apache.commons.runtime.Errno;
+import org.apache.commons.runtime.Status;
+import org.apache.commons.runtime.OverflowException;
+import org.apache.commons.runtime.TimeoutException;
+
+/**
+ * This class represents a local endpoint.
+ * <p>
+ * Local socket represents either unix domain socket or a windows pipe,
+ * depending on the operating system.
+ * </p>
+ */
+public class IpcEndpoint extends Connection
+{
+    private final IpcDescriptor         sd;
+    private EndpointAddress             ea;
+    private SelectionKeyImpl            key;
+    private boolean                     connected = false;
+    private IpcStream                   stream = null;
+    private static native int           connect0(long fd, byte[] sa, int timeout);
+
+    /**
+     * Creates a new unconnected socket object.
+     */
+    public IpcEndpoint()
+    {
+        super(EndpointType.LOCAL);
+        this.sd = new IpcDescriptor();
+    }
+
+    /**
+     * Creates a new connected local endpoint from a local descriptor
+     * and address.
+     */
+    public IpcEndpoint(Descriptor sd, EndpointAddress ea)
+    {
+        super(EndpointType.LOCAL);
+        if (sd == null || ea == null)
+            throw new NullPointerException();
+        this.sd   = (IpcDescriptor)sd;
+        this.ea   = ea;
+        connected = true;
+    }
+
+    @Override
+    public void connect(EndpointAddress endpoint, int timeout)
+        throws IOException
+    {
+        if (connected)
+            throw new IOException(Local.sm.get("endpoint.ECONNECTED"));
+        if (sd.closed())
+            sd.create(SocketType.STREAM);
+        int rc = connect0(sd.fd(), endpoint.sockaddr(), timeout);
+        if (rc != 0) {
+            // XXX: Should exception throwing go inside native?
+            if (Status.IS_TIMEUP(rc))
+                throw new TimeoutException();
+            else if (Status.IS_EAGAIN(rc))
+                throw new OperationWouldBlockException();
+            else if (Status.IS_EINPROGRESS(rc))
+                throw new OperationInProgressException();
+            else
+                throw new IOException(Status.describe(rc));
+        }
+        ea = endpoint;
+        connected = true;
+    }
+
+    @Override
+    public Descriptor descriptor()
+    {
+        return sd;
+    }
+
+    @Override
+    public boolean isBlocking()
+        throws IOException
+    {
+        return sd.hasOption(SocketOption.NONBLOCK) == false;
+    }
+
+    @Override
+    public Descriptor configureBlocking(boolean block)
+        throws ClosedDescriptorException,
+               IllegalBlockingModeException,
+               IOException
+    {
+        if (key != null && key.selected)
+            throw new IllegalBlockingModeException();
+        return sd.configureBlocking(block);
+    }
+
+
+    @Override
+    public synchronized void close()
+        throws IOException
+    {
+
+        connected = false;
+        if (sd.valid()) {
+            if (key != null && key.selected) {
+                try {
+                    key.cancel();
+                } catch (Exception e) {
+                    // Ignore selector exceptions
+                }
+            }
+            sd.close();
+        }
+        if (key != null) {
+            key.reset();
+            key = null;
+        }
+    }
+
+    @Override
+    public SelectionKey keyFor(Selector selector)
+    {
+        if (key != null && key.selector == selector && key.ievents != 0)
+            return key;
+        else
+            return null;
+    }
+
+    @Override
+    public SelectionKey register(Selector selector, int ops, Object att)
+        throws ClosedSelectorException,
+               IllegalSelectorException,
+               ClosedDescriptorException,
+               OverflowException,
+               IOException
+    {
+        if (sd.closed())
+            throw new ClosedDescriptorException();
+        AbstractSelector sel = (AbstractSelector)selector;
+        //
+        // If key.ievents is zero, this means that the
+        // key was invalidated by Selector.close().
+        // In that case create a new key instance.
+        if (key == null || key.ievents == 0)
+            key = new SelectionKeyImpl(sel, this);
+        if (key.selector != sel)
+            throw new IllegalSelectorException();
+        if (att != null)
+            key.attach(att);
+        return key.queue(ops & 0x000f);
+    }
+
+    @Override
+    public synchronized void setTimeout(int timeout)
+        throws IOException
+    {
+        sd.setTimeout(timeout);
+    }
+
+    @Override
+    public synchronized void shutdown(ShutdownHow how)
+        throws IOException
+    {
+        sd.shutdown(how);
+        if (how == ShutdownHow.RDWR)
+            connected  = false;
+    }
+
+    @Override
+    public synchronized final boolean closed()
+    {
+        return sd.closed();
+    }
+
+    @Override
+    public final Stream getStream()
+        throws IOException
+    {
+        if (connected) {
+            if (stream != null)
+                return stream;
+            stream = new IpcStream(sd);
+            return stream;
+        }
+        else {
+            /* We only provide bidirectional streams
+             * unlike Java Socket api which has input and
+             * output streams separated.
+             */
+            throw new IOException(Local.sm.get("endpoint.SHUTRDWR"));
+        }
+    }
+
+    @Override
+    public synchronized boolean connected()
+    {
+        return connected;
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpointAddress.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpointAddress.java?rev=1154855&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpointAddress.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpointAddress.java Mon Aug  8 06:57:15 2011
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.runtime.net;
+
+import java.io.File;
+import java.net.UnknownHostException;
+import org.apache.commons.runtime.Errno;
+import org.apache.commons.runtime.Status;
+import org.apache.commons.runtime.InvalidArgumentException;
+
+/**
+ * This class represents a local Endpoint address described by a name.
+ * <p>
+ * Local endpoint depends on the Operating System and is either
+ * Unix domain socket or a Windows pipe.
+ * </p>
+ */
+public final class IpcEndpointAddress extends EndpointAddress
+{
+
+    private byte[] sa;
+    private native byte[] sockaddr0(String name)
+        throws OutOfMemoryError, NetworkException;
+
+    private IpcEndpointAddress()
+    {
+        // No instance
+    }
+
+    /**
+     * Create a new local socket adrress.
+     */
+    public IpcEndpointAddress(String name)
+        throws NetworkException, InvalidArgumentException
+    {
+        super(AddressFamily.LOCAL);
+        if (name == null || name.length() == 0)
+            throw new InvalidArgumentException();
+        super.sa = sockaddr0("Global\\" + name);
+    }
+
+    /**
+     * Create a new local socket adrress.
+     */
+    public IpcEndpointAddress(File path)
+        throws NetworkException, InvalidArgumentException
+    {
+        super(AddressFamily.LOCAL);
+        String name = "Global\\" + path.getPath();
+        if (name == null || name.length() == 0)
+            throw new InvalidArgumentException();
+        super.sa = sockaddr0(name);
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcEndpointAddress.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcServerEndpoint.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcServerEndpoint.java?rev=1154855&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcServerEndpoint.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcServerEndpoint.java Mon Aug  8 06:57:15 2011
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.runtime.net;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.net.SocketException;
+import org.apache.commons.runtime.io.ClosedDescriptorException;
+import org.apache.commons.runtime.io.Descriptor;
+import org.apache.commons.runtime.io.OperationInProgressException;
+import org.apache.commons.runtime.io.OperationWouldBlockException;
+import org.apache.commons.runtime.OverflowException;
+import org.apache.commons.runtime.Status;
+
+/**
+ * This class represents a local server endpoint.
+ * <p>
+ * Local socket represents either unix domain socket or a windows pipe,
+ * depending on the operating system.
+ * </p>
+ */
+public class IpcServerEndpoint extends ServerEndpoint<IpcEndpoint>
+{
+    private static final int            LISTEN_BACKLOG =  50;
+    private final IpcDescriptor         sd;
+    private SelectionKeyImpl            key;
+    private EndpointAddress             sa;
+    private boolean                     bound    = false;
+    private boolean                     blocking = true;
+    private final Object                mutex    = new Object();
+
+    /**
+     * Creates a new unbound local server endpoint.
+     */
+    public IpcServerEndpoint()
+    {
+        super(EndpointType.LOCAL);
+        this.sd = new IpcDescriptor();
+    }
+
+    /**
+     * Creates a new local server endpoint from the
+     * given socket descriptor.
+     */
+    public IpcServerEndpoint(Descriptor sd)
+    {
+        super(EndpointType.LOCAL);
+        if (sd == null)
+            throw new NullPointerException();
+        this.sd = (IpcDescriptor)sd;
+    }
+
+    @Override
+    public Descriptor descriptor()
+    {
+        return sd;
+    }
+
+    @Override
+    public boolean isBlocking()
+        throws IOException
+    {
+        return blocking;
+    }
+
+    @Override
+    public Descriptor configureBlocking(boolean block)
+        throws IllegalBlockingModeException,
+               IOException
+    {
+        if (key != null && key.selected)
+            throw new IllegalBlockingModeException();
+        // Blocking mode must be set before the
+        // actual server endpoint is created.
+        if (sd.valid())
+            throw new IllegalBlockingModeException();
+        blocking = block;
+        return sd;
+    }
+
+
+    @Override
+    public void close()
+        throws IOException
+    {
+        synchronized(mutex) {
+            if (sd.valid()) {
+                if (key != null && key.selected) {
+                    try {
+                        key.cancel();
+                    } catch (Exception e) {
+                        // Ignore selector exceptions
+                    }
+                }
+                sd.close();
+            }
+            if (key != null) {
+                key.reset();
+                key = null;
+            }
+        }
+    }
+
+    @Override
+    public SelectionKey keyFor(Selector selector)
+    {
+        if (key != null && key.selector == selector && key.ievents != 0)
+            return key;
+        else
+            return null;
+    }
+
+    @Override
+    public SelectionKey register(Selector selector, int ops, Object att)
+        throws ClosedSelectorException,
+               IllegalSelectorException,
+               ClosedDescriptorException,
+               OverflowException,
+               IOException
+    {
+        ops = ops & 0x000f;
+        if (sd.closed())
+            throw new ClosedDescriptorException();
+        AbstractSelector sel = (AbstractSelector)selector;
+        //
+        // If key.ievents is zero, this means that the
+        // key was invalidated by Selector.close().
+        // In that case create a new key instance.
+        if (key == null || key.ievents == 0)
+            key = new SelectionKeyImpl(sel, this);
+        if (key.selector != sel)
+            throw new IllegalSelectorException();
+        if (att != null)
+            key.attach(att);
+        return key.queue(ops);
+    }
+
+    private static native int           bind0(long fd, byte[] sa, int backlog);
+    private static native long          accept0(long fd, int timeout, boolean block)
+        throws SocketException;
+
+    @Override
+    public synchronized void bind(EndpointAddress endpoint, int backlog)
+        throws IOException
+    {
+        if (bound)
+            throw new IOException(Local.sm.get("endpoint.EBOUND"));
+        if (sd.closed())
+            sd.create(SocketType.STREAM, blocking);
+        if (backlog == 0)
+            backlog = LISTEN_BACKLOG;
+        int rc = bind0(sd.fd(), endpoint.sockaddr(), backlog);
+        if (rc != 0) {
+            try {
+                sd.close();
+            } catch (Exception u) {
+                // Ingnore close Exceptions.
+            }
+            throw new IOException(Status.describe(rc));
+        }
+        bound = true;
+        sa    = endpoint;
+    }
+
+    @Override
+    public IpcEndpoint accept()
+        throws IOException
+    {
+        return accept(blocking);
+    }
+
+    @Override
+    public IpcEndpoint accept(boolean blocking)
+        throws IOException
+    {
+        if (sd.closed())
+            throw new ClosedDescriptorException();
+        long fd = accept0(sd.fd(), -1, blocking);
+        IpcDescriptor ad = new IpcDescriptor(fd);
+        return new IpcEndpoint(ad, sa);
+    }
+
+    @Override
+    public synchronized void setTimeout(int timeout)
+        throws IOException
+    {
+        sd.setTimeout(timeout);
+        blocking = sd.hasOption(SocketOption.NONBLOCK) == false;
+    }
+
+    @Override
+    public synchronized final boolean closed()
+    {
+        return sd.closed();
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcServerEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcStream.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcStream.java?rev=1154855&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcStream.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcStream.java Mon Aug  8 06:57:15 2011
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.runtime.net;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import org.apache.commons.runtime.io.ClosedDescriptorException;
+import org.apache.commons.runtime.io.Descriptor;
+import org.apache.commons.runtime.io.OperationInProgressException;
+import org.apache.commons.runtime.io.OperationWouldBlockException;
+import org.apache.commons.runtime.io.Stream;
+import org.apache.commons.runtime.Errno;
+import org.apache.commons.runtime.Pointer;
+import org.apache.commons.runtime.Status;
+import org.apache.commons.runtime.OverflowException;
+import org.apache.commons.runtime.TimeoutException;
+
+/**
+ * Socket stream implementation.
+ */
+class IpcStream extends Stream
+{
+    /**
+     * Native socket descriptor pointer
+     */
+    private long                        nd;
+    private Descriptor                  sd;
+    private static native long          alloc0(long fd);
+    private static native void          close0(long nd);
+
+    private static native boolean       eof0(long nd);
+    private static native int           avail0(long nd);
+    // Native read methods
+    private static native int           read0(long nd)
+        throws IOException;
+    private static native int           read1(long nd, byte[] buf, int off, int len)
+        throws IOException;
+    private static native int           read2(long nd, long ptr, long off, int len)
+        throws IOException;
+    private static native int           read3(long nd, ByteBuffer buf, int off, int len)
+        throws IOException;
+    // Native write methods
+    private static native int           write0(long nd, int ch)
+        throws IOException;
+    private static native int           write1(long nd, byte[] buf, int off, int len)
+        throws IOException;
+    private static native int           write2(long nd, long ptr, long off, int len)
+        throws IOException;
+    private static native int           write3(long nd, ByteBuffer buf, int off, int len)
+        throws IOException;
+    private static native long          write4(long nd, byte[][] buf, int off, int len)
+        throws IOException;
+    private static native long          write5(long nd, ByteBuffer[] buf, int off, int len)
+        throws IOException;
+
+    private IpcStream()
+    {
+        // No instance
+    }
+
+    /**
+     * Create new Stream object from the given Descriptor.
+     * The Descripor's reference counter is incremented on
+     * each operation meaning that the object finalize() won't
+     * release the object if still used.
+     *
+     * @param fd native Descriptor pointer
+     */
+    public IpcStream(Descriptor sd)
+    {
+        this.sd = sd;
+        nd = alloc0(sd.fd());
+    }
+
+    @Override
+    public final void close()
+        throws IOException
+    {
+        // XXX: Use separate closeLock object?
+        //
+        if (nd != 0L) {
+            try {
+                close0(nd);
+                if (sd != null)
+                    sd.close();
+            }
+            finally {
+                nd = 0L;
+                sd = null;
+            }
+        }
+    }
+
+    @Override
+    public long skip(long count)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+
+        return 0L;
+    }
+
+    @Override
+    public boolean closed()
+    {
+        if (sd == null || nd == 0L)
+            return true;
+        else
+            return sd.closed();
+    }
+
+    @Override
+    public boolean valid()
+    {
+        if (sd != null && nd != 0L)
+            return sd.valid();
+        else
+            return false;
+    }
+
+    @Override
+    public void sync()
+        throws SyncFailedException, IOException
+    {
+        if (sd == null)
+            throw new ClosedDescriptorException();
+        sd.sync();
+    }
+
+    @Override
+    public void flush()
+        throws SyncFailedException, IOException
+    {
+        if (sd == null)
+            throw new ClosedDescriptorException();
+        sd.flush();
+    }
+
+    @Override
+    public boolean eof()
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return eof0(nd);
+    }
+
+    // === Reader methods
+
+    @Override
+    public int available()
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return avail0(nd);
+    }
+
+    @Override
+    public int read()
+        throws IOException
+    {
+
+        if (closed())
+            throw new ClosedDescriptorException();
+        return read0(nd);
+    }
+
+    @Override
+    public int read(byte[] buffer, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+
+        if (closed())
+            throw new ClosedDescriptorException();
+        return read1(nd, buffer, offset, count);
+    }
+
+    @Override
+    public int read(Pointer pointer, long offset, int count)
+        throws NullPointerException, IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return read2(nd, pointer.address(), offset, count);
+    }
+
+    @Override
+    public int read(ByteBuffer buffer, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return read3(nd, buffer, offset, count);
+    }
+
+    // === Writer methods
+
+    @Override
+    public int write(int b)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write0(nd, b);
+    }
+
+    @Override
+    public int write(byte[] buffer, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write1(nd, buffer, offset, count);
+    }
+
+    @Override
+    public int write(Pointer pointer, long offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write2(nd, pointer.address(), offset, count);
+    }
+
+    @Override
+    public int write(ByteBuffer buffer, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write3(nd, buffer, offset, count);
+    }
+
+    @Override
+    public long write(byte[][] array, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write4(nd, array, offset, count);
+    }
+
+    @Override
+    public long write(ByteBuffer[] array, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write5(nd, array, offset, count);
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/IpcStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/native/configure.bat
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/configure.bat?rev=1154855&r1=1154854&r2=1154855&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/configure.bat (original)
+++ commons/sandbox/runtime/trunk/src/main/native/configure.bat Mon Aug  8 06:57:15 2011
@@ -160,7 +160,7 @@ if not exist "%WITH_JAVA%\include\jni.h"
 set "INCLUDES=%INCLUDES% -I"%WITH_JAVA%\include" -I"%WITH_JAVA%\include\win32""
 set "RCINCLS=%RCINCLS% /i "%WITH_JAVA%\include" /i "%WITH_JAVA%\include\win32""
 
-set "CCFLAGS=%CCFLAGS% -O2 -Ob2 -Zi -D_MT"
+set "CCFLAGS=%CCFLAGS% -W3 -O2 -Ob2 -Zi -D_MT"
 if %ENABLE_DEBUG% == 1 (
   set "CPPOPTS=-DDEBUG -D_DEBUG %CPPOPTS%"
   set "CCFLAGS=%CCFLAGS% -MDd"

Modified: commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h?rev=1154855&r1=1154854&r2=1154855&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h Mon Aug  8 06:57:15 2011
@@ -33,6 +33,7 @@
 #define ACR_TCP_DEFER_ACCEPT    0x0400
 #define ACR_TCP_NODELAY         0x0800
 #define ACR_TCP_NOPUSH          0x1000
+#define ACR_TCP_SENDALL         0x2000
 
 /**
  * Descriptor types
@@ -43,6 +44,7 @@
 #define ACR_DT_LOCALSOCK        0x0004
 #define ACR_DT_SSLSOCK          0x0005
 #define ACR_DT_IPCSOCK          0x0006
+#define ACR_DT_IPCSERVER        0x0007
 
 typedef struct acr_fd_t acr_fd_t;
 struct acr_fd_t {

Modified: commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h?rev=1154855&r1=1154854&r2=1154855&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/include/acr/netdefs.h Mon Aug  8 06:57:15 2011
@@ -88,5 +88,6 @@
 #define ACR_OPT_TCP_DEFER_ACCEPT  21
 #define ACR_OPT_TCP_NODELAY       22
 #define ACR_OPT_TCP_NOPUSH        23
+#define ACR_OPT_TCP_SENDALL       24
 
 #endif /* _ACR_IODEFS_H */

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h?rev=1154855&r1=1154854&r2=1154855&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h Mon Aug  8 06:57:15 2011
@@ -334,11 +334,10 @@ struct IPCSERVER
 #define IPCSOCK_READ (1L)
 #define IPCSOCK_SEND (2L)
 
-#define IPCSOCK_MODE_STREAM 0
-#define IPCSOCK_MODE_MSG    1
-
-#define IPCSOCK_SEND_ALL    1
-#define IPCSOCK_FLUSH       2
+/* This is actually TCP_NODELAY
+ */
+#define IPCSOCK_NODELAY     0x0800
+#define IPCSOCK_SENDALL     0x2000
 
 /**
  * Connection status values that
@@ -389,6 +388,12 @@ struct IPCSERVER
 extern "C" {
 #endif
 
+ACR_INLINE(LPIPCSOCK) AcrIpcSdRetain(acr_sd_t *sd)
+{
+    AcrAtomic32Inc(&sd->refs);
+    return sd->p;
+}
+
 /**
  * Read the data
  * @param pSocket the socket to use.
@@ -433,6 +438,35 @@ AcrIpcWrite(LPIPCSOCK pSocket, const voi
 int AcrIpcSend(LPIPCSOCK pSocket, const void *pData, int nSize, int nFlags);
 
 /**
+ * Write the data.
+ * @param pSocket the socket to use.
+ * @param pBuffers input data array buffer
+ * @param nBuffers number of buffers in buffer array.
+ * @return number of bytes written which can be lower
+ *         then total number of bytes or -1 on error.
+ * @notice The function will block if output buffer is
+ *         full until the peer read some data or until
+ *         the socket times out.
+ */
+int
+AcrIpcWritev(LPIPCSOCK pSocket, LPWSABUF pBuffers, int nBuffers, int nFlags);
+
+/**
+ * Send the message.
+ * @param pSocket the socket to use.
+ * @param pBuffers input data array buffer
+ * @param nBuffers number of buffers in buffer array.
+ * @param nFlags send flags. If set to IPCSOCK_FLUSH the
+ *        operation will block until the peer reads the
+ *        entire message send.
+ * @return zero on success or error code.
+ * @notice The function will block if output buffer is
+ *         full until the peer reads the message or until
+ *         the socket times out.
+ */
+int AcrIpcSendv(LPIPCSOCK cp, LPWSABUF pBuffers, int nBuffers, int nFlags);
+
+/**
  * Get the data messages
  * @param pSocket the socket to use.
  * @param pData output data buffer.

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c?rev=1154855&r1=1154854&r2=1154855&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c Mon Aug  8 06:57:15 2011
@@ -342,28 +342,6 @@ AcrIpcServerClose(LPIPCSERVER sp)
     return 0;
 }
 
-ACR_NET_EXPORT(jlong, IpcServerEndpoint, create0)(JNI_STDARGS, jint flags)
-{
-    acr_sd_t  *sd;
-    IPCSERVER *sp;
-
-    if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
-        /* Allocation failed */
-        return 0;
-    }
-    if ((sp = ACR_TALLOC(IPCSERVER)) == 0) {
-        /* Allocation failed */
-        AcrFree(sd);
-        return 0;
-    }
-    ACR_RING_INIT(&sp->rConnections, IPCSOCK, rLink);
-    sd->p           = sp;
-    sd->flags       = flags;
-    sd->refs        = 1;
-    sp->nReferences = 1;
-    return P2J(sd);
-}
-
 ACR_NET_EXPORT(jint, IpcServerEndpoint, bind0)(JNI_STDARGS, jlong fp,
                                                jbyteArray cb, jint backlog)
 {
@@ -414,26 +392,6 @@ failed:
     return rc;
 }
 
-ACR_NET_EXPORT(jint, IpcServerEndpoint, close0)(JNI_STDARGS, jlong fp)
-{
-    int rc = 0;
-    acr_sd_t   *sd = J2P(fp, acr_sd_t *);
-    LPIPCSERVER sp;
-
-    if (sd == 0)
-        return ACR_EBADF;
-    sp = (LPIPCSERVER)InterlockedExchangePointer(&sd->p, 0);
-    if (InterlockedDecrement(&sd->refs) == 0)
-        AcrFree(sd);
-    if (sp == 0)
-        return ACR_EBADF;
-    sp->bClosed = TRUE;
-    SetEvent(sp->hAcceptSync);
-    if (InterlockedDecrement(&sp->nReferences) == 0)
-        rc = AcrIpcServerClose(sp);
-    return rc;
-}
-
 static LPBYTE
 AcrIoBufMap(HANDLE hMap, DWORD dwSize)
 {
@@ -725,39 +683,6 @@ finally:
     return 0;
 }
 
-ACR_NET_EXPORT(jlong, IpcEndpoint, create0)(JNI_STDARGS, jint stype, jboolean block)
-{
-    acr_sd_t *sd;
-    IPCSOCK  *cp;
-
-    if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
-        /* Allocation failed */
-        return 0;
-    }
-    if ((cp = ACR_TALLOC(IPCSOCK)) == 0) {
-        AcrFree(sd);
-        /* Allocation failed */
-        return 0;
-    }
-    ACR_RING_ELEM_INIT(cp, rLink);
-    if (stype != SOCK_STREAM)
-        cp->dwPageSize = PAGESIZE;
-    if (block != JNI_FALSE) {
-        cp->dwTimeout   = INFINITE;
-        sd->timeout     = -1;
-    }
-    else {
-        ACR_SETFLAG(sd, ACR_SO_NONBLOCK);
-    }
-    cp->nStatus     = IPCSOCK_CLOSED;
-    cp->pStatus     = &cp->nStatus;
-    cp->nReferences = 1;
-    sd->p           = cp;
-    sd->refs        = 1;
-    sd->type        = ACR_DT_IPCSOCK;
-    return P2J(sd);
-};
-
 static int
 AcrIpcConnect(LPIPCSOCK cp, LPCSTR szAddress, int nTimeout)
 {
@@ -1014,43 +939,8 @@ cleanup:
     return rc;
 }
 
-ACR_NET_EXPORT(jint, IpcEndpoint, tmset0)(JNI_STDARGS, jlong fp, jint timeout)
-{
-    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
-
-    if (cp == 0)
-        return WSAENOTSOCK;
-    if (cp->c == 0)
-        return WSAENOTCONN;
-    if (_InterlockedOr(cp->pStatus, 0) != 0)
-        return WSAESHUTDOWN;
-    if (timeout < 0)
-        cp->dwTimeout = INFINITE;
-    else
-        cp->dwTimeout = timeout;
-    return 0;
-}
-
-ACR_NET_EXPORT(jint, IpcEndpoint, shutdown0)(JNI_STDARGS, jlong fp)
-{
-    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
-    if (cp == 0)
-        return WSAENOTSOCK;
-    if (cp->c == 0)
-        return WSAENOTCONN;
-    if (_InterlockedOr(cp->pStatus, IPCSOCK_SHUTDOWN) != 0)
-        return WSAEALREADY;
-    cp->c->dwError = WSAESHUTDOWN;
-
-    /* Break listeners on our side
-     */
-    SetEvent(cp->hSync[IPCSOCK_RDR]);
-    SetEvent(cp->hSync[IPCSOCK_RTT]);
-    return 0;
-}
-
 static int
-AcrIpcClose(LPIPCSOCK cp)
+AcrIpcSocketClose(LPIPCSOCK cp)
 {
     int i;
     if (cp == 0)
@@ -1204,8 +1094,10 @@ int AcrIpcRead(LPIPCSOCK cp, void *pData
 
             if (cp->nStatus != 0) {
                 /* Cannot require data on already SHUTDOWN connection */
-                SetLastError(WSAESHUTDOWN);
-                nRead = -1;
+                if (cp->nStatus != IPCSOCK_SHUTDOWN) {
+                    SetLastError(WSAESHUTDOWN);
+                    nRead = -1;
+                }
                 break;
             }
             wh[0] = cp->hSync[IPCSOCK_RDR];
@@ -1316,7 +1208,7 @@ int AcrIpcWrite(LPIPCSOCK cp, const void
                 /* Inform the peer that we wrote some bytes */
                 SetEvent(cp->hSync[IPCSOCK_TDR]);
             }
-            if ((nFlags & IPCSOCK_SEND_ALL) == 0) {
+            if ((nFlags & ACR_TCP_SENDALL) == 0) {
                 /* User requested a partial write
                  */
                 break;
@@ -1417,7 +1309,7 @@ int AcrIpcSend(LPIPCSOCK cp, const void 
         cp->Mw->SendWait = TRUE;
         MemoryBarrier();
         if (cp->Mw->Length == 0) {
-            if ((nFlags & IPCSOCK_FLUSH) == 0)
+            if ((nFlags & ACR_TCP_NODELAY) == 0)
                 cp->Mw->SendWait = FALSE;
             if (nSend != 0) {
                 /* We have already send the data
@@ -1437,7 +1329,7 @@ int AcrIpcSend(LPIPCSOCK cp, const void 
                 /* Inform the peer that we have send the message */
                 SetEvent(cp->hSync[IPCSOCK_TDR]);
             }
-            if ((nFlags & IPCSOCK_FLUSH) == 0)
+            if ((nFlags & ACR_TCP_NODELAY) == 0)
                 break;
             nSend = nSize;
         }
@@ -1492,10 +1384,19 @@ int AcrIpcSend(LPIPCSOCK cp, const void 
     return rc;
 }
 
-int AcrIpcRecv(LPIPCSOCK cp, void *pData, int nSize)
+int AcrIpcWritev(LPIPCSOCK cp, LPWSABUF pBuffers, int nBuffers, int nFlags)
 {
-    long   nAvail;
-    long   nRead     = 0;
+    long   nCapacity;
+    LPBYTE pDest;
+    LPBYTE pData;
+    LPWSABUF pCb    = pBuffers;
+    long   nSend    = 0;
+    long   nChunk   = 0;
+    long   nSize    = 0;
+    long   nData    = 0;
+    INT64  nTimeup  = 0;
+    int    nTimeout;
+    int    i;
 
     if (cp == 0) {
         SetLastError(WSAENOTSOCK);
@@ -1505,81 +1406,108 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
         SetLastError(WSAENOTCONN);
         return -1;
     }
-    if (cp->dwPageSize != 0) {
+    if (cp->dwPageSize == 0) {
         SetLastError(WSAEPFNOSUPPORT);
         return -1;
     }
-    if (pData == 0 || nSize < 1) {
+    if (pBuffers == 0) {
         SetLastError(WSAEINVAL);
         return -1;
     }
-    if (_InterlockedOr(&cp->nState, IPCSOCK_READ) == IPCSOCK_READ) {
-        /* Protect against concurrent reads.
+    for (i = 0; i < nBuffers; i++) {
+        if (pBuffers[i].len == 0 || pBuffers[i].buf == 0) {
+            SetLastError(WSAEINVAL);
+            return -1;
+        }
+        nSize += pBuffers[i].len;
+    }
+    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND) {
+        /* Protect against concurrent writes.
          */
         SetLastError(WSAEALREADY);
         return -1;
     }
     InterlockedIncrement(&cp->nReferences);
-    while (nRead == 0) {
+    nTimeout = cp->dwTimeout;
+    if (nTimeout <= 0) {
+        /* Get "future" timeout */
+        nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
+    }
+    pData = pCb->buf;
+    nData = pCb->len;
+    while (nSize > 0) {
         cp->nStatus = _InterlockedOr(cp->pStatus, 0);
-        if (cp->nStatus > IPCSOCK_SHUTDOWN) {
-            /* Any attempt to read on closed connection is an error.
+        if (cp->nStatus != 0) {
+            /* We don't support half-closed connections
+             * so any attempt to write on shudown socket
+             * is error.
              */
             if (cp->c != 0)
                 SetLastError(cp->c->dwError);
             else
                 SetLastError(WSAENOTCONN);
-            nRead = -1;
+            nSend = -1;
             break;
         }
-        cp->Mr->RecvWait = TRUE;
+        cp->Wr->SendWait = TRUE;
         MemoryBarrier();
-        nAvail = cp->Mr->Length - cp->Mr->Readed;
-        if (nAvail > 0) {
-            /* We have something in the buffer
+        nCapacity = cp->dwPageSize - (cp->Wr->SendPos - cp->Wr->RecvPos);
+        pDest     = cp->pWrbData + cp->Wr->SendPos;
+        if (nCapacity > 0) {
+            cp->Wr->SendWait = FALSE;
+            /* We have free space in the buffer
              */
-            cp->Mr->RecvWait = FALSE;
-            /* Copy to the user buffer */
-            nRead = nAvail > nSize ? nSize : nAvail;
-            memcpy(pData, cp->Mr->Data + cp->Mr->Readed, nRead);
-            /* Advance read pointer */
-            cp->Mr->Readed += nRead;
-            if (cp->Mr->Readed >= cp->Mr->Length) {
-                cp->Mr->Readed = 0;
-                cp->Mr->Length = 0;
-            }
+            nChunk = nCapacity > nData ? nData : nCapacity;
+            memcpy(pDest, pData, nChunk);
+            /* Advance write pointer */
+            cp->Wr->SendPos += nChunk;
+            nSend += nChunk;
+            nSize -= nChunk;
+            pData += nChunk;
+            nData -= nChunk;
             MemoryBarrier();
-            if (cp->Mr->SendWait && cp->Mr->Length == 0) {
-                /* Inform the peer that we have read the entire message
+            if (cp->Wr->RecvWait) {
+                /* Inform the peer that we wrote some bytes */
+                SetEvent(cp->hSync[IPCSOCK_TDR]);
+            }
+            if ((nFlags & ACR_TCP_SENDALL) == 0 && nChunk == nCapacity) {
+                /* User requested a partial write
                  */
-                SetEvent(cp->hSync[IPCSOCK_RTR]);
+                break;
+            }
+            if (nData == 0) {
+                ++pCb;
+                pData = pCb->buf;
+                nData = pCb->len;
             }
         }
         else if (cp->dwTimeout == 0) {
             /* Non blocking mode.
              */
-            cp->Mr->RecvWait = FALSE;
-            SetLastError(WSAEWOULDBLOCK);
-            nRead = -1;
+            cp->Wr->SendWait = FALSE;
+            if (nSend == 0) {
+                SetLastError(WSAEWOULDBLOCK);
+                nSend = -1;
+            }
+            break;
         }
         else {
             DWORD  ws;
             HANDLE wh[2];
 
-            if (cp->nStatus != 0) {
-                /* Cannot require data on already SHUTDOWN connection */
-                SetLastError(WSAESHUTDOWN);
-                nRead = -1;
-                break;
-            }
-            wh[0] = cp->hSync[IPCSOCK_RDR];
+            wh[0] = cp->hSync[IPCSOCK_RTT];
             wh[1] = cp->rp->hProcessLock;
+            if (nTimeup != 0) {
+                nTimeout = (int)(nTimeup - GetCurrentMilliseconds());
+                if (nTimeout < 0)
+                    nTimeout = 0;
+            }
             /* Wait for a signal.
              * XXX: INFINITE timeouts should really be some sane
              *      value which will abort the connection.
              */
-            ws = WaitForMultipleObjects(2, wh, FALSE, cp->dwTimeout);
-            cp->Mr->RecvWait = FALSE;
+            ws = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
+            cp->Wr->SendWait = FALSE;
             switch (ws) {
                 case WAIT_OBJECT_0:
                 break;
@@ -1589,91 +1517,340 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
                     cp->c->dwError = WSAECONNRESET;
                     InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
                     SetLastError(cp->c->dwError);
-                    nRead = -1;
+                    nSend = -1;
                 break;
                 case WAIT_TIMEOUT:
                     SetLastError(WSAETIMEDOUT);
-                    nRead = -1;
+                    nSend = -1;
                 break;
                 default:
-                    /* This can only be WAIT_FAILED
+                    /* This can be only WAIT_FAILED
                      * in which case errno is already set
                      */
-                    nRead = -1;
+                    nSend = -1;
                 break;
             }
         }
     }
-    _InterlockedAnd(&cp->nState, ~IPCSOCK_READ);
-    ApcIpcUnref(cp);
-    return nRead;
-}
-
-int AcrIpcAvail(LPIPCSOCK cp, BOOL bForRead)
-{
-    long nAvail;
-
-    if (cp == 0) {
-        SetLastError(WSAENOTSOCK);
-        return -1;
-    }
-    if (cp->c == 0) {
-        SetLastError(WSAENOTCONN);
-        return -1;
-    }
-    cp->nStatus = _InterlockedOr(cp->pStatus, 0);
-    if (cp->nStatus > IPCSOCK_SHUTDOWN) {
-        /* Any attempt to read on closed connection is error.
-         */
-        if (cp->c != 0)
-            SetLastError(cp->c->dwError);
-        else
-            SetLastError(WSAENOTCONN);
-        return -1;
-    }
-    InterlockedIncrement(&cp->nReferences);
-    if (WaitForSingleObject(cp->rp->hProcessLock, 0) != WAIT_TIMEOUT) {
-        ReleaseMutex(cp->rp->hProcessLock);
-        cp->c->dwError = WSAECONNRESET;
-        InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
-        SetLastError(cp->c->dwError);
-        ApcIpcUnref(cp);
-        return -1;
-    }
-    MemoryBarrier();
-    if (cp->dwPageSize == 0) {
-        if (bForRead)
-            nAvail = cp->Mr->Length - cp->Mr->Readed;
-        else
-            nAvail = cp->Mw->Length == 0 ? IPCSOCK_MSGSIZE : 0;
-    }
-    else {
-        nAvail = cp->Rd->SendPos - cp->Rd->RecvPos;
-        if (!bForRead)
-            nAvail = cp->dwPageSize - nAvail;
-    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_SEND);
     ApcIpcUnref(cp);
-    if (nAvail > 0)
-        return nAvail;
-    else
-        return 0;
+    return nSend;
 }
 
-int AcrIpcFlush(LPIPCSOCK cp)
+int AcrIpcSendv(LPIPCSOCK cp, LPWSABUF pBuffers, int nBuffers, int nFlags)
 {
     int    rc       = 0;
-    long   nUsed    = 0;
+    long   nSend    = 0;
+    long   nSize    = 0;
     INT64  nTimeup  = 0;
     int    nTimeout;
-    volatile long *pSendWait;
+    int    i;
 
     if (cp == 0)
         return WSAENOTSOCK;
     if (cp->c == 0)
         return WSAENOTCONN;
-    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
-        return WSAEALREADY;
-    InterlockedIncrement(&cp->nReferences);
+    if (cp->dwPageSize != 0)
+        return  WSAEPFNOSUPPORT;
+    if (pBuffers == 0) {
+        SetLastError(WSAEINVAL);
+        return -1;
+    }
+    for (i = 0; i < nBuffers; i++) {
+        if (pBuffers[i].len == 0 || pBuffers[i].buf == 0) {
+            SetLastError(WSAEINVAL);
+            return -1;
+        }
+        nSize += pBuffers[i].len;
+    }
+    if (nSize > IPCSOCK_MSGSIZE)
+        return WSAEINVAL;
+    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
+        return WSAEALREADY;
+    InterlockedIncrement(&cp->nReferences);
+    nTimeout = cp->dwTimeout;
+    if (nTimeout <= 0) {
+        /* Get "future" timeout */
+        nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
+    }
+    while (rc == 0) {
+        cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+        if (cp->nStatus != 0) {
+            /* We don't support half-closed connections
+             * so any attempt to write on shudown socket
+             * is error.
+             */
+            if (cp->c != 0)
+                rc = cp->c->dwError;
+            else
+                rc = WSAENOTCONN;
+            break;
+        }
+        cp->Mw->SendWait = TRUE;
+        MemoryBarrier();
+        if (cp->Mw->Length == 0) {
+            if ((nFlags & ACR_TCP_NODELAY) == 0)
+                cp->Mw->SendWait = FALSE;
+            if (nSend != 0) {
+                /* We have already send the data
+                 * and this was confirmation that it was
+                 * delivered.
+                 */
+                cp->Mw->SendWait = FALSE;
+                break;
+            }
+            /* We have free space in the buffer
+             */
+            for (i = 0; i < nBuffers; i++) {
+                memcpy(cp->Mw->Data + nSend, pBuffers[i].buf, pBuffers[i].len);
+                nSend += pBuffers[i].len;
+            }
+            InterlockedIncrement(&cp->Mw->Id);
+            cp->Mw->Length = nSize;
+            MemoryBarrier();
+            if (cp->Mw->RecvWait) {
+                /* Inform the peer that we have send the message */
+                SetEvent(cp->hSync[IPCSOCK_TDR]);
+            }
+            if ((nFlags & ACR_TCP_NODELAY) == 0)
+                break;
+        }
+        else if (cp->dwTimeout == 0) {
+            /* Non blocking mode.
+             */
+            cp->Mw->SendWait = FALSE;
+            rc = WSAEWOULDBLOCK;
+            break;
+        }
+        else {
+            DWORD  ws;
+            HANDLE wh[2];
+
+            wh[0] = cp->hSync[IPCSOCK_RTT];
+            wh[1] = cp->rp->hProcessLock;
+            if (nTimeup != 0) {
+                nTimeout = (int)(nTimeup - GetCurrentMilliseconds());
+                if (nTimeout < 0)
+                    nTimeout = 0;
+            }
+            /* Wait for a signal.
+             * XXX: INFINITE timeouts should really be some sane
+             *      value which will abort the connection.
+             */
+            ws = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
+            cp->Mw->SendWait = FALSE;
+            switch (ws) {
+                case WAIT_OBJECT_0:
+                break;
+                case WAIT_OBJECT_1:
+                case WAIT_ABANDONED_1:
+                    ReleaseMutex(cp->rp->hProcessLock);
+                    cp->c->dwError = WSAECONNRESET;
+                    InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+                    rc = cp->c->dwError;
+                break;
+                case WAIT_TIMEOUT:
+                    rc = WSAETIMEDOUT;
+                break;
+                default:
+                    /* This can be only WAIT_FAILED
+                     * in which case errno is already set
+                     */
+                    rc = GetLastError();
+                break;
+            }
+        }
+    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_SEND);
+    ApcIpcUnref(cp);
+    return rc;
+}
+
+int AcrIpcRecv(LPIPCSOCK cp, void *pData, int nSize)
+{
+    long   nAvail;
+    long   nRead     = 0;
+
+    if (cp == 0) {
+        SetLastError(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    if (cp->dwPageSize != 0) {
+        SetLastError(WSAEPFNOSUPPORT);
+        return -1;
+    }
+    if (pData == 0 || nSize < 1) {
+        SetLastError(WSAEINVAL);
+        return -1;
+    }
+    if (_InterlockedOr(&cp->nState, IPCSOCK_READ) == IPCSOCK_READ) {
+        /* Protect against concurrent reads.
+         */
+        SetLastError(WSAEALREADY);
+        return -1;
+    }
+    InterlockedIncrement(&cp->nReferences);
+    while (nRead == 0) {
+        cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+        if (cp->nStatus > IPCSOCK_SHUTDOWN) {
+            /* Any attempt to read on closed connection is an error.
+             */
+            if (cp->c != 0)
+                SetLastError(cp->c->dwError);
+            else
+                SetLastError(WSAENOTCONN);
+            nRead = -1;
+            break;
+        }
+        cp->Mr->RecvWait = TRUE;
+        MemoryBarrier();
+        nAvail = cp->Mr->Length - cp->Mr->Readed;
+        if (nAvail > 0) {
+            /* We have something in the buffer
+             */
+            cp->Mr->RecvWait = FALSE;
+            /* Copy to the user buffer */
+            nRead = nAvail > nSize ? nSize : nAvail;
+            memcpy(pData, cp->Mr->Data + cp->Mr->Readed, nRead);
+            /* Advance read pointer */
+            cp->Mr->Readed += nRead;
+            if (cp->Mr->Readed >= cp->Mr->Length) {
+                cp->Mr->Readed = 0;
+                cp->Mr->Length = 0;
+            }
+            MemoryBarrier();
+            if (cp->Mr->SendWait && cp->Mr->Length == 0) {
+                /* Inform the peer that we have read the entire message
+                 */
+                SetEvent(cp->hSync[IPCSOCK_RTR]);
+            }
+        }
+        else if (cp->dwTimeout == 0) {
+            /* Non blocking mode.
+             */
+            cp->Mr->RecvWait = FALSE;
+            SetLastError(WSAEWOULDBLOCK);
+            nRead = -1;
+        }
+        else {
+            DWORD  ws;
+            HANDLE wh[2];
+
+            if (cp->nStatus != 0) {
+                /* Cannot require data on already SHUTDOWN connection
+                 */
+                if (cp->nStatus != IPCSOCK_SHUTDOWN) {
+                    SetLastError(WSAESHUTDOWN);
+                    nRead = -1;
+                }
+                break;
+            }
+            wh[0] = cp->hSync[IPCSOCK_RDR];
+            wh[1] = cp->rp->hProcessLock;
+            /* Wait for a signal.
+             * XXX: INFINITE timeouts should really be some sane
+             *      value which will abort the connection.
+             */
+            ws = WaitForMultipleObjects(2, wh, FALSE, cp->dwTimeout);
+            cp->Mr->RecvWait = FALSE;
+            switch (ws) {
+                case WAIT_OBJECT_0:
+                break;
+                case WAIT_OBJECT_1:
+                case WAIT_ABANDONED_1:
+                    ReleaseMutex(cp->rp->hProcessLock);
+                    cp->c->dwError = WSAECONNRESET;
+                    InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+                    SetLastError(cp->c->dwError);
+                    nRead = -1;
+                break;
+                case WAIT_TIMEOUT:
+                    SetLastError(WSAETIMEDOUT);
+                    nRead = -1;
+                break;
+                default:
+                    /* This can only be WAIT_FAILED
+                     * in which case errno is already set
+                     */
+                    nRead = -1;
+                break;
+            }
+        }
+    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_READ);
+    ApcIpcUnref(cp);
+    return nRead;
+}
+
+int AcrIpcAvail(LPIPCSOCK cp, BOOL bForRead)
+{
+    long nAvail;
+
+    if (cp == 0) {
+        SetLastError(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+    if (cp->nStatus > IPCSOCK_SHUTDOWN) {
+        /* Any attempt to read on closed connection is error.
+         */
+        if (cp->c != 0)
+            SetLastError(cp->c->dwError);
+        else
+            SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    InterlockedIncrement(&cp->nReferences);
+    if (WaitForSingleObject(cp->rp->hProcessLock, 0) != WAIT_TIMEOUT) {
+        ReleaseMutex(cp->rp->hProcessLock);
+        cp->c->dwError = WSAECONNRESET;
+        InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+        SetLastError(cp->c->dwError);
+        ApcIpcUnref(cp);
+        return -1;
+    }
+    MemoryBarrier();
+    if (cp->dwPageSize == 0) {
+        if (bForRead)
+            nAvail = cp->Mr->Length - cp->Mr->Readed;
+        else
+            nAvail = cp->Mw->Length == 0 ? IPCSOCK_MSGSIZE : 0;
+    }
+    else {
+        nAvail = cp->Rd->SendPos - cp->Rd->RecvPos;
+        if (!bForRead)
+            nAvail = cp->dwPageSize - nAvail;
+    }
+    ApcIpcUnref(cp);
+    if (nAvail > 0)
+        return nAvail;
+    else
+        return 0;
+}
+
+int AcrIpcFlush(LPIPCSOCK cp)
+{
+    int    rc       = 0;
+    long   nUsed    = 0;
+    INT64  nTimeup  = 0;
+    int    nTimeout;
+    volatile long *pSendWait;
+
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0)
+        return WSAENOTCONN;
+    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
+        return WSAEALREADY;
+    InterlockedIncrement(&cp->nReferences);
     nTimeout = cp->dwTimeout;
     if (nTimeout <= 0) {
         /* Get "future" timeout */
@@ -1768,6 +1945,52 @@ int AcrIpcFlush(LPIPCSOCK cp)
     return rc;
 }
 
+int AcrIpcSync(LPIPCSOCK cp)
+{
+    int  rc = 0;
+    long nUsed;
+
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0)
+        return WSAENOTCONN;
+    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
+        return WSAEALREADY;
+    InterlockedIncrement(&cp->nReferences);
+    cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+    if (cp->nStatus != 0) {
+        /* We don't support half-closed connections
+         * so any attempt to write on shudown socket
+         * is error.
+         */
+        if (cp->c != 0)
+            rc = cp->c->dwError;
+        else
+            rc = WSAENOTCONN;
+        goto finally;
+    }
+    MemoryBarrier();
+    if (cp->dwPageSize == 0)
+        nUsed = cp->Wr->SendPos - cp->Wr->RecvPos;
+    else
+        nUsed = cp->Mw->Length;
+    if (nUsed > 0) {
+        /* Our send buffer is not empty.
+         * Wait for the receiver to consume data
+         */
+        if (cp->Mw->RecvWait || cp->Wr->RecvWait) {
+            /* XXX: This should be signaled already
+             * but probably not handled (yet).
+             */
+            SetEvent(cp->hSync[IPCSOCK_TDR]);
+        }
+    }
+finally:
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_SEND);
+    ApcIpcUnref(cp);
+    return rc;
+}
+
 ACR_NET_EXPORT(jlong, IpcServerEndpoint, accept0)(JNI_STDARGS, jlong fp,
                                                   jint timeout, jboolean block)
 {
@@ -1792,11 +2015,7 @@ ACR_NET_EXPORT(jlong, IpcServerEndpoint,
     cd->p    = cp;
     cd->refs = 1;
     cd->type = ACR_DT_IPCSOCK;
-    if (block) {
-        cd->timeout = cp->dwTimeout;
-    }
-    else {
-        cd->timeout   = 0;
+    if (block == JNI_FALSE) {
         cp->dwTimeout = 0;
         ACR_SETFLAG(cd, ACR_SO_NONBLOCK);
     }
@@ -1807,30 +2026,372 @@ ACR_NET_EXPORT(jint, IpcEndpoint, connec
 {
     int rc;
     acr_sd_t *sd = J2P(fp, acr_sd_t *);
-    LPIPCSOCK cp;
     acr_sockaddr_t *ca;
 
     if (sd == 0 || sd->p == 0)
         return ACR_EBADF;
-    InterlockedIncrement(&sd->refs);
-    cp = (LPIPCSOCK)sd->p;
     ca = SOCKADDR_CAST(cb);
-    rc = AcrIpcConnect(cp, ca->hostname, timeout);
+    rc = AcrIpcConnect(AcrIpcSdRetain(sd), ca->hostname, timeout);
     SOCKADDR_RELEASE(cb, ca);
+    AcrSdRelease(sd);
+    return rc;
+}
+
+ACR_NET_EXPORT(jlong, IpcDescriptor, create0)(JNI_STDARGS, jint flags)
+{
+    acr_sd_t  *sd;
+    IPCSERVER *sp;
+
+    if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
+        /* Allocation failed */
+        return 0;
+    }
+    if ((sp = ACR_TALLOC(IPCSERVER)) == 0) {
+        /* Allocation failed */
+        AcrFree(sd);
+        return 0;
+    }
+    ACR_RING_INIT(&sp->rConnections, IPCSOCK, rLink);
+    sd->p           = sp;
+    sd->flags       = flags;
+    sd->refs        = 1;
+    sd->type        = ACR_DT_IPCSERVER;
+    sp->nReferences = 1;
+    return P2J(sd);
+}
+
+ACR_NET_EXPORT(jlong, IpcDescriptor, socket0)(JNI_STDARGS, jint stype, jboolean block)
+{
+    acr_sd_t *sd;
+    IPCSOCK  *cp;
+
+    if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
+        /* Allocation failed */
+        return 0;
+    }
+    if ((cp = ACR_TALLOC(IPCSOCK)) == 0) {
+        AcrFree(sd);
+        /* Allocation failed */
+        return 0;
+    }
+    ACR_RING_ELEM_INIT(cp, rLink);
+    if (stype != SOCK_STREAM)
+        cp->dwPageSize = PAGESIZE;
+    if (block != JNI_FALSE)
+        cp->dwTimeout   = INFINITE;
+    else
+        ACR_SETFLAG(sd, ACR_SO_NONBLOCK);
+    cp->nStatus     = IPCSOCK_CLOSED;
+    cp->pStatus     = &cp->nStatus;
+    cp->nReferences = 1;
+    sd->p           = cp;
+    sd->refs        = 1;
+    sd->type        = ACR_DT_IPCSOCK;
+    return P2J(sd);
+};
+
+ACR_NET_EXPORT(jint, IpcDescriptor, close0)(JNI_STDARGS, jlong fp)
+{
+    int rc = WSAENOTSOCK;
+    acr_sd_t   *sd = J2P(fp, acr_sd_t *);
+    LPVOID      sock;
+    int         type;
+    if (sd == 0)
+        return ACR_EBADF;
+    sock = InterlockedExchangePointer(&sd->p, 0);
+    type = sd->type;
+    AcrSdRelease(sd);
+    if (sock == 0)
+        return ACR_EBADF;
+    if (sd->type == ACR_DT_IPCSERVER) {
+        LPIPCSERVER sp = sock;
+        sp->bClosed = TRUE;
+        SetEvent(sp->hAcceptSync);
+        if (InterlockedDecrement(&sp->nReferences) == 0)
+            rc = AcrIpcServerClose(sp);
+        else
+            rc = 0;
+    }
+    else if (sd->type == ACR_DT_IPCSOCK) {
+        rc = AcrIpcSocketClose(sock);
+    }
     return rc;
 }
 
-ACR_NET_EXPORT(jint, IpcEndpoint, close0)(JNI_STDARGS, jlong fp)
+ACR_NET_EXPORT(jint, IpcDescriptor, shutdown0)(JNI_STDARGS, jlong fp)
 {
+    LPIPCSOCK cp;
     acr_sd_t *sd = J2P(fp, acr_sd_t *);
-    LPIPCSOCK sp;
 
     if (sd == 0)
         return ACR_EBADF;
-    sp = (LPIPCSOCK)sd->p;
-    if (InterlockedDecrement(&sd->refs) == 0)
-        AcrFree(sd);
+    cp = (LPIPCSOCK)sd->p;
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0)
+        return WSAENOTCONN;
+    if (_InterlockedOr(cp->pStatus, IPCSOCK_SHUTDOWN) != 0)
+        return WSAEALREADY;
+    cp->c->dwError = WSAESHUTDOWN;
+
+    /* Break listeners on our side
+     */
+    SetEvent(cp->hSync[IPCSOCK_RDR]);
+    SetEvent(cp->hSync[IPCSOCK_RTT]);
+    return 0;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, optset0)(JNI_STDARGS, jlong fp,
+                                             jint opt, jint val)
+{
+    int rc = 0;
+    LPIPCSOCK cp;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    if (sd == 0)
+        return ACR_EBADF;
+    cp = (LPIPCSOCK)sd->p;
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (_InterlockedOr(cp->pStatus, 0) != 0)
+        return WSAESHUTDOWN;
+    switch (opt) {
+        case ACR_OPT_SO_TIMEOUT:
+            if (val < 0)
+                cp->dwTimeout = INFINITE;
+            else
+                cp->dwTimeout = val;
+            if (cp->dwTimeout == 0)
+                ACR_SETFLAG(sd, ACR_SO_NONBLOCK);
+        break;
+        default:
+            rc = ACR_EINVAL;
+        break;
+    }
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, optset1)(JNI_STDARGS, jlong fp,
+                                             jint opt, jboolean val)
+{
+    int rc = 0;
+    int on = val == JNI_TRUE ? 1 : 0;
+    LPIPCSOCK cp;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    if (sd == 0)
+        return ACR_EBADF;
+    cp = (LPIPCSOCK)sd->p;
+    if (cp == 0)
+        return WSAENOTSOCK;
+
+    /* Set numeric values */
+    switch (opt) {
+        case ACR_OPT_SO_KEEPALIVE:
+        case ACR_OPT_SO_REUSEADDR:
+            rc = ACR_ENOTIMPL;
+        break;
+        case ACR_OPT_SO_NONBLOCK:
+            if (on)
+                cp->dwTimeout = 0;
+            else if (cp->dwTimeout == 0)
+                cp->dwTimeout = INFINITE;
+            ACR_PUTFLAG(sd, ACR_SO_NONBLOCK, on);
+        break;
+        default:
+            rc = ACR_EINVAL;
+        break;
+    }
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, optset2)(JNI_STDARGS, jlong fp,
+                                             jint opt, jboolean val)
+{
+    int rc = 0;
+    int on = val == JNI_FALSE ? 0 : 1;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    /* Set numeric values */
+    switch (opt) {
+        case ACR_OPT_TCP_NODELAY:
+            ACR_PUTFLAG(sd, ACR_TCP_NODELAY, on);
+        break;
+        case ACR_OPT_TCP_SENDALL:
+            ACR_PUTFLAG(sd, ACR_TCP_SENDALL, on);
+        break;
+        case ACR_OPT_TCP_DEFER_ACCEPT:
+        case ACR_OPT_TCP_NOPUSH:
+        case ACR_OPT_IPV6_V6ONLY:
+            rc = ACR_ENOTIMPL;
+        break;
+        default:
+            rc = ACR_EINVAL;
+        break;
+    }
+    return rc;
+}
+
+ACR_NET_EXPORT(jboolean, IpcDescriptor, optget0)(JNI_STDARGS, jlong fp,
+                                                 jint opt)
+{
+    int rc = 0;
+    jboolean rv = JNI_FALSE;
+    LPIPCSOCK cp;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    if (sd == 0)
+        rc = ACR_EBADF;
+    cp = (LPIPCSOCK)sd->p;
+    if (cp == 0)
+        rc = WSAENOTSOCK;
+    if (rc != 0)
+        goto finally;
+
+    /* Set numeric values */
+    switch (opt) {
+        case ACR_OPT_SO_KEEPALIVE:
+        case ACR_OPT_SO_LINGER:
+        case ACR_OPT_SO_REUSEADDR:
+        case ACR_OPT_IPV6_V6ONLY:
+        case ACR_OPT_TCP_NOPUSH:
+        case ACR_OPT_TCP_DEFER_ACCEPT:
+            rc = ACR_ENOTIMPL;
+        break;
+        case ACR_OPT_SO_TIMEOUT:
+            if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0)
+                rv = JNI_TRUE;
+        break;
+        case ACR_OPT_SO_NONBLOCK:
+            if (ACR_HASFLAG(sd, ACR_SO_NONBLOCK))
+                rv = JNI_TRUE;
+        break;
+        case ACR_OPT_TCP_NODELAY:
+            if (ACR_HASFLAG(sd, ACR_TCP_NODELAY))
+                rv = JNI_TRUE;
+        break;
+        break;
+        case ACR_OPT_TCP_SENDALL:
+            if (ACR_HASFLAG(sd, ACR_TCP_SENDALL))
+                rv = JNI_TRUE;
+        break;
+        default:
+            rc = ACR_EINVAL;
+        break;
+    }
+finally:
+    if (rc != 0)
+        ACR_THROW_NET_ERROR(rc);
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, block0)(JNI_STDARGS, jlong fp, jboolean on)
+{
+    LPIPCSOCK cp;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    cp = (LPIPCSOCK)sd->p;
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (on) {
+        ACR_CLRFLAG(sd, ACR_SO_NONBLOCK);
+        cp->dwTimeout = INFINITE;
+    }
+    else {
+        ACR_SETFLAG(sd, ACR_SO_NONBLOCK);
+        cp->dwTimeout = 0;
+    }
+    return 0;
+}
+
+ACR_NET_EXPORT(jboolean, IpcDescriptor, blocking0)(JNI_STDARGS, jlong fp)
+{
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    if (ACR_HASFLAG(sd, ACR_SO_NONBLOCK))
+        return JNI_FALSE;
     else
-        sd->p = 0;
-    return AcrIpcClose(sp);
+        return JNI_TRUE;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, flush0)(JNI_STDARGS, jlong fp)
+{
+    int rc;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    if (sd == 0 || sd->p == 0)
+        return ACR_EBADF;
+    rc = AcrIpcFlush(AcrIpcSdRetain(sd));
+    AcrSdRelease(sd);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, sync0)(JNI_STDARGS, jlong fp)
+{
+    int rc;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    if (sd == 0 || sd->p == 0)
+        return ACR_EBADF;
+    rc = AcrIpcSync(AcrIpcSdRetain(sd));
+    AcrSdRelease(sd);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, getlocal0)(JNI_STDARGS, jlong sp,
+                                               jbyteArray a)
+{
+    int rc = 0;
+    acr_sd_t       *sd = J2P(sp, acr_sd_t *);
+    acr_sockaddr_t *sa = SOCKADDR_CAST(a);
+    LPIPCSOCK cp;
+
+    if (sa == 0)
+        return rc;
+    memset(sa, 0, sizeof(acr_sockaddr_t));
+    if (sd == 0 || sd->p == 0) {
+        SOCKADDR_RELEASE(a, sa);
+        return ACR_EBADF;
+    }
+    cp = (LPIPCSOCK)sd->p;
+    if (cp->rp == 0) {
+        SOCKADDR_RELEASE(a, sa);
+        return WSAENOTCONN;
+    }
+    strcpy(sa->hostname, cp->rp->szAddress);
+    sa->family  = AF_LOCAL;
+    sa->addrlen = ISIZEOF(sa->hostname);
+    sa->iplen   = sa->addrlen;
+    sa->port    = dwCurrentPid;
+    SOCKADDR_RELEASE(a, sa);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, IpcDescriptor, getremote0)(JNI_STDARGS, jlong sp,
+                                                jbyteArray a)
+{
+    int rc = 0;
+    acr_sd_t       *sd = J2P(sp, acr_sd_t *);
+    acr_sockaddr_t *sa = SOCKADDR_CAST(a);
+    LPIPCSOCK cp;
+
+    if (sa == 0)
+        return rc;
+    memset(sa, 0, sizeof(acr_sockaddr_t));
+    if (sd == 0 || sd->p == 0) {
+        SOCKADDR_RELEASE(a, sa);
+        return ACR_EBADF;
+    }
+    cp = (LPIPCSOCK)sd->p;
+    if (cp->rp == 0) {
+        SOCKADDR_RELEASE(a, sa);
+        return WSAENOTCONN;
+    }
+    strcpy(sa->hostname, cp->rp->szAddress);
+    sa->family  = AF_LOCAL;
+    sa->addrlen = ISIZEOF(sa->hostname);
+    sa->iplen   = sa->addrlen;
+    sa->port    = cp->rp->dwProcessId;
+    SOCKADDR_RELEASE(a, sa);
+    return rc;
 }



Mime
View raw message