commons-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r1154448 - in /commons/sandbox/runtime/trunk/src/main/native: Makefile.msc.in os/win32/arch_defs.h os/win32/arch_ipcs.h os/win32/ipcsock.c os/win32/ipcsock.h os/win32/ipcsstream.c
Date Sat, 06 Aug 2011 06:23:25 GMT
Author: mturk
Date: Sat Aug  6 06:23:24 2011
New Revision: 1154448

URL: http://svn.apache.org/viewvc?rev=1154448&view=rev
Log:
Implement windows IPC socket

Added:
    commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
      - copied, changed from r1149389, commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.h
    commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c   (with props)
Removed:
    commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.h
Modified:
    commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in
    commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_defs.h
    commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c

Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in?rev=1154448&r1=1154447&r2=1154448&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in (original)
+++ commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in Sat Aug  6 06:23:24 2011
@@ -83,6 +83,7 @@ WIN32_SOURCES=\
 	$(TOPDIR)\os\win32\inetsock.c \
 	$(TOPDIR)\os\win32\init.c \
 	$(TOPDIR)\os\win32\ipcsock.c \
+	$(TOPDIR)\os\win32\ipcsstream.c \
 	$(TOPDIR)\os\win32\localsock.c \
 	$(TOPDIR)\os\win32\os.c \
 	$(TOPDIR)\os\win32\path.c \

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_defs.h
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_defs.h?rev=1154448&r1=1154447&r2=1154448&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_defs.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_defs.h Sat Aug  6 06:23:24 2011
@@ -162,6 +162,7 @@ static __inline int isblank(int c)
 #define WAIT_OBJECT_3          (WAIT_OBJECT_0 + 3)
 #define WAIT_OBJECT_4          (WAIT_OBJECT_0 + 4)
 #define WAIT_OBJECT_M          (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS)
+#define WAIT_ABANDONED_1       (WAIT_ABANDONED_0 + 1)
 
 /**
  * Definitions matching the KeyAccessRights Java enum

Copied: commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h (from r1149389, commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.h)
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h?p2=commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h&p1=commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.h&r1=1149389&r2=1154448&rev=1154448&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h Sat Aug  6 06:23:24 2011
@@ -14,10 +14,11 @@
  * limitations under the License.
  */
 
-#include "acr/stdtypes.h"
+#ifndef _ARCH_IPCS_H_
+#define _ARCH_IPCS_H_
+
+#include "acr/ring.h"
 
-#ifndef _IPCSOCK_H_
-#define _IPCSOCK_H_
 
 /*
 
@@ -32,8 +33,8 @@ multiple clients.
 
 Server side:
 
-Server creates a shared memory with header and data buffer.
-Its main purpose is to accept the client connections.
+Server creates a named shared memory segment with header and accept
+data buffer. Its main purpose is to accept the client connections.
 The buffer size reflects the socket api listen function backlog
 and should be big enough to handle the burst load.
 
@@ -42,190 +43,406 @@ DWORD so that the size is equal regarles
 
 Client side:
 
-Client creates a shared memory with the header and data
+Client creates a three shared memory pages with the header and data
 for read and write buffers. By default it uses the pagesize
 (64K) for both header and r/w buffers. This means that client
-should not go beyond 32K for a single read/write size
-for optimal performance.
-
-Connection procedure first opens the server acceptor shared memory
-with the name that represents the address and duplicates the
-sync objects into his address space. It then locks the server
-and when it gots the lock, it writes its connection info into the
-first free accept buffer and unlocks the server so others can
+should not go beyond 64K for a single read/write size
+for optimal performance. Both read and write buffers are mapped
+twice to the two consequtive page addresses. This allows to hanlde
+the cricular buffers effectively without splitting reads or writes
+on buffer flip.
+
+Connection procedure first opens the server shared memory
+with the name that represents the address and duplicates its
+sync objects into his own process. It then waits on the
+server accept semaphore when it gots it, it duplicated its own sync
+object and data buffers into the server process. It then writes its connection
+info into the accept circular buffer and unlocks the server so others can
 continue connect operation. Server at that time gets the signal
 from the newly connected client, locks the accept storage and pulls
-first record as part of accept call. That record becomes the
-initial data for establisking the connection from the server side.
+the record as part of accept call.
+After the new connection data has been read the server creates the
+accept socket and signals the client it can finish the connect procedure.
+
+In case there is error during connect stage on any side the connection
+is rolled back and sync objects are destroyed on both sides.
+
+OOB (Out Of Band) messages:
+
+OOB is half-duplex system with the handshake and uses hConnSync events for
+sending notification to the active peer. When established both sides are in
+the listening mode. Peer sends message and waits for the confirmation that the
+message was read. A separate thread should be used for listening to the OOB
+read events on each side of the connection or multiple connections could be
+used inside the poller. User must provide a callback function for handling the
+messages.
+The default handler discards the readed data and sends empty confirmation
+message.
 
 */
 
+typedef struct IPCSOCK_PDATA {
+    volatile long RecvWait; /* Indicates that the reader is blocking
+                             * inside read call waiting for data.
+                             */
+    volatile long SendWait; /* Indicates that the writter is blocking
+                             * inside write call waiting for readed
+                             * to consume some data.
+                             */
+    volatile long RecvPos;
+    volatile long SendPos;
+} IPCSOCK_PDATA;
+
 /**
- * [shm] Written by the client to the accept data queue
+ * Maximum message size.
+ * Actual message format and content is user defined.
  */
-typedef struct sipc_accept_rec_t
-{
-    DWORD   pid;            /* Process that is connecting
+#define IPCSOCK_MSGSIZE  16384
+
+typedef struct IPCSOCK_MSG {
+    volatile long Id;       /* Message Id.
+                             * Incremented by the sender for each new message
+                             * either data or sync one.
+                             */
+    volatile long RecvWait; /* Indicates that the reader is blocking
+                             * inside read call waiting for data.
+                             */
+    volatile long SendWait; /* Indicates that the writter is blocking
+                             * inside write call waiting for readed
+                             * to consume some data.
+                             */
+    volatile long Readed;
+    volatile long Length;   /* The length of the message in the buffer.
+                             * Reset to zero when the peer reads the buffer.
                              */
-    DWORD   shm;            /* Client shared memory mapping.
-                             * This handle is already duplicated
-                             * to the server process when written
-                             * by the client.
+    BYTE Data[IPCSOCK_MSGSIZE];
+} IPCSOCK_MSG;
+
+
+/**
+ * [shm] Client metadata header
+ * This is separate shared memory block so we can have
+ * clean I/O buffers
+ */
+typedef struct IPCSOCK_CLIENT
+{
+    volatile long nStatus;  /* Connection status
+                             * If nonzero the connection is invalid
+                             * and must be closed immediately.
+                             */
+    DWORD   dwPageSize;     /* Data buffer size.
+                             * This should always reflect the PAGESIZE,
+                             * however the theoretical limit is 1GB
+                             * limited by (LONG_MAX / 2).
+                             * Zero for message type connections.
+                             */
+    IPCSOCK_PDATA dPos[2];
+    DWORD   nBufferMap[2];  /* Client shared memory I/O buffer mappings.
+                             * Those handles are already duplicated
+                             * to the server process when the client connects.
                              * Server uses MapViewOfFile to maps
-                             * this memory into his address space.
+                             * this memory into his address space with client
+                             * read buffer mapped as accepted socket write buffer
+                             * and vice versa.
+                             */
+    DWORD   nSync[4];       /* Read and write sync events.
+                             */
+    DWORD   nProcessLock;   /* Unique client Mutex
+                             * Released only on process termination
+                             * Present only the first time the client connects
+                             * to the server.
+                             */
+    DWORD   dwError;        /* Error code passed from the server to
+                             * to the client and vice versa
                              */
-} sipc_accept_rec_t;
+    IPCSOCK_MSG dMsg[2];    /* Reserved for message type connections.
+                             * Messages are meant to be handled as they are
+                             * written. The sender blocks until the receiver
+                             * handles the messages.
+                             */
+
+} IPCSOCK_CLIENT;
+
+/**
+ * [shm] Accept queue record
+ */
+typedef struct IPCSOCK_ACCEPT {
+    volatile long nStatus;  /* Accept status.
+                             * Initially set to process id of the connecting client.
+                             * Set to zero by the server if accepted.
+                             * If non-zero it contains error code.
+                             * Client must close the connection and
+                             * report the error to the caller.
+                             */
+    DWORD   nClientMeta;    /* Connection metadata map
+                             * Already duplicated to the server
+                             * It points to IPCSOCK_CLIENT and uses 64K
+                             * which is a waste of resources, but we cannot
+                             * allocate less then dwAllocationGranularity.
+                             */
+} IPCSOCK_ACCEPT;
 
 /**
  * [shm] Main server header
  */
-typedef struct sipc_server_header_t
+typedef struct IPCSOCK_SERVER
 {
-    DWORD   pid;            /* Server process id
+    DWORD   dwProcessId;    /* Server process id
                              */
-    DWORD   mutex;          /* Process temination mutex
+    DWORD   nProcessLock;   /* Process temination mutex
                              * Client duplicates that handle to monitor
                              * the server death.
                              */
-    DWORD   areq;           /* Accept request semaphore.
+    DWORD   nReserved;      /* Reserved for future use.
+                             */
+    struct {
+        DWORD   dwSize;     /* Number of records in the accept circular buffer.
+                             * This is effectively a backlog.
+                             */
+        volatile DWORD dwRdPos;
+                             /* accept() call circular buffer read position.
+                             * Incremented when accept reads the record.
+                             */
+        volatile DWORD dwWrPos;
+                             /* accept() call circular buffer write position.
+                             * Maintained by the client and protected using lock
+                             * mutex.
+                             */
+    } Queue;
+
+    DWORD   nAcceptLock;    /* Synchronization Mutex
+                             * Created by server and used to protect the accept
+                             * queue circular buffer. The time either server
+                             * or client spends in the lock is minimal and
+                             * thus it shouldn't affect the performance.
+                             */
+    DWORD   nAcceptSema;    /* Accept posted semaphore.
                              * When ever there is accept() call
-                             * it increments the acount and releases the semaphore
-                             * by one. It then waits for connect requested event
-                             * which client signals after he updates the record.
+                             * it releases the semaphore by one. It then waits for
+                             * connect requested event which client signals after
+                             * he updates the record.
                              */
-    DWORD   creq;           /* Event on which server waits on accept.
+    DWORD   nAcceptSync;    /* Event on which server waits on accept.
                              * Client duplicates that handle and signals after
                              * he writes the connection record.
                              */
-    int     backlog;        /* Backlog size or number of records
+
+    DWORD   dwUnused[7];    /* Reserved for future use.
+                             * Currently acts as a struct alignmet to 64 bytes.
                              */
-    volatile long acount;   /* Number of currently queued accept() calls.
-                             * Incremented by the accept call.
-                             * Decremented when the connect record is pulled from
-                             * the queue.
-                             */
-    volatile long ccount;   /* Number of currently queued connect() calls.
-                             * Incremented by the client after he gots the
-                             * areq semaphore. Client must not continue the
-                             * connect procedure if the result of atomic increment
-                             * is backlog.
-                             * Server decrements that value after connect record
-                             * is removed from the queue.
-                             */
-    volatile long wridx;    /* Index in r[] where the client will
-                             * write the next client connect record.
-                             * Before written it is incremented.
-                             * if the atomic increment returns backlog
-                             * it has to be set to 1 using atomic exchange.
-                             */
-    sipc_accept_rec_t r[0]; /* Accept record circle queue.
-                             * Client which got gemaphore always writes
-                             * at wridx.
+    IPCSOCK_ACCEPT a[0];    /* Accept records circular buffer.
                              * Must be the last struct entry.
                              * For 64K page size it should be
-                             * (65536 - 32) / 8 == 8188
-                             * This means that the backlog is 8190 max
-                             * for a 64K page size.
-                             */
-} sipc_server_header_t;
+                             * (65536 - 64) / 32 == 2045
+                             * This means that the burst number of connections is
+                             * 2044 for a 64K page size (one less then Queue.dwSize)
+                             */
+} IPCSOCK_SERVER;
+
+typedef struct IPCSOCK      IPCSOCK;
+typedef struct IPCSERVER    IPCSERVER;
+typedef struct IPCREMOTE    IPCREMOTE;
+
+typedef struct IPCSOCK*     LPIPCSOCK;
+typedef struct IPCSERVER*   LPIPCSERVER;
+typedef struct IPCREMOTE*   LPIPCREMOTE;
 
 /**
- * [shm] Client header data
- * This is initial data in the clients shared memory set-up
- * before connect call.
+ * The list of all remote processes this process
+ * has a connection established.
  */
-typedef struct sipc_client_header_t
+struct IPCREMOTE
 {
-    DWORD   mutex;          /* Client's process mutex.
-                             * This is ususally never destroyed
-                             * for the process life-time.
-                             * Signal on that mutex means that
-                             * the client's process has died.
-                             */
-    DWORD   aack;           /* Accept acknowledge signal.
-                             * Client blocks on this signal
-                             * after he writes the connection record.
-                             * Server sets this event after it creates
-                             * the connection or fails and it that case
-                             * the error is set to non-zero.
-                             */
-    DWORD   sync[4];        /* Read  sync events
-                             * They are write events on the server side.
-                             * First signals that there are data to read
-                             * and second read readines.
-                             * Write sync events
-                             * They are read events on the server side.
-                             * First signals that there are data written
-                             * and second write ready.
-                             */
-    DWORD   rdlen;          /* Size of the read buffer */
-    DWORD   wrlen;          /* Size of the write buffer */
-    DWORD   error;          /* Accept status set by server
-                             * so that client can get the
-                             * error code if accept is refused.
-                             */
-    DWORD   sessionid;      /* Session id number set by the server
-                             * when the connection is established.
-                             */
+    ACR_RING_ENTRY(IPCREMOTE)   rLink;
+    IPCSOCK_SERVER *s;              /* [c] Server shared memory mapping */
+    char            szAddress[256]; /* [c] Server address               */
+    DWORD           dwProcessId;    /* [a] Client's process id
+                                     *     Used as a key for server's registration
+                                     *     of multiple clients.
+                                     */
+    HANDLE          hProcessLock;   /* [a] Remote process Mutex         */
+    HANDLE          dProcessLock;   /* [c] Duplicated client process Mutex
+                                     *     into the server's process.
+                                     */
+    HANDLE          hServerMeta;    /* [c] Remote process Metadata      */
+    HANDLE          hServerProc;
+    HANDLE          hAcceptSema;
+    HANDLE          hAcceptLock;
+    HANDLE          hAcceptSync;
+    volatile long   nReferences;    /* Number of connections.
+                                     * When decremented to zero, the handles are
+                                     * destroyed and connection to the server
+                                     * process is closed
+                                     */
+};
 
-} sipc_client_header_t;
+struct IPCSOCK
+{
+    ACR_RING_ENTRY(IPCSOCK) rLink;
+    LPIPCSERVER         sp;            /* Set only for accepted connections    */
+    LPIPCREMOTE         rp;            /* Set only for client side connections */
+    IPCSOCK_CLIENT     *c;
+    DWORD               dwPageSize;    /* Allocation Page size */
+    DWORD               dwTimeout;     /* Socket timeout */
+    DWORD               flags;         /* Socket flags */
+    void               *pAttachment;   /* Custom user object */
+    HANDLE              hClientMeta;   /* Metadata mapping   */
+    HANDLE              hBufferMap[2]; /* IO Buffer mappings
+                                        * revrted on server side
+                                        */
+    HANDLE              hSync[4];
+    LPBYTE              pRdbData;
+    LPBYTE              pWrbData;
+    /* Read/Write positions
+     * They are pointers to shared memory p[] array
+     */
+    IPCSOCK_PDATA      *Rd;
+    IPCSOCK_PDATA      *Wr;
+    IPCSOCK_MSG        *Mr;
+    IPCSOCK_MSG        *Mw;
+    volatile long       nState;
+    volatile long       nReferences;
+    long                nStatus;
+    volatile long      *pStatus;
+};
 
 /**
- * [shm] Read/Write buffer data
+ * [mem] Server socket structure
  */
-typedef struct sipc_data_buffer_t
+struct IPCSERVER
 {
-    volatile long rd;       /* Currently reading from the buffer  */
-    volatile long wr;       /* Currently writting to the buffer   */
-    volatile long pos;      /* Current position in data buffer    */
-    volatile long len;      /* Current data len from pos          */
-    BYTE          d[0];
-} sipc_data_buffer_t;
-
-/** Client Connection event mappings
- */
-#define CC_RD   0       /* Client has Read the Data.
-                         * Set by the client after read call
-                         * finished to indicate that some data was
-                         * read. The server waits on that event when his
-                         * send buffer is full.
+    /* A ring containing all of the connections that are active.
+     *
+     */
+    ACR_RING_HEAD(IPCSOCK_RING, IPCSOCK) rConnections;
+    IPCSOCK_SERVER     *s;
+    HANDLE              hServerMap;
+    HANDLE              hAcceptSema;
+    HANDLE              hAcceptSync;
+    HANDLE              hAcceptLock;
+    volatile long       nConnections;
+    DWORD               dwFlags;
+};
+
+/**
+ * Current socket state flags
+ *
+/**
+ * Buffer state flags
+ */
+#define IPCSOCK_READ (1L)
+#define IPCSOCK_SEND (2L)
+
+#define IPCSOCK_MODE_STREAM 0
+#define IPCSOCK_MODE_MSG    1
+
+#define IPCSOCK_SEND_ALL    1
+#define IPCSOCK_FLUSH       2
+
+/**
+ * Connection status values that
+ * are valid for IPCSOCK_CLIENT->nStatus filed
+ */
+#define IPCSOCK_OK          0
+#define IPCSOCK_SHUTDOWN    1   /* Peer has shutdown the connection
+                                 * Only read might return what's already
+                                 * in the buffer
+                                 */
+#define IPCSOCK_CLOSED      2   /* Connection closed.
+                                 * Peer must close this connection.
+                                 */
+#define IPCSOCK_ABORTED     4   /* Connection aborted during accept sequence
+                                 * Connection is invalid and must be closed.
+                                 */
+#define IPCSOCK_CONNECTING  8   /* Connection is currently in the connect
+                                 * stage. Used to prevent close/shutdown.
+                                 */
+
+/** Connection event mappings
+ */
+#define IPCSOCK_RDR  0  /* Read Data Ready.
+                         * Receiver block on this event when his
+                         * read buffer is empty.
                          */
-#define CC_RTR  1       /* Client is Ready To Receive data.
-                         * Client blocks on that event when there is no
-                         * data in the read queue.
+#define IPCSOCK_RTR  1  /* Ready To Receive data.
+                         * Set by the reciever when he reads some data
+                         * from the buffer so that writer blocking on
+                         * the RTT can continue writting.
                          */
-#define CC_TD   2       /* Client has Transmit the Data.
-                         * Set by the client after write call finishes
+#define IPCSOCK_TDR  2  /* Transmit Data Ready.
+                         * Set by the writter after write call finishes
                          * to indicate that some data was written.
-                         * Server blocks on this event when the receive
-                         * buffer is empty.
+                         * Peer blocks on this event (mapped as RDR) when his
+                         * receive buffer is empty.
                          */
-#define CC_RTT  3       /* Client is Ready To Transmit data.
-                         * Client blocks on that event when the write
-                         * buffer is full and server has not read the data.
-                         * after server reads the data it signals this
-                         * evet so client can continue with writting.
+#define IPCSOCK_RTT  3  /* Ready To Transmit data.
+                         * Writter blocks on that event when the write
+                         * buffer is full and peer has not read the data.
+                         * After peer reads the data it signals this
+                         * evet so writter can continue with writting.
                          */
 
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
 /**
- * Server Sonnection event mappings
- * Those are the same events as for client
- * but their idexes have different meaning.
+ * Read the data
+ * @param pSocket the socket to use.
+ * @param pData output data buffer.
+ * @param nSize size of the output buffer.
+ * @return number of bytes read that can be lower then
+ *         nSize or -1 on error.
+ * @notice The function will block if input buffer is
+ *         empty until the peer writes some data or until
+ *         the socket times out.
  */
-#define SC_RD   3       /* Server has Read the Data.
-                         * Signal the client to continue writting.
-                         */
-#define SC_RTR  2       /* Block if read buffer is empty.
-                         * Client sets the CC_TD to indicate that he wrote
-                         * some data.
-                         */
-#define SC_TD   1       /* Server has wrote the data.
-                         * Set the client CC_RTR event to singlal the client
-                         * to continue reading.
-                         */
-#define SC_RTT  0       /* Wait if the write buffer is full
-                         * Client sets CC_RD when he read some data so
-                         * we can continue writting.
-                         */
+int
+AcrIpcRead(LPIPCSOCK pSocket, void *pData, int nSize);
+
+/**
+ * Write the data.
+ * @param pSocket the socket to use.
+ * @param pData input data buffer
+ * @param nSize number of bytes to write
+ * @return number of bytes written which can be lower
+ *         then nSize or -1 on error.
+ * @notice The function will block if output buffer is
+ *         full until the peer read some data or until
+ *         the socket times out.
+ */
+int
+AcrIpcWrite(LPIPCSOCK pSocket, const void *pData, int nSize, int nFlags);
+
+/**
+ * Send the message.
+ * @param pSocket the socket to use.
+ * @param pData input data buffer
+ * @param nSize number of bytes to send
+ * @param nFlags send flags. If set to IPCSOCK_FLUSH the
+ *        operation will block until the peer reads the
+ *        entire message send.
+ * @return zero on success or error code.
+ * @notice The function will block if output buffer is
+ *         full until the peer reads the message or until
+ *         the socket times out.
+ */
+int AcrIpcSend(LPIPCSOCK pSocket, const void *pData, int nSize, int nFlags);
+
+/**
+ * Get the data messages
+ * @param pSocket the socket to use.
+ * @param pData output data buffer.
+ * @param nSize size of the output buffer.
+ * @return number of bytes read that can be lower then
+ *         nSize or -1 on error.
+ * @notice The function will block if input buffer is
+ *         empty until the peer writes some data or until
+ *         the socket times out.
+ */
+int AcrIpcRecv(LPIPCSOCK pSocket, void *pData, int nSize);
 
-#endif /* _IPCSOCK_H_ */
+#if defined(__cplusplus)
+}
+#endif
+#endif /* _ARCH_IPCS_H_ */

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c?rev=1154448&r1=1154447&r2=1154448&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c Sat Aug  6 06:23:24 2011
@@ -14,35 +14,1702 @@
  * limitations under the License.
  */
 
-#include "acr/jnitypes.h"
-#include "acr/error.h"
+#include "acr/jniapi.h"
+#include "acr/debug.h"
 #include "acr/memory.h"
 #include "acr/netapi.h"
-#include "acr/unsafe.h"
 #include "acr/port.h"
-#include "ipcsock.h"
+#include "acr/unsafe.h"
+#include "acr/misc.h"
+#include "arch_defs.h"
 #include "arch_opts.h"
 #include "arch_sync.h"
+#include "arch_ipcs.h"
 
-#define SOCKADDR_CAST(BA) \
-    (acr_sockaddr_t *)AcrGetArrayCritical(env, (BA))
-#define SOCKADDR_RELEASE(BA, SA) \
-    AcrReleaseArrayCritical(env, (BA), (SA))
+#pragma intrinsic(_InterlockedAnd)
+#pragma intrinsic(_InterlockedOr)
 
 /* Never destroyed
  * Susessfull Wait on that handle means the process died.
  * This is single instance Mutex which either client or server duplicates.
  */
-extern HANDLE acr_alived_mutex;
-static HANDLE current_process;
-static DWORD  current_pid;
+static HANDLE           gProcessLock;
+static HANDLE           gThisProcess;
+static DWORD            dwCurrentPid;
+static CRITICAL_SECTION gSynchronized;
+extern LPSYSTEM_INFO    acr_osinf;
+
+#define SOCKADDR_CAST(BA) \
+    (acr_sockaddr_t *)AcrGetArrayCritical(env, (BA))
+#define SOCKADDR_RELEASE(BA, SA) \
+    AcrReleaseArrayCritical(env, (BA), (SA))
+
+#define IS_NULL_HANDLE(h)   ((h) == 0)
+#define DW2H(I)             (HANDLE)UIntToPtr((I))
+#define H2DW(I)             PtrToUint((I))
+#define SAFE_UNMAP1(M)              \
+    if ((M) != 0) {                 \
+        UnmapViewOfFile(M);         \
+        (M) = 0;                    \
+    } else (void)0
+
+#define SAFE_UNMAP2(M, S)           \
+    if ((M) != 0) {                 \
+        UnmapViewOfFile((M) + (S)); \
+        UnmapViewOfFile((M));       \
+        (M) = 0;                    \
+    } else (void)0
+
+#if defined(IPC_DEFAULT_SECURITY)
+#define IPCSECURITY_TOKEN   0
+#else
+#define IPCSECURITY_TOKEN   SaWithNullDacl()
+#endif
+
+#define PAGESIZE            acr_osinf->dwAllocationGranularity
+#define IPCSOCK_ACK_TIMEOUT 60000
+
+typedef struct IPCREMOTEREGISTRY {
+    ACR_RING_HEAD(IPCRSR_RING, IPCREMOTE) rRemotes;
+
+} IPCREMOTEREGISTRY;
+
+typedef struct IPCCONNECTIONS
+{
+    ACR_RING_HEAD(IPCCONN_RING, IPCSOCK) rConnections;
+    volatile long       nConnections;
+
+} IPCCONNECTIONS;
 
-ACR_NET_EXPORT(void, IpcEndpoint, init0)(JNI_STDARGS)
+static IPCREMOTEREGISTRY gLiveRemotes;
+static IPCREMOTEREGISTRY gDeadRemotes;
+
+static PSECURITY_ATTRIBUTES
+SaWithNullDacl(void)
+{
+    static SECURITY_ATTRIBUTES sa = { 0 };
+    PSECURITY_DESCRIPTOR pSD;
+
+    if (sa.nLength != 0) {
+        /* Return cached entry */
+        return &sa;
+    }
+    sa.nLength = sizeof(SECURITY_ATTRIBUTES);
+    sa.lpSecurityDescriptor = 0;
+
+    pSD = calloc(1, SECURITY_DESCRIPTOR_MIN_LENGTH);
+    if (pSD == 0) {
+        SetLastError(ERROR_OUTOFMEMORY);
+        return &sa;
+    }
+    if (!InitializeSecurityDescriptor(pSD, SECURITY_DESCRIPTOR_REVISION))
+        goto cleanup;
+    if (!SetSecurityDescriptorDacl(pSD, TRUE, (PACL)0, FALSE))
+        goto cleanup;
+    sa.lpSecurityDescriptor = pSD;
+    sa.bInheritHandle       = FALSE;
+
+    return &sa;
+cleanup:
+    AcrFree(pSD);
+    return &sa;
+}
+
+static __inline __int64
+GetCurrentMilliseconds(void)
+{
+    FILETIME now;
+    ULARGE_INTEGER ui;
+
+    GetSystemTimeAsFileTime(&now);
+    ui.LowPart  = now.dwLowDateTime;
+    ui.HighPart = now.dwHighDateTime;
+    return ui.QuadPart / 10000L;
+}
+
+static __inline void
+CloseRemoteHandle(HANDLE hRemoteProcess, DWORD dwHandle)
+{
+    if (hRemoteProcess != 0 && dwHandle != 0) {
+        DuplicateHandle(hRemoteProcess, DW2H(dwHandle),
+                        hRemoteProcess, 0, 0,
+                        FALSE, DUPLICATE_CLOSE_SOURCE);
+    }
+}
+
+static __inline void
+AcquireMutex(HANDLE hMutex)
+{
+    WaitForSingleObject(hMutex, INFINITE);
+}
+
+ACR_NET_EXPORT(jint, IpcEndpoint, init0)(JNI_STDARGS)
 {
+    static int inited = 0;
+
+    if (inited++)
+        return WSAEALREADY;
     /* Catch some common immutable variables
      * which won't change for the process life-time.
      */
-    current_process = GetCurrentProcess();
-    current_pid     = GetCurrentProcessId();
+    gThisProcess = GetCurrentProcess();
+    dwCurrentPid = GetCurrentProcessId();
+    gProcessLock = CreateMutex(IPCSECURITY_TOKEN, TRUE, 0);
+    if (gProcessLock == 0) {
+        inited = 0;
+        return GetLastError();
+    }
+    ACR_RING_INIT(&gLiveRemotes.rRemotes, IPCREMOTE, rLink);
+    ACR_RING_INIT(&gDeadRemotes.rRemotes, IPCREMOTE, rLink);
+    InitializeCriticalSectionAndSpinCount(&gSynchronized, 4000);
+    return 0;
+}
+
+/**
+ * Opens connection to the remote server and adds it to
+ * the global list of all opened remotes.
+ */
+static LPIPCREMOTE
+AcrIpcRemoteOpen(LPCSTR szAddress)
+{
+    int rc;
+    IPCREMOTE *pr;
+
+    EnterCriticalSection(&gSynchronized);
+    ACR_RING_FOREACH(pr, &gLiveRemotes.rRemotes, IPCREMOTE, rLink) {
+        if (strcmp(pr->szAddress, szAddress) == 0) {
+            /* Already registered */
+            InterlockedIncrement(&pr->nReferences);
+            LeaveCriticalSection(&gSynchronized);
+            return pr;
+        }
+    }
+    if (!ACR_RING_EMPTY(&gDeadRemotes.rRemotes, IPCREMOTE, rLink)) {
+        pr = ACR_RING_FIRST(&gDeadRemotes.rRemotes);
+        ACR_RING_REMOVE(pr, rLink);
+    }
+    else {
+        pr = (LPIPCREMOTE)calloc(1, sizeof(IPCREMOTE));
+        if (pr == 0) {
+            LeaveCriticalSection(&gSynchronized);
+            SetLastError(ERROR_OUTOFMEMORY);
+            return 0;
+        }
+        ACR_RING_ELEM_INIT(pr, rLink);
+    }
+    pr->dwProcessId = 0;
+    pr->nReferences = 1;
+    if ((pr->hServerMeta = OpenFileMappingA(FILE_MAP_ALL_ACCESS, FALSE, szAddress)) == 0)
+        goto failed;
+    if ((pr->s = MapViewOfFile(pr->hServerMeta, FILE_MAP_ALL_ACCESS, 0, 0, 0)) == 0)
+        goto failed;
+    printf("[client] Opened server mapping for process %d of size %d\n", pr->s->dwProcessId, pr->s->Queue.dwSize);
+    if ((pr->hServerProc = OpenProcess(STANDARD_RIGHTS_ALL | PROCESS_DUP_HANDLE,
+                                       FALSE, pr->s->dwProcessId)) == 0)
+        goto failed;
+    if (!DuplicateHandle(pr->hServerProc, DW2H(pr->s->nProcessLock),
+                         gThisProcess, &pr->hProcessLock,
+                         0, FALSE, DUPLICATE_SAME_ACCESS) ||
+        !DuplicateHandle(pr->hServerProc, DW2H(pr->s->nAcceptSema),
+                         gThisProcess, &pr->hAcceptSema,
+                         0, FALSE, DUPLICATE_SAME_ACCESS) ||
+        !DuplicateHandle(pr->hServerProc, DW2H(pr->s->nAcceptSync),
+                         gThisProcess, &pr->hAcceptSync,
+                         0, FALSE, DUPLICATE_SAME_ACCESS) ||
+        !DuplicateHandle(pr->hServerProc, DW2H(pr->s->nAcceptLock),
+                         gThisProcess, &pr->hAcceptLock,
+                         0, FALSE, DUPLICATE_SAME_ACCESS))
+        goto failed;
+    strcpy(pr->szAddress, szAddress);
+    ACR_RING_INSERT_TAIL(&gLiveRemotes.rRemotes, pr, IPCREMOTE, rLink);
+    /* TODO: Map handles
+     */
+    LeaveCriticalSection(&gSynchronized);
+    return pr;
+failed:
+    rc = GetLastError();
+
+    SAFE_CLOSE_HANDLE(pr->hServerProc);
+    SAFE_CLOSE_HANDLE(pr->hAcceptSema);
+    SAFE_CLOSE_HANDLE(pr->hAcceptSync);
+    SAFE_CLOSE_HANDLE(pr->hAcceptLock);
+    SAFE_CLOSE_HANDLE(pr->hProcessLock);
+    SAFE_CLOSE_HANDLE(pr->hServerMeta);
+    SAFE_UNMAP1(pr->s);
+
+
+    ACR_RING_INSERT_TAIL(&gDeadRemotes.rRemotes, pr, IPCREMOTE, rLink);
+    LeaveCriticalSection(&gSynchronized);
+
+    SetLastError(rc);
+    return 0;
+}
+
+/**
+ * Creates a registration with the remote client and adds it to the
+ * the global list of all connected remotes.
+ */
+static LPIPCREMOTE
+AcrIpcRemoteAttach(DWORD dwProcessId)
+{
+    IPCREMOTE *pr;
+
+    EnterCriticalSection(&gSynchronized);
+    ACR_RING_FOREACH(pr, &gLiveRemotes.rRemotes, IPCREMOTE, rLink) {
+        if (pr->dwProcessId == dwProcessId) {
+            /* Already registered */
+            InterlockedIncrement(&pr->nReferences);
+            LeaveCriticalSection(&gSynchronized);
+            return pr;
+        }
+    }
+    if (!ACR_RING_EMPTY(&gDeadRemotes.rRemotes, IPCREMOTE, rLink)) {
+        pr = ACR_RING_FIRST(&gDeadRemotes.rRemotes);
+        ACR_RING_REMOVE(pr, rLink);
+    }
+    else {
+        pr = (LPIPCREMOTE)calloc(1, sizeof(IPCREMOTE));
+        if (pr == 0) {
+            LeaveCriticalSection(&gSynchronized);
+            SetLastError(ERROR_OUTOFMEMORY);
+            return 0;
+        }
+        ACR_RING_ELEM_INIT(pr, rLink);
+    }
+    pr->nReferences = 1;
+    pr->dwProcessId = dwProcessId;
+    ACR_RING_INSERT_TAIL(&gLiveRemotes.rRemotes, pr, IPCREMOTE, rLink);
+
+    LeaveCriticalSection(&gSynchronized);
+    return pr;
+}
+
+static void
+AcrIpcRemoteUnref(LPIPCREMOTE pRemote)
+{
+    if (pRemote == 0)
+        return;
+    EnterCriticalSection(&gSynchronized);
+    if (InterlockedDecrement(&pRemote->nReferences) == 0) {
+        SAFE_CLOSE_HANDLE(pRemote->hAcceptSema);
+        SAFE_CLOSE_HANDLE(pRemote->hAcceptSync);
+        SAFE_CLOSE_HANDLE(pRemote->hAcceptLock);
+        SAFE_CLOSE_HANDLE(pRemote->hProcessLock);
+        SAFE_CLOSE_HANDLE(pRemote->hServerMeta);
+        SAFE_UNMAP1(pRemote->s);
+
+        ACR_RING_REMOVE(pRemote, rLink);
+        ACR_RING_INSERT_TAIL(&gDeadRemotes.rRemotes, pRemote, IPCREMOTE, rLink);
+    }
+    LeaveCriticalSection(&gSynchronized);
+}
+
+ACR_NET_EXPORT(jlong, IpcServerEndpoint, create0)(JNI_STDARGS, jint flags)
+{
+    IPCSERVER *sp;
+
+    if ((sp = (LPIPCSERVER)calloc(1, sizeof(IPCSERVER))) == 0) {
+        /* Allocation failed */
+        ACR_THROW_NET_ERROR(ACR_ENOMEM);
+        return 0;
+    }
+    ACR_RING_INIT(&sp->rConnections, IPCSOCK, rLink);
+    sp->dwFlags = flags;
+    return P2J(sp);
+}
+
+ACR_NET_EXPORT(jint, IpcServerEndpoint, bind0)(JNI_STDARGS, jlong fp,
+                                               jbyteArray cb, jint backlog)
+{
+    int rc = 0;
+    DWORD dwShareLen;
+    DWORD dwShareSiz = backlog * sizeof(IPCSOCK_ACCEPT) + sizeof(IPCSOCK_SERVER);
+    acr_sockaddr_t *ca = SOCKADDR_CAST(cb);
+    LPIPCSERVER     sp = J2P(fp, LPIPCSERVER);
+    dwShareLen = ACR_ALIGN(dwShareSiz, PAGESIZE);
+
+    if (sp->hServerMap != 0) {
+        SOCKADDR_RELEASE(cb, ca);
+        return WSAEISCONN;
+    }
+    sp->hServerMap = CreateFileMappingA(INVALID_HANDLE_VALUE,
+                                        IPCSECURITY_TOKEN,
+                                        PAGE_READWRITE,
+                                        0, dwShareLen,
+                                        ca->hostname);
+    if (IS_NULL_HANDLE(sp->hServerMap))
+        goto failed;
+    if ((sp->s = MapViewOfFile(sp->hServerMap, FILE_MAP_ALL_ACCESS, 0, 0, 0)) == 0 ||
+        (sp->hAcceptSema = CreateSemaphore(IPCSECURITY_TOKEN, 0, backlog, 0)) == 0 ||
+        (sp->hAcceptSync = CreateEvent(IPCSECURITY_TOKEN, FALSE, FALSE, 0)) == 0 ||
+        (sp->hAcceptLock = CreateMutex(IPCSECURITY_TOKEN, FALSE, 0)) == 0)
+        goto failed;
+    sp->s->Queue.dwSize   = backlog;
+    sp->s->dwProcessId    = dwCurrentPid;
+    sp->s->nProcessLock   = H2DW(gProcessLock);
+    sp->s->nAcceptSema    = H2DW(sp->hAcceptSema);
+    sp->s->nAcceptSync    = H2DW(sp->hAcceptSync);
+    sp->s->nAcceptLock    = H2DW(sp->hAcceptLock);
+
+    SOCKADDR_RELEASE(cb, ca);
+    return 0;
+failed:
+    rc = GetLastError();
+    if (sp->s != 0) {
+        UnmapViewOfFile(sp->s);
+        sp->s = 0;
+    }
+    SAFE_CLOSE_HANDLE(sp->hServerMap);
+    SAFE_CLOSE_HANDLE(sp->hAcceptSema);
+    SAFE_CLOSE_HANDLE(sp->hAcceptSync);
+    SAFE_CLOSE_HANDLE(sp->hAcceptLock);
+    SOCKADDR_RELEASE(cb, ca);
+
+    return rc;
+}
+
+static LPBYTE
+AcrIoBufMap(HANDLE hMap, DWORD dwSize)
+{
+    int nAttempts = 0;
+    LPBYTE pBase  = 0;
+
+    if (IS_NULL_HANDLE(hMap)) {
+        SetLastError(ERROR_INVALID_HANDLE);
+        return 0;
+    }
+    while (nAttempts++ < 1000) {
+        if (pBase != 0)
+            VirtualFree(pBase,  0, MEM_RELEASE);
+        pBase = MapViewOfFileEx(hMap, FILE_MAP_ALL_ACCESS, 0, 0, dwSize, pBase);
+        if (pBase == 0) {
+            /* Initial map failed
+             * We cannot continue
+             */
+            return 0;
+        }
+        /* Map the same section to the next address page
+         * This allows to write beyond the end of buffer space
+         */
+        if (MapViewOfFileEx(hMap, FILE_MAP_ALL_ACCESS, 0, 0, dwSize, pBase + dwSize) != 0)
+            return pBase; /* Mapped two in a row */
+        UnmapViewOfFile(pBase);
+        printf("[debug] Mapping again %d\n", nAttempts);
+        /* Yield the processor */
+        SwitchToThread();
+        pBase = VirtualAlloc(0, 2 * dwSize, MEM_RESERVE, PAGE_READWRITE);
+        if (pBase == 0)
+            return 0;
+    }
+    SetLastError(ERROR_NOT_ENOUGH_MEMORY);
+    return 0;
+}
+
+static __inline BOOL
+IsAcceptQueueFull(IPCSOCK_SERVER *s)
+{
+    return ((s->Queue.dwWrPos + 1) % s->Queue.dwSize) == s->Queue.dwRdPos;
+}
+
+static __inline BOOL
+IsAcceptQueueEmpty(IPCSOCK_SERVER *s)
+{
+    return s->Queue.dwWrPos == s->Queue.dwRdPos;
+}
+
+static __inline void
+ApcIpcUnref(LPIPCSOCK cp)
+{
+    if (InterlockedDecrement(&cp->nReferences) == 0) {
+        int rc = GetLastError();
+        SAFE_UNMAP2(cp->pRdbData, cp->dwPageSize);
+        SAFE_UNMAP2(cp->pWrbData, cp->dwPageSize);
+        SAFE_UNMAP1(cp->c);
+        AcrIpcRemoteUnref(cp->rp);
+        AcrFree(cp);
+        SetLastError(rc);
+    }
+}
+
+ACR_NET_EXPORT(jlong, IpcServerEndpoint, accept0)(JNI_STDARGS, jlong fp,
+                                                  jint timeout)
+
+{
+    int   i, rc = 0;
+    DWORD ws;
+    HANDLE hProcessLock = 0;
+    HANDLE hClientMeta;
+    IPCSOCK_ACCEPT *a;
+    IPCSOCK   *cp  = 0;
+    INT64 nTimeup  = 0;
+    LPIPCSERVER sp = J2P(fp, LPIPCSERVER);
+
+    if (timeout != -1 && timeout != 0) {
+        /* Get "future" timeout */
+        nTimeup = GetCurrentMilliseconds() + timeout;
+    }
+retry:
+    AcquireMutex(sp->hAcceptLock);
+    if (IsAcceptQueueFull(sp->s)) {
+        /* Reached the backlog limit.
+         */
+        ReleaseMutex(sp->hAcceptLock);
+        SetLastError(WSAEMFILE);
+        return 0;
+    }
+    ReleaseMutex(sp->hAcceptLock);
+    /* Start ACCEPT sequence by posting to the semaphore.
+     * We do it once for each accept call.
+     */
+    if (!ReleaseSemaphore(sp->hAcceptSema, 1, 0)) {
+        return 0;
+    }
+again:
+    printf("[server] Waiting on accept ...\n");
+    /* Wait for a client connect */
+    ws = WaitForSingleObject(sp->hAcceptSync, timeout);
+    printf("[server] Waiting on accept : %d\n", ws);
+    switch (ws) {
+        case WAIT_OBJECT_0:
+            /* Client signaled there is a new
+             * valid connect record
+             */
+        break;
+        case WAIT_TIMEOUT:
+            /* Timeout */
+            printf("[server] accept timeout\n");
+            if (timeout == 0)
+                SetLastError(WSAEWOULDBLOCK);
+            else
+                SetLastError(WSAETIMEDOUT);
+            return 0;
+        break;
+        case WAIT_FAILED:
+            return 0;
+        break;
+        default:
+            /* Error!  */
+            printf("[server] illegal accept wait result\n");
+            SetLastError(rc);
+            return 0;
+        break;
+    }
+    AcquireMutex(sp->hAcceptLock);
+    if (IsAcceptQueueEmpty(sp->s)) {
+        /* Some other thread accepted the signaled connection
+         * Can happen if multiple threads are accepting.
+         */
+        if ((timeout == -1) || ((nTimeup > 0) && (nTimeup > GetCurrentMilliseconds()))) {
+            ReleaseMutex(sp->hAcceptLock);
+            printf("[server] Retrying ...\n");
+            goto again;
+        }
+        /* Timeout reached */
+        SetLastError(WSAETIMEDOUT);
+        return 0;
+    }
+    /* Do not reorder memory access
+     * Probably not needed
+     */
+    MemoryBarrier();
+    i = sp->s->Queue.dwRdPos++;
+    sp->s->Queue.dwRdPos %= sp->s->Queue.dwSize;
+    ReleaseMutex(sp->hAcceptLock);
+
+    a = sp->s->a + i;
+    printf("[server] Processing accept for client %d:%d\n", i, a->nStatus);
+    hClientMeta  = DW2H(a->nClientMeta);
+    if (a->nStatus == 0 || hClientMeta == 0) {
+        InterlockedExchange(&a->nStatus, WSAENOTSOCK);
+        /* XXX: Should we return faulty connect attempt
+         * to the caller?
+         */
+        if ((timeout == -1) || ((nTimeup > 0) && (nTimeup > GetCurrentMilliseconds()))) {
+            printf("[server] Restarting ...\n");
+            goto retry;
+        }
+        rc = WSAETIMEDOUT;
+        goto finally;
+    }
+    if ((cp = (LPIPCSOCK)calloc(1, sizeof(IPCSOCK))) == 0) {
+        /* Allocation failed */
+        rc = ERROR_OUTOFMEMORY;
+        goto finally;
+    }
+    if ((cp->rp = AcrIpcRemoteAttach(a->nStatus)) == 0) {
+        /* Allocation failed */
+        rc = ERROR_OUTOFMEMORY;
+        goto finally;
+    }
+    if ((cp->c = MapViewOfFile(hClientMeta, FILE_MAP_ALL_ACCESS, 0, 0, 0)) == 0) {
+        rc = GetLastError();
+        goto failed;
+    }
+    hProcessLock = DW2H(cp->c->nProcessLock);
+    if (hProcessLock == 0) {
+        if (cp->rp->hProcessLock == 0) {
+            /* Missing handle */
+            rc = WSAENOTSOCK;
+            goto failed;
+        }
+    }
+    else {
+        if (cp->rp->hProcessLock == 0)
+            cp->rp->hProcessLock = hProcessLock;
+        hProcessLock = 0;
+    }
+    cp->pStatus      = &cp->c->nStatus;
+    cp->dwPageSize   = cp->c->dwPageSize;
+    cp->dwTimeout    = timeout;
+    cp->nReferences  = 1;
+    cp->hClientMeta  = hClientMeta;
+    for (i = 0; i < 4; i++) {
+        if (IS_NULL_HANDLE(cp->c->nSync[i])) {
+            /* This should never happen */
+            rc = WSAENOTSOCK;
+            goto failed;
+
+        }
+    }
+    /* Signals are reverse mapped RRD <-> TDR and RTR <-> RTT */
+    cp->hSync[IPCSOCK_RDR] = DW2H(cp->c->nSync[IPCSOCK_TDR]);
+    cp->hSync[IPCSOCK_RTR] = DW2H(cp->c->nSync[IPCSOCK_RTT]);
+    cp->hSync[IPCSOCK_TDR] = DW2H(cp->c->nSync[IPCSOCK_RDR]);
+    cp->hSync[IPCSOCK_RTT] = DW2H(cp->c->nSync[IPCSOCK_RTR]);
+    if (cp->dwPageSize != 0) {
+        /* Map IO buffers for stream connections */
+        cp->hBufferMap[0] = DW2H(cp->c->nBufferMap[1]);
+        cp->hBufferMap[1] = DW2H(cp->c->nBufferMap[0]);
+        /* Map I/O circular buffers */
+        cp->pRdbData = AcrIoBufMap(cp->hBufferMap[0], cp->dwPageSize);
+        if (cp->pRdbData == 0) {
+            rc = GetLastError();
+            goto failed;
+        }
+        cp->pWrbData = AcrIoBufMap(cp->hBufferMap[1], cp->dwPageSize);
+        if (cp->pWrbData == 0) {
+            rc = GetLastError();
+            goto failed;
+        }
+    }
+    /* *** ACCEPTED ***
+     * Reset id to zero so that client can continue.
+     */
+    InterlockedExchange(&a->nStatus, IPCSOCK_OK);
+    InterlockedExchange(cp->pStatus, IPCSOCK_OK);
+    SetEvent(cp->hSync[IPCSOCK_TDR]);
+    cp->Rd = &cp->c->dPos[1];
+    cp->Wr = &cp->c->dPos[0];
+    cp->Mr = &cp->c->dMsg[1];
+    cp->Mw = &cp->c->dMsg[0];
+
+
+    /* Link the connection with the server rec */
+    cp->sp = sp;
+    ACR_RING_ELEM_INIT(cp, rLink);
+    EnterCriticalSection(&gSynchronized);
+    ACR_RING_INSERT_TAIL(&sp->rConnections, cp, IPCSOCK, rLink);
+    InterlockedIncrement(&sp->nConnections);
+    LeaveCriticalSection(&gSynchronized);
+    return P2J(cp);
+
+failed:
+    InterlockedExchange(&a->nStatus, WSAECONNREFUSED);
+    InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+    if (cp->c != 0)
+        cp->c->dwError = rc;
+    if (cp->hSync[IPCSOCK_TDR] != 0)
+        SetEvent(cp->hSync[IPCSOCK_TDR]);
+    for (i = 0; i < 4; i++) {
+        SAFE_CLOSE_HANDLE(cp->hSync[i]);
+    }
+    for (i = 0; i < 2; i++) {
+        SAFE_CLOSE_HANDLE(cp->hBufferMap[i]);
+    }
+    SAFE_UNMAP2(cp->pRdbData, cp->dwPageSize);
+    SAFE_UNMAP2(cp->pWrbData, cp->dwPageSize);
+    SAFE_UNMAP1(cp->c);
+    AcrIpcRemoteUnref(cp->rp);
+    AcrFree(cp);
+finally:
+    CloseHandle(hProcessLock);
+    CloseHandle(hClientMeta);
+    SetLastError(rc);
+    return 0;
+}
+
+ACR_NET_EXPORT(jlong, IpcEndpoint, create0)(JNI_STDARGS, jboolean msg)
+{
+    IPCSOCK *cp;
+
+    if ((cp = calloc(1, sizeof(IPCSOCK))) == 0) {
+        ACR_THROW_NET_ERROR(ACR_ENOMEM);
+        /* Allocation failed */
+        return 0;
+    }
+    ACR_RING_ELEM_INIT(cp, rLink);
+    if (msg == JNI_FALSE)
+        cp->dwPageSize = PAGESIZE;
+    cp->dwTimeout   = INFINITE;
+    cp->nStatus     = IPCSOCK_CLOSED;
+    cp->pStatus     = &cp->nStatus;
+    cp->nReferences = 1;
+    return P2J(cp);
+};
+
+ACR_NET_EXPORT(jint, IpcEndpoint, connect0)(JNI_STDARGS, jlong fp, jbyteArray cb, jint timeout)
+{
+    int i, rc = 0;
+    INT64  nTimeup     = 0;
+    IPCSOCK_ACCEPT  *a = 0;
+    HANDLE wh[2];
+    HANDLE hDuplicate;
+    BOOL   bRemoteInit = FALSE;
+    LPIPCSOCK        cp = J2P(fp, LPIPCSOCK);
+    acr_sockaddr_t  *ca = 0;
+
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c != 0)
+        return WSAEISCONN;
+    if (timeout != -1 && timeout != 0) {
+        /* Get "future" timeout */
+        nTimeup = GetCurrentMilliseconds() + timeout;
+    }
+    ca = SOCKADDR_CAST(cb);
+    InterlockedIncrement(&cp->nReferences);
+    _InterlockedOr(&cp->nStatus, IPCSOCK_CONNECTING);
+    if ((cp->rp = AcrIpcRemoteOpen(ca->hostname)) == 0) {
+        _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
+        ApcIpcUnref(cp);
+        SOCKADDR_RELEASE(cb, ca);
+        return GetLastError();
+    }
+    SOCKADDR_RELEASE(cb, ca);
+    printf("[client] Connecting to %d ...\n", cp->rp->s->dwProcessId);
+
+    /* Create our metadata page */
+    if ((cp->hClientMeta = CreateFileMapping(INVALID_HANDLE_VALUE,
+                                             IPCSECURITY_TOKEN,
+                                             PAGE_READWRITE, 0,
+                                             PAGESIZE, 0)) == 0)
+        goto failed;
+    if ((cp->c = MapViewOfFile(cp->hClientMeta, FILE_MAP_ALL_ACCESS, 0, 0, 0)) == 0)
+        goto failed;
+    cp->c->dwError    =  WSAEINPROGRESS;
+    cp->c->dwPageSize =  cp->dwPageSize;
+    cp->pStatus       = &cp->c->nStatus;
+    wh[0] = cp->rp->hAcceptSema;
+    wh[1] = cp->rp->hProcessLock;
+again:
+    rc = WaitForMultipleObjects(2, wh, FALSE, timeout);
+    switch (rc) {
+        case WAIT_OBJECT_0:
+            /* Got a semaphore */
+        break;
+        case WAIT_OBJECT_1:
+        case WAIT_ABANDONED_1:
+            ReleaseMutex(cp->rp->hProcessLock);
+            printf("[client] Server died\n");
+            rc = WSAECONNREFUSED;
+        break;
+        case WAIT_TIMEOUT:
+            printf("[client] Timeout!\n");
+            if (timeout == 0)
+                rc = WSAEWOULDBLOCK;
+            else
+                rc = WSAETIMEDOUT;
+        break;
+        case WAIT_FAILED:
+            rc = GetLastError();
+            printf("[client] Wait failed %d\n", rc);
+        break;
+        default:
+            printf("[client] Unexpected result %d\n", rc);
+        break;
+    }
+    if (rc != 0)
+        goto cleanup;
+    AcquireMutex(cp->rp->hAcceptLock);
+    /* CRITICAL code begin
+     * Since we don't use process locks the following code
+     * is critical and needs a review.
+     * A race condition can occur when reaching the burst
+     * load limit. However this connector is not meant to be
+     * used for such usage.
+     */
+    if (IsAcceptQueueFull(cp->rp->s)) {
+        /* No strorage for our accept data
+         */
+        ReleaseMutex(cp->rp->hAcceptLock);
+        if ((timeout == -1) || ((nTimeup > 0) && (nTimeup > GetCurrentMilliseconds()))) {
+            printf("[client] No free connection slots. retrying\n");
+            goto again;
+        }
+        /* Timeout occured */
+        rc = WSAETIMEDOUT;
+        goto cleanup;
+    }
+    i = cp->rp->s->Queue.dwWrPos++;
+    cp->rp->s->Queue.dwWrPos %= cp->rp->s->Queue.dwSize;
+    a = cp->rp->s->a + i;
+    a->nStatus     = dwCurrentPid;
+    a->nClientMeta = 0;
+    ReleaseMutex(cp->rp->hAcceptLock);
+    if (!DuplicateHandle(gThisProcess, cp->hClientMeta, cp->rp->hServerProc, &hDuplicate,
+                         0, FALSE, DUPLICATE_SAME_ACCESS))
+        goto failed;
+    a->nClientMeta = H2DW(hDuplicate);
+    EnterCriticalSection(&gSynchronized);
+    if (cp->rp->dwProcessId == 0) {
+        bRemoteInit = TRUE;
+        /* Duplicate our process mutex into the server.
+         * This is only signaled if our process dies before cleanly closing
+         * the connection(s).
+         */
+        if (!DuplicateHandle(gThisProcess, gProcessLock, cp->rp->hServerProc, &cp->rp->dProcessLock,
+                             0, FALSE, DUPLICATE_SAME_ACCESS)) {
+            LeaveCriticalSection(&gSynchronized);
+            goto failed;
+        }
+        cp->rp->dwProcessId = cp->rp->s->dwProcessId;
+    }
+    LeaveCriticalSection(&gSynchronized);
+    cp->c->nProcessLock = H2DW(cp->rp->dProcessLock);
+    for (i = 0; i < 4; i++) {
+        /* Create sync events
+         */
+        cp->hSync[i] = CreateEvent(IPCSECURITY_TOKEN, FALSE, FALSE, 0);
+        if (!DuplicateHandle(gThisProcess, cp->hSync[i], cp->rp->hServerProc, &hDuplicate,
+                             0, FALSE, DUPLICATE_SAME_ACCESS))
+            goto failed;
+        cp->c->nSync[i] = H2DW(hDuplicate);
+    }
+    if (cp->dwPageSize != 0) {
+        if ((cp->hBufferMap[0] = CreateFileMappingW(INVALID_HANDLE_VALUE, IPCSECURITY_TOKEN,
+                                                    PAGE_READWRITE, 0, cp->dwPageSize, 0)) == 0 ||
+            (cp->hBufferMap[1] = CreateFileMappingW(INVALID_HANDLE_VALUE, IPCSECURITY_TOKEN,
+                                                    PAGE_READWRITE, 0, cp->dwPageSize, 0)) == 0)
+            goto failed;
+        if ((cp->pRdbData = AcrIoBufMap(cp->hBufferMap[0], cp->dwPageSize)) == 0 ||
+            (cp->pWrbData = AcrIoBufMap(cp->hBufferMap[1], cp->dwPageSize)) == 0)
+            goto failed;
+        /* Use reverse order for r/w buffers */
+        if (!DuplicateHandle(gThisProcess, cp->hBufferMap[0], cp->rp->hServerProc, &hDuplicate,
+                             0, FALSE, DUPLICATE_SAME_ACCESS))
+                goto failed;
+        cp->c->nBufferMap[0] = H2DW(hDuplicate);
+        if (!DuplicateHandle(gThisProcess, cp->hBufferMap[1], cp->rp->hServerProc, &hDuplicate,
+                             0, FALSE, DUPLICATE_SAME_ACCESS))
+                goto failed;
+        cp->c->nBufferMap[1] = H2DW(hDuplicate);
+    }
+    printf("[client] Connected to %d\n", cp->rp->s->dwProcessId);
+    SetEvent(cp->rp->hAcceptSync);
+    /* Not needed any more */
+    printf("[client] Waiting for ack\n");
+    if (timeout != -1 && timeout != 0) {
+        /* Update timeout with the time we spend inside processing so far
+         */
+        timeout = (int)(nTimeup - GetCurrentMilliseconds());
+        if (timeout < 0)
+            timeout = 1;
+    }
+    /* Now wait for the server ack */
+    wh[0] = cp->hSync[IPCSOCK_RDR];
+    wh[1] = cp->rp->hProcessLock;
+
+#if defined(IPCSOCK_ACK_TIMEOUT)
+    /* Use provided hard acknowledge timeout.
+     * There is no point to wait infinitely
+     * if the server gets too busy.
+     */
+     if (timeout < 0 || timeout > IPCSOCK_ACK_TIMEOUT)
+        timeout = IPCSOCK_ACK_TIMEOUT;
+#endif
+    rc = WaitForMultipleObjects(2, wh, FALSE, timeout);
+    switch (rc) {
+        case WAIT_OBJECT_0:
+            /* Got ack event */
+        break;
+        case WAIT_OBJECT_1:
+        case WAIT_ABANDONED_1:
+            ReleaseMutex(cp->rp->hProcessLock);
+            printf("[client] Server died\n");
+            rc = WSAECONNREFUSED;
+        break;
+        case WAIT_TIMEOUT:
+            printf("[client] Timeout!\n");
+            rc = WSAETIMEDOUT;
+        break;
+        case WAIT_FAILED:
+            rc = GetLastError();
+            printf("[client] Wait failed %d\n", rc);
+        break;
+        default:
+            printf("[client] Unexpected result %d\n", rc);
+        break;
+    }
+    if (rc == 0)
+        rc = InterlockedExchange(&a->nStatus, 0);
+    if (rc == 0) {
+        /*** CONNECTED ***
+         *
+         */
+        cp->Rd = &cp->c->dPos[0];
+        cp->Wr = &cp->c->dPos[1];
+        cp->Mr = &cp->c->dMsg[0];
+        cp->Mw = &cp->c->dMsg[1];
+        cp->c->dwError = 0;
+        InterlockedExchange(cp->pStatus, 0);
+
+        ApcIpcUnref(cp);
+        return 0;
+    }
+    printf("client] connection rejected. recycling\n");
+    goto cleanup;
+failed:
+    rc = GetLastError();
+cleanup:
+    /* A boring process of cleanning up everything we allocated
+     * and duplicated.
+     */
+    _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
+    if (cp->c != 0) {
+        /* Close remote handles */
+        for (i = 0; i < 4; i++)
+            CloseRemoteHandle(cp->rp->hServerProc, cp->c->nSync[i]);
+        for (i = 0; i < 2; i++)
+            CloseRemoteHandle(cp->rp->hServerProc, cp->c->nBufferMap[i]);
+    }
+    if (a != 0) {
+        InterlockedExchange(&a->nStatus, 0);
+        /* Signal to parent so that it consumes the failed
+         * accept record. With a->nStatus == 0 it will be
+         * skipped and a new request will be issued
+         */
+        SetEvent(cp->rp->hAcceptSync);
+        CloseRemoteHandle(cp->rp->hServerProc, a->nClientMeta);
+    }
+    if (bRemoteInit) {
+        /* Usual case. Failed with the first connect attempt
+         * The remote will be destroyed but the server might not
+         * close the duplicate.
+         */
+        CloseRemoteHandle(cp->rp->hServerProc, cp->c->nProcessLock);
+        cp->rp->dProcessLock = 0;
+    }
+    for (i = 0; i < 4; i++) {
+        /* Close sync objects */
+        SAFE_CLOSE_HANDLE(cp->hSync[i]);
+    }
+    for (i = 0; i < 2; i++) {
+        SAFE_CLOSE_HANDLE(cp->hBufferMap[i]);
+    }
+    SAFE_UNMAP2(cp->pRdbData, cp->dwPageSize);
+    SAFE_UNMAP2(cp->pWrbData, cp->dwPageSize);
+    SAFE_CLOSE_HANDLE(cp->hClientMeta);
+    SAFE_UNMAP1(cp->c);
+    AcrIpcRemoteUnref(cp->rp);
+    cp->c  = 0;
+    cp->rp = 0;
+    ApcIpcUnref(cp);
+    return rc;
+};
+
+ACR_NET_EXPORT(jint, IpcEndpoint, tmset0)(JNI_STDARGS, jlong fp, jint timeout)
+{
+    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
 
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0)
+        return WSAENOTCONN;
+    if (_InterlockedOr(cp->pStatus, 0) != 0)
+        return WSAESHUTDOWN;
+    if (timeout < 0)
+        cp->dwTimeout = INFINITE;
+    else
+        cp->dwTimeout = timeout;
+    return 0;
 }
+
+ACR_NET_EXPORT(jint, IpcEndpoint, shutdown0)(JNI_STDARGS, jlong fp)
+{
+    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0)
+        return WSAENOTCONN;
+    if (_InterlockedOr(cp->pStatus, IPCSOCK_SHUTDOWN) != 0)
+        return WSAEALREADY;
+    cp->c->dwError = WSAESHUTDOWN;
+
+    /* Break listeners on our side
+     */
+    SetEvent(cp->hSync[IPCSOCK_RDR]);
+    SetEvent(cp->hSync[IPCSOCK_RTT]);
+    return 0;
+}
+
+ACR_NET_EXPORT(jint, IpcEndpoint, close0)(JNI_STDARGS, jlong fp)
+{
+    int i;
+    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
+
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0) {
+        /* Already closed or not initialized.
+         * Just free the memory.
+         */
+        AcrFree(cp);
+        return 0;
+    }
+    if (_InterlockedAnd(cp->pStatus, IPCSOCK_CONNECTING) == IPCSOCK_CONNECTING) {
+        /* Closing the socket currenty in the connect state.
+         * Close call will have to be called again.
+         */
+        return WSAEINPROGRESS;
+    }
+    cp->c->dwError = WSAECONNABORTED;
+    /* Set the state to closed.
+     * Any furter communication on the socket must fail.
+     */
+    cp->nStatus = _InterlockedOr(cp->pStatus, IPCSOCK_CLOSED);
+    if (cp->nStatus == 0) {
+        /* Close without shutdown.
+         * Inform our listeners that we are going to close.
+         * The CloseHandle will break the loop so this might not
+         * be needed after all.
+         */
+        SetEvent(cp->hSync[IPCSOCK_RDR]);
+        SetEvent(cp->hSync[IPCSOCK_RTT]);
+    }
+    else if ((cp->nStatus & IPCSOCK_CLOSED) != 0) {
+        /* Hardly to happen in a normal circumstances
+         * We are closing the socket that is already in the close call
+         */
+        return WSAEALREADY;
+    }
+    /* Block remote updating our status */
+    cp->nStatus = IPCSOCK_CLOSED;
+    cp->pStatus = &cp->nStatus;
+    /* XXX: Should we left the client to hard timeout?
+     * Signal client that is blocking expecting we
+     * provide or consume the data
+     */
+    SetEvent(cp->hSync[IPCSOCK_TDR]);
+    SetEvent(cp->hSync[IPCSOCK_RTR]);
+    /* XXX: Some ack would be handy here
+     * so that we don't close the handles too soon
+     */
+    for (i = 0; i < 4; i++) {
+        /* Close sync objects */
+        SAFE_CLOSE_HANDLE(cp->hSync[i]);
+    }
+    for (i = 0; i < 2; i++) {
+        SAFE_CLOSE_HANDLE(cp->hBufferMap[i]);
+    }
+    SAFE_CLOSE_HANDLE(cp->hClientMeta);
+    if (cp->sp != 0) {
+        EnterCriticalSection(&gSynchronized);
+        ACR_RING_REMOVE(cp, rLink);
+        InterlockedDecrement(&cp->sp->nConnections);
+        LeaveCriticalSection(&gSynchronized);
+    }
+    printf("[close] Refcount=%d\n", cp->nReferences);
+    /* Depending if the socket was inside a blocking
+     * call this mignt not actually free the socket.
+     * However the call to unref in the blocking call will
+     * close the socket in that case.
+     */
+    ApcIpcUnref(cp);
+    return 0;
+}
+
+ACR_NET_EXPORT(jint, IpcServerEndpoint, close0)(JNI_STDARGS, jlong fp)
+{
+    LPIPCSERVER sp = J2P(fp, LPIPCSERVER);
+    LPIPCSOCK   cp;
+    LPIPCSOCK   np;
+
+    if (sp == 0)
+        return WSAENOTSOCK;
+    ACR_RING_FOREACH_SAFE(cp, np, &sp->rConnections, IPCSOCK, rLink) {
+        /* Unlink for the server */
+        ACR_RING_REMOVE(cp, rLink);
+        /* Mark the connection as aborted.
+         * The user must still close each individual
+         * connection to free the memory and resources.
+         */
+        cp->nStatus = _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
+        if (cp->nStatus == 0) {
+            /* Inform our listeners that we are going to close.
+             */
+            SetEvent(cp->hSync[IPCSOCK_RDR]);
+            SetEvent(cp->hSync[IPCSOCK_RTT]);
+        }
+        sp->nConnections--;
+    }
+    if (sp->nConnections != 0) {
+        /* Should never happen [tm]
+         */
+        printf("[server] Found %d active. Should be zero\n", sp->nConnections);
+    }
+    SAFE_CLOSE_HANDLE(sp->hAcceptSema);
+    SAFE_CLOSE_HANDLE(sp->hAcceptSync);
+    SAFE_CLOSE_HANDLE(sp->hAcceptLock);
+    SAFE_CLOSE_HANDLE(sp->hServerMap);
+
+    AcrFree(sp);
+    return 0;
+}
+
+ACR_NET_EXPORT(jint, IpcEndpoint, avail0)(JNI_STDARGS, jlong fp, jboolean readside)
+{
+    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
+    long nAvail;
+
+    if (cp == 0) {
+        ACR_THROW_NET(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        ACR_THROW_NET(WSAENOTCONN);
+        return -1;
+    }
+    cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+    if (cp->nStatus > IPCSOCK_SHUTDOWN) {
+        /* Any attempt to read on closed connection is error.
+         */
+        if (cp->c != 0)
+            ACR_THROW_NET(cp->c->dwError);
+        else
+            ACR_THROW_NET(WSAENOTCONN);
+        return -1;
+    }
+    if (WaitForSingleObject(cp->rp->hProcessLock, 0) != WAIT_TIMEOUT) {
+        ReleaseMutex(cp->rp->hProcessLock);
+        cp->c->dwError = WSAECONNRESET;
+        InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+        ACR_THROW_NET(cp->c->dwError);
+        return -1;
+    }
+    MemoryBarrier();
+    if (cp->dwPageSize == 0) {
+        if (readside)
+            nAvail = cp->Mr->Length - cp->Mr->Readed;
+        else
+            nAvail = cp->Mw->Length == 0 ? IPCSOCK_MSGSIZE : 0;
+    }
+    else {
+        nAvail = cp->Rd->SendPos - cp->Rd->RecvPos;
+        if (readside == JNI_FALSE)
+            nAvail = cp->dwPageSize - nAvail;
+    }
+    if (nAvail > 0)
+        return nAvail;
+    else
+        return 0;
+}
+
+int AcrIpcRead(LPIPCSOCK cp, void *pData, int nSize)
+{
+    long   nAvail;
+    long   nRead     = 0;
+    long   nCapacity = cp->dwPageSize;
+    LPBYTE pStart    = cp->pRdbData;
+    DWORD  dwTimeout = cp->dwTimeout;
+
+    if (cp == 0) {
+        SetLastError(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    if (cp->dwPageSize == 0) {
+        SetLastError(WSAEPFNOSUPPORT);
+        return -1;
+    }
+    if (pData == 0 || nSize < 1) {
+        SetLastError(WSAEINVAL);
+        return -1;
+    }
+    if (_InterlockedOr(&cp->nState, IPCSOCK_READ) == IPCSOCK_READ) {
+        /* Protect against concurrent reads.
+         */
+        SetLastError(WSAEALREADY);
+        return -1;
+    }
+    InterlockedIncrement(&cp->nReferences);
+    while (nRead == 0) {
+        cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+        if (cp->nStatus > IPCSOCK_SHUTDOWN) {
+            /* Any attempt to read on closed connection is an error.
+             */
+            if (cp->c != 0)
+                SetLastError(cp->c->dwError);
+            else
+                SetLastError(WSAENOTCONN);
+            nRead = -1;
+            break;
+        }
+        cp->Rd->RecvWait = TRUE;
+        MemoryBarrier();
+        nAvail = cp->Rd->SendPos - cp->Rd->RecvPos;
+        if (nAvail > 0) {
+            /* We have something in the buffer
+             */
+            cp->Rd->RecvWait = FALSE;
+            /* Copy to the user buffer */
+            nRead = nAvail > nSize ? nSize : nAvail;
+            memcpy(pData, pStart + cp->Rd->RecvPos, nRead);
+            /* Advance read pointer */
+            cp->Rd->RecvPos += nRead;
+            if (cp->Rd->RecvPos >= nCapacity) {
+                cp->Rd->RecvPos -= nCapacity;
+                cp->Rd->SendPos -= nCapacity;
+            }
+            MemoryBarrier();
+            if (cp->Rd->SendWait) {
+                /* Inform the peer that we have read some bytes */
+                SetEvent(cp->hSync[IPCSOCK_RTR]);
+            }
+        }
+        else if (cp->dwTimeout == 0) {
+            /* Non blocking mode.
+             */
+            cp->Rd->RecvWait = FALSE;
+            if (nRead == 0) {
+                SetLastError(WSAEWOULDBLOCK);
+                nRead = -1;
+            }
+            break;
+        }
+        else {
+            DWORD  ws;
+            HANDLE wh[2];
+
+            if (cp->nStatus != 0) {
+                /* Cannot require data on already SHUTDOWN connection */
+                SetLastError(WSAESHUTDOWN);
+                nRead = -1;
+                break;
+            }
+            wh[0] = cp->hSync[IPCSOCK_RDR];
+            wh[1] = cp->rp->hProcessLock;
+            /* Wait for a signal.
+             * XXX: INFINITE timeouts should really be some sane
+             *      value which will abort the connection.
+             */
+            ws = WaitForMultipleObjects(2, wh, FALSE, dwTimeout);
+            cp->Rd->RecvWait = FALSE;
+            switch (ws) {
+                case WAIT_OBJECT_0:
+                    /* Make sure we don't end up in the
+                     * wait call twice in a row
+                     */
+                    dwTimeout = 0;
+                break;
+                case WAIT_OBJECT_1:
+                case WAIT_ABANDONED_1:
+                    ReleaseMutex(cp->rp->hProcessLock);
+                    cp->c->dwError = WSAECONNRESET;
+                    InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+                    SetLastError(cp->c->dwError);
+                    nRead = -1;
+                break;
+                case WAIT_TIMEOUT:
+                    SetLastError(WSAETIMEDOUT);
+                    nRead = -1;
+                break;
+                default:
+                    /* This can only be WAIT_FAILED
+                     * in which case errno is already set
+                     */
+                    nRead = -1;
+                break;
+            }
+        }
+    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_READ);
+    ApcIpcUnref(cp);
+    return nRead;
+}
+
+int AcrIpcWrite(LPIPCSOCK cp, const void *pData, int nSize, int nFlags)
+{
+    long   nCapacity;
+    LPBYTE pDest;
+    const LPBYTE bData = (const LPBYTE)pData;
+    long   nSend    = 0;
+    long   nChunk   = 0;
+    INT64  nTimeup  = 0;
+    int    nTimeout = cp->dwTimeout;
+
+    if (cp == 0) {
+        SetLastError(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    if (cp->dwPageSize == 0) {
+        SetLastError(WSAEPFNOSUPPORT);
+        return -1;
+    }
+    if (pData == 0 || nSize < 1) {
+        SetLastError(WSAEINVAL);
+        return -1;
+    }
+    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND) {
+        /* Protect against concurrent writes.
+         */
+        SetLastError(WSAEALREADY);
+        return -1;
+    }
+    InterlockedIncrement(&cp->nReferences);
+    if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+        /* Get "future" timeout */
+        nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
+    }
+    while (nSize > 0) {
+        cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+        if (cp->nStatus != 0) {
+            /* We don't support half-closed connections
+             * so any attempt to write on shudown socket
+             * is error.
+             */
+            if (cp->c != 0)
+                SetLastError(cp->c->dwError);
+            else
+                SetLastError(WSAENOTCONN);
+            nSend = -1;
+            break;
+        }
+        cp->Wr->SendWait = TRUE;
+        MemoryBarrier();
+        nCapacity = cp->dwPageSize - (cp->Wr->SendPos - cp->Wr->RecvPos);
+        pDest     = cp->pWrbData + cp->Wr->SendPos;
+        if (nCapacity > 0) {
+            cp->Wr->SendWait = FALSE;
+            /* We have free space in the buffer
+             */
+            nChunk = nCapacity > nSize ? nSize : nCapacity;
+            memcpy(pDest, bData + nSend, nChunk);
+            /* Advance write pointer */
+            cp->Wr->SendPos += nChunk;
+            nSend += nChunk;
+            nSize -= nChunk;
+            MemoryBarrier();
+            if (cp->Wr->RecvWait) {
+                /* Inform the peer that we wrote some bytes */
+                SetEvent(cp->hSync[IPCSOCK_TDR]);
+            }
+            if ((nFlags & IPCSOCK_SEND_ALL) == 0) {
+                /* User requested a partial write
+                 */
+                break;
+            }
+        }
+        else if (cp->dwTimeout == 0) {
+            /* Non blocking mode.
+             */
+            cp->Wr->SendWait = FALSE;
+            if (nSend == 0) {
+                SetLastError(WSAEWOULDBLOCK);
+                nSend = -1;
+            }
+            break;
+        }
+        else {
+            DWORD  ws;
+            HANDLE wh[2];
+
+            wh[0] = cp->hSync[IPCSOCK_RTT];
+            wh[1] = cp->rp->hProcessLock;
+            if (nTimeup != 0) {
+                nTimeout = (int)(nTimeup - GetCurrentMilliseconds());
+                if (nTimeout < 0)
+                    nTimeout = 0;
+            }
+            /* Wait for a signal.
+             * XXX: INFINITE timeouts should really be some sane
+             *      value which will abort the connection.
+             */
+            ws = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
+            cp->Wr->SendWait = FALSE;
+            switch (ws) {
+                case WAIT_OBJECT_0:
+                break;
+                case WAIT_OBJECT_1:
+                case WAIT_ABANDONED_1:
+                    ReleaseMutex(cp->rp->hProcessLock);
+                    cp->c->dwError = WSAECONNRESET;
+                    InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+                    SetLastError(cp->c->dwError);
+                    nSend = -1;
+                break;
+                case WAIT_TIMEOUT:
+                    SetLastError(WSAETIMEDOUT);
+                    nSend = -1;
+                break;
+                default:
+                    /* This can be only WAIT_FAILED
+                     * in which case errno is already set
+                     */
+                    nSend = -1;
+                break;
+            }
+        }
+    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_SEND);
+    ApcIpcUnref(cp);
+    return nSend;
+}
+
+int AcrIpcSend(LPIPCSOCK cp, const void *pData, int nSize, int nFlags)
+{
+    int    rc       = 0;
+    long   nSend    = 0;
+    INT64  nTimeup  = 0;
+    int    nTimeout = cp->dwTimeout;
+
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0)
+        return WSAENOTCONN;
+    if (cp->dwPageSize != 0)
+        return  WSAEPFNOSUPPORT;
+    if (pData == 0 || nSize < 1 || nSize > IPCSOCK_MSGSIZE)
+        return WSAEINVAL;
+    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
+        return WSAEALREADY;
+    InterlockedIncrement(&cp->nReferences);
+    if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+        /* Get "future" timeout */
+        nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
+    }
+    while (rc == 0) {
+        cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+        if (cp->nStatus != 0) {
+            /* We don't support half-closed connections
+             * so any attempt to write on shudown socket
+             * is error.
+             */
+            if (cp->c != 0)
+                rc = cp->c->dwError;
+            else
+                rc = WSAENOTCONN;
+            break;
+        }
+        cp->Mw->SendWait = TRUE;
+        MemoryBarrier();
+        if (cp->Mw->Length == 0) {
+            if ((nFlags & IPCSOCK_FLUSH) == 0)
+                cp->Mw->SendWait = FALSE;
+            if (nSend != 0) {
+                /* We have already send the data
+                 * and this was confirmation that it was
+                 * delivered.
+                 */
+                cp->Mw->SendWait = FALSE;
+                break;
+            }
+            /* We have free space in the buffer
+             */
+            memcpy(cp->Mw->Data, pData, nSize);
+            InterlockedIncrement(&cp->Mw->Id);
+            cp->Mw->Length = nSize;
+            MemoryBarrier();
+            if (cp->Mw->RecvWait) {
+                /* Inform the peer that we have send the message */
+                SetEvent(cp->hSync[IPCSOCK_TDR]);
+            }
+            if ((nFlags & IPCSOCK_FLUSH) == 0)
+                break;
+            nSend = nSize;
+        }
+        else if (cp->dwTimeout == 0) {
+            /* Non blocking mode.
+             */
+            cp->Mw->SendWait = FALSE;
+            rc = WSAEWOULDBLOCK;
+            break;
+        }
+        else {
+            DWORD  ws;
+            HANDLE wh[2];
+
+            wh[0] = cp->hSync[IPCSOCK_RTT];
+            wh[1] = cp->rp->hProcessLock;
+            if (nTimeup != 0) {
+                nTimeout = (int)(nTimeup - GetCurrentMilliseconds());
+                if (nTimeout < 0)
+                    nTimeout = 0;
+            }
+            /* Wait for a signal.
+             * XXX: INFINITE timeouts should really be some sane
+             *      value which will abort the connection.
+             */
+            ws = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
+            cp->Mw->SendWait = FALSE;
+            switch (ws) {
+                case WAIT_OBJECT_0:
+                break;
+                case WAIT_OBJECT_1:
+                case WAIT_ABANDONED_1:
+                    ReleaseMutex(cp->rp->hProcessLock);
+                    cp->c->dwError = WSAECONNRESET;
+                    InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+                    rc = cp->c->dwError;
+                break;
+                case WAIT_TIMEOUT:
+                    rc = WSAETIMEDOUT;
+                break;
+                default:
+                    /* This can be only WAIT_FAILED
+                     * in which case errno is already set
+                     */
+                    rc = GetLastError();
+                break;
+            }
+        }
+    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_SEND);
+    ApcIpcUnref(cp);
+    return rc;
+}
+
+int AcrIpcRecv(LPIPCSOCK cp, void *pData, int nSize)
+{
+    long   nAvail;
+    long   nRead     = 0;
+    DWORD  dwTimeout = cp->dwTimeout;
+
+    if (cp == 0) {
+        SetLastError(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    if (cp->dwPageSize != 0) {
+        SetLastError(WSAEPFNOSUPPORT);
+        return -1;
+    }
+    if (pData == 0 || nSize < 1) {
+        SetLastError(WSAEINVAL);
+        return -1;
+    }
+    if (_InterlockedOr(&cp->nState, IPCSOCK_READ) == IPCSOCK_READ) {
+        /* Protect against concurrent reads.
+         */
+        SetLastError(WSAEALREADY);
+        return -1;
+    }
+    InterlockedIncrement(&cp->nReferences);
+    while (nRead == 0) {
+        cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+        if (cp->nStatus > IPCSOCK_SHUTDOWN) {
+            /* Any attempt to read on closed connection is an error.
+             */
+            if (cp->c != 0)
+                SetLastError(cp->c->dwError);
+            else
+                SetLastError(WSAENOTCONN);
+            nRead = -1;
+            break;
+        }
+        cp->Mr->RecvWait = TRUE;
+        MemoryBarrier();
+        nAvail = cp->Mr->Length - cp->Mr->Readed;
+        if (nAvail > 0) {
+            /* We have something in the buffer
+             */
+            cp->Mr->RecvWait = FALSE;
+            /* Copy to the user buffer */
+            nRead = nAvail > nSize ? nSize : nAvail;
+            memcpy(pData, cp->Mr->Data + cp->Mr->Readed, nRead);
+            /* Advance read pointer */
+            cp->Mr->Readed += nRead;
+            if (cp->Mr->Readed >= cp->Mr->Length) {
+                cp->Mr->Readed = 0;
+                cp->Mr->Length = 0;
+            }
+            MemoryBarrier();
+            if (cp->Mr->SendWait && cp->Mr->Length == 0) {
+                /* Inform the peer that we have read the entire message
+                 */
+                SetEvent(cp->hSync[IPCSOCK_RTR]);
+            }
+        }
+        else if (cp->dwTimeout == 0) {
+            /* Non blocking mode.
+             */
+            cp->Mr->RecvWait = FALSE;
+            SetLastError(WSAEWOULDBLOCK);
+            nRead = -1;
+        }
+        else {
+            DWORD  ws;
+            HANDLE wh[2];
+
+            if (cp->nStatus != 0) {
+                /* Cannot require data on already SHUTDOWN connection */
+                SetLastError(WSAESHUTDOWN);
+                nRead = -1;
+                break;
+            }
+            wh[0] = cp->hSync[IPCSOCK_RDR];
+            wh[1] = cp->rp->hProcessLock;
+            /* Wait for a signal.
+             * XXX: INFINITE timeouts should really be some sane
+             *      value which will abort the connection.
+             */
+            ws = WaitForMultipleObjects(2, wh, FALSE, dwTimeout);
+            cp->Mr->RecvWait = FALSE;
+            switch (ws) {
+                case WAIT_OBJECT_0:
+                break;
+                case WAIT_OBJECT_1:
+                case WAIT_ABANDONED_1:
+                    ReleaseMutex(cp->rp->hProcessLock);
+                    cp->c->dwError = WSAECONNRESET;
+                    InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+                    SetLastError(cp->c->dwError);
+                    nRead = -1;
+                break;
+                case WAIT_TIMEOUT:
+                    SetLastError(WSAETIMEDOUT);
+                    nRead = -1;
+                break;
+                default:
+                    /* This can only be WAIT_FAILED
+                     * in which case errno is already set
+                     */
+                    nRead = -1;
+                break;
+            }
+        }
+    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_READ);
+    ApcIpcUnref(cp);
+    return nRead;
+}
+
+ACR_NET_EXPORT(jint, IpcEndpoint, flush0)(JNI_STDARGS, jlong fp)
+{
+    int    rc       = 0;
+    long   nUsed    = 0;
+    INT64  nTimeup  = 0;
+    int    nTimeout;
+    LPIPCSOCK cp    = J2P(fp, LPIPCSOCK);
+    volatile long *pSendWait;
+
+    if (cp == 0)
+        return WSAENOTSOCK;
+    if (cp->c == 0)
+        return WSAENOTCONN;
+    if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
+        return WSAEALREADY;
+    nTimeout = cp->dwTimeout;
+    InterlockedIncrement(&cp->nReferences);
+    if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+        /* Get "future" timeout */
+        nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
+    }
+    if (cp->dwPageSize == 0)
+        pSendWait = &cp->Wr->SendWait;
+    else
+        pSendWait = &cp->Mw->SendWait;
+
+    while (rc == 0) {
+        cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+        if (cp->nStatus != 0) {
+            /* We don't support half-closed connections
+             * so any attempt to write on shudown socket
+             * is error.
+             */
+            if (cp->c != 0)
+                rc = cp->c->dwError;
+            else
+                rc = WSAENOTCONN;
+            break;
+        }
+        *pSendWait = TRUE;
+        MemoryBarrier();
+        if (cp->dwPageSize == 0)
+            nUsed = cp->Wr->SendPos - cp->Wr->RecvPos;
+        else
+            nUsed = cp->Mw->Length;
+        if (nUsed > 0) {
+            /* Our send buffer is not empty.
+             * Wait for the receiver to consume data
+             */
+            if (cp->Mw->RecvWait || cp->Wr->RecvWait) {
+                /* XXX: This should be signaled already
+                 * but probably not handled (yet).
+                 */
+                SetEvent(cp->hSync[IPCSOCK_TDR]);
+            }
+            if (cp->dwTimeout == 0) {
+                /* Non blocking mode.
+                 */
+                *pSendWait = FALSE;
+                rc = WSAEWOULDBLOCK;
+            }
+            else {
+                DWORD  ws;
+                HANDLE wh[2];
+
+                wh[0] = cp->hSync[IPCSOCK_RTT];
+                wh[1] = cp->rp->hProcessLock;
+                if (nTimeup != 0) {
+                    nTimeout = (int)(nTimeup - GetCurrentMilliseconds());
+                    if (nTimeout < 0)
+                        nTimeout = 0;
+                }
+                /* Wait for a signal.
+                 * XXX: INFINITE timeouts should really be some sane
+                 *      value which will abort the connection.
+                 */
+                ws = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
+                *pSendWait = FALSE;
+                switch (ws) {
+                    case WAIT_OBJECT_0:
+                    break;
+                    case WAIT_OBJECT_1:
+                    case WAIT_ABANDONED_1:
+                        ReleaseMutex(cp->rp->hProcessLock);
+                        cp->c->dwError = WSAECONNRESET;
+                        InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+                        rc = cp->c->dwError;
+                    break;
+                    case WAIT_TIMEOUT:
+                        rc = WSAETIMEDOUT;
+                    break;
+                    default:
+                        /* This can be only WAIT_FAILED
+                         * in which case errno is already set
+                         */
+                        rc = GetLastError();
+                    break;
+                }
+            }
+        }
+        else {
+            *pSendWait = FALSE;
+            break;
+        }
+    }
+    _InterlockedAnd(&cp->nState, ~IPCSOCK_SEND);
+    ApcIpcUnref(cp);
+    return rc;
+}
+

Added: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c?rev=1154448&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c Sat Aug  6 06:23:24 2011
@@ -0,0 +1,49 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/error.h"
+#include "acr/iodefs.h"
+#include "acr/clazz.h"
+#include "acr/string.h"
+#include "acr/memory.h"
+#include "acr/iofd.h"
+#include "acr/netapi.h"
+#include "acr/unsafe.h"
+#include "acr/port.h"
+#include "acr/time.h"
+#include "arch_opts.h"
+#include "arch_sync.h"
+#include "arch_ipcs.h"
+
+ACR_NET_EXPORT(jint, IpcStream, read0)(JNI_STDARGS, jlong sp)
+{
+    int  rv = -1;
+    int  rd;
+    BYTE ch;
+    LPIPCSOCK cp = J2P(sp, LPIPCSOCK);
+
+    if (ACR_HASFLAG(cp, ACR_SO_RDEOF))
+        return -1;
+    rd = AcrIpcRead(cp, &ch, 1);
+    if (rd == -1)
+        ACR_THROW_NET_ERRNO();
+    else if (rd == 1)
+        rv = ch;
+    else
+        cp->flags |= ACR_SO_RDEOF;
+    return rv;
+}
+

Propchange: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message