qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [20/51] [abbrv] qpid-proton git commit: Update with merge of latest proton codebase and checked against latest emscripten incoming branch
Date Fri, 28 Nov 2014 13:49:58 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/windows/io.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/io.c b/proton-c/src/windows/io.c
index 2e3800b..b5660be 100644
--- a/proton-c/src/windows/io.c
+++ b/proton-c/src/windows/io.c
@@ -27,32 +27,47 @@
 #error "Proton requires Windows API support for XP or later."
 #endif
 #include <winsock2.h>
+#include <mswsock.h>
 #include <Ws2tcpip.h>
 #define PN_WINAPI
 
-#include "../platform.h"
+#include "platform.h"
 #include <proton/io.h>
 #include <proton/object.h>
+#include <proton/selector.h>
+#include "iocp.h"
+#include "util.h"
 
 #include <ctype.h>
 #include <errno.h>
 #include <stdio.h>
 #include <assert.h>
 
-static int pni_error_from_wsaerr(pn_error_t *error, const char *msg) {
-  errno = WSAGetLastError();
-  return pn_i_error_from_errno(error, msg);
+/*
+ * Set `error` to PN_ERR with the text "<msg>: <system message for code>".
+ * code may come from either GetLastError() or WSAGetLastError(); both are
+ * looked up in the system message table.  If the system has no text for
+ * code, the numeric value is reported instead.
+ * Returns the result of pn_error_format().
+ */
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
+{
+  char err[1024] = {0};
+  // Call the ANSI variant explicitly: err is a char buffer, whereas the
+  // FormatMessage macro selects FormatMessageW in UNICODE builds.
+  DWORD len = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                             FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, err, sizeof(err), NULL);
+  if (len == 0)
+    return pn_error_format(error, PN_ERR, "%s: Windows error %ld", msg, (long) code);
+  return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
 }
 
-
-#define MAX_HOST (1024)
-#define MAX_SERV (64)
+/* printf-style trace output to stderr, flushed immediately. */
+static void io_log(const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  vfprintf(stderr, fmt, args);
+  va_end(args);
+  fflush(stderr);
+}
 
 struct pn_io_t {
-  char host[MAX_HOST];
-  char serv[MAX_SERV];
+  // Scratch buffers for the getnameinfo() results in pn_accept().
+  char host[NI_MAXHOST];
+  char serv[NI_MAXSERV];
+  // Returned by pn_io_error().
   pn_error_t *error;
+  // Driver tracing flag, read via pn_env_bool("PN_TRACE_DRV").
+  bool trace;
+  // Returned by pn_wouldblock().
   bool wouldblock;
+  // Completion port state; created by pni_iocp(), freed in pn_io_finalize().
+  iocp_t *iocp;
 };
 
 void pn_io_initialize(void *obj)
@@ -60,21 +75,24 @@ void pn_io_initialize(void *obj)
   pn_io_t *io = (pn_io_t *) obj;
   io->error = pn_error();
   io->wouldblock = false;
+  io->trace = pn_env_bool("PN_TRACE_DRV");
 
   /* Request WinSock 2.2 */
   WORD wsa_ver = MAKEWORD(2, 2);
   WSADATA unused;
   int err = WSAStartup(wsa_ver, &unused);
   if (err) {
-    pni_error_from_wsaerr(io->error, "pipe");
-    fprintf(stderr, "Can't load WinSock: %d\n", err);
+    pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
+    fprintf(stderr, "Can't load WinSock: %d\n", pn_error_text(io->error));
   }
+  io->iocp = pni_iocp();
 }
 
+/* Class finalizer for pn_io_t. */
 void pn_io_finalize(void *obj)
 {
   pn_io_t *io = (pn_io_t *) obj;
   pn_error_free(io->error);
+  // Release the completion port state while WinSock is still initialized,
+  // i.e. before WSACleanup().
+  pn_free(io->iocp);
   WSACleanup();
 }
 
@@ -84,7 +102,7 @@ void pn_io_finalize(void *obj)
 
+/* Allocate a pn_io_t.  PN_CLASS(pn_io) refers to the pn_io_* hooks
+ * (pn_io_initialize/pn_io_finalize above). */
 pn_io_t *pn_io(void)
 {
-  static pn_class_t clazz = PN_CLASS(pn_io);
+  static const pn_class_t clazz = PN_CLASS(pn_io);
   pn_io_t *io = (pn_io_t *) pn_new(sizeof(pn_io_t), &clazz);
   return io;
 }
@@ -100,20 +118,40 @@ pn_error_t *pn_io_error(pn_io_t *io)
   return io->error;
 }
 
+/*
+ * Discard a stale map entry whose SOCKET value has been recycled.
+ * A brand new socket can have the same HANDLE value as a previous one after
+ * a closesocket().  If the application closed that socket itself (i.e. not
+ * through pn_close), we only find out about it here.
+ */
+static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
+{
+  iocpdesc_t *stale = pni_iocpdesc_map_get(io->iocp, new_socket);
+  if (stale == NULL)
+    return;
+
+  if (io->trace)
+    io_log("Stale external socket reference discarded\n");
+  // The handle was reused, so the former socket instance must be closed.
+  assert(stale->ops_in_progress == 0);
+  assert(stale->external);
+  // Detach the descriptor from its handle, then drop the map entry; the
+  // removal may free the iocpdesc_t depending on its refcount.
+  pn_socket_t old_handle = stale->socket;
+  stale->socket = INVALID_SOCKET;
+  pni_iocpdesc_map_del(io->iocp, old_handle);
+}
+
+
 /*
- * Windows pipes don't work with select(), so a socket based pipe
- * workaround is provided.  They do work with completion ports, so the
- * workaround can be disposed with in future.
+ * This heavyweight surrogate pipe could be replaced with a normal Windows pipe
+ * now that select() is no longer used.  If interrupt semantics are all that is
+ * needed, a simple user space counter and reserved completion status would
+ * probably suffice.
  */
-static int pni_socket_pair(SOCKET sv[2]);
+static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);
 
+/*
+ * Create a surrogate pipe: dest[0] and dest[1] receive the two ends built by
+ * pni_socket_pair().  Returns 0 on success; otherwise the non-zero result is
+ * returned and io->error is set from WSAGetLastError().
+ */
 int pn_pipe(pn_io_t *io, pn_socket_t *dest)
 {
-  int n = pni_socket_pair(dest);
+  int n = pni_socket_pair(io, dest);
   if (n) {
-    pni_error_from_wsaerr(io->error, "pipe");
+    pni_win32_error(io->error, "pipe", WSAGetLastError());
   }
-
   return n;
 }
 
@@ -125,9 +163,14 @@ static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
   if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
     perror("setsockopt");
   }
+
+  u_long nonblock = 1;
+  if (ioctlsocket(sock, FIONBIO, &nonblock)) {
+    perror("ioctlsocket");
+  }
 }
 
-static inline pn_socket_t pn_create_socket(void);
+static inline pn_socket_t pni_create_socket();
 
+/*
+ * Create a TCP socket bound to the resolved host/port, put it in the
+ * listening state and register it with the completion port.
+ * Returns the socket, or INVALID_SOCKET with io->error set.
+ */
 pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
 {
@@ -138,34 +181,43 @@ pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
     return INVALID_SOCKET;
   }
 
-  pn_socket_t sock = pn_create_socket();
+  pn_socket_t sock = pni_create_socket();
   if (sock == INVALID_SOCKET) {
-    pni_error_from_wsaerr(io->error, "pn_create_socket");
+    pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
+    freeaddrinfo(addr);  // don't leak the resolved address on this path
     return INVALID_SOCKET;
   }
+  ensure_unique(io, sock);
 
-  BOOL optval = 1;
-  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &optval, sizeof(optval)) == -1) {
-    pni_error_from_wsaerr(io->error, "setsockopt");
+  // SO_EXCLUSIVEADDRUSE takes a Win32 BOOL, which is int-sized; pass that
+  // type rather than C's bool.
+  BOOL optval = TRUE;
+  if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
+                 sizeof(optval)) == -1) {
+    pni_win32_error(io->error, "setsockopt", WSAGetLastError());
+    freeaddrinfo(addr);  // don't leak the resolved address on this path
     closesocket(sock);
     return INVALID_SOCKET;
   }
 
   if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
-    pni_error_from_wsaerr(io->error, "bind");
+    pni_win32_error(io->error, "bind", WSAGetLastError());
     freeaddrinfo(addr);
     closesocket(sock);
     return INVALID_SOCKET;
   }
-
   freeaddrinfo(addr);
 
   if (listen(sock, 50) == -1) {
-    pni_error_from_wsaerr(io->error, "listen");
+    pni_win32_error(io->error, "listen", WSAGetLastError());
     closesocket(sock);
     return INVALID_SOCKET;
   }
 
+  iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+  if (!iocpd) {
+    pn_i_error_from_errno(io->error, "register");
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+
+  pni_iocpdesc_start(iocpd);
   return sock;
 }
 
@@ -181,66 +233,83 @@ pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port)
     return INVALID_SOCKET;
   }
 
-  pn_socket_t sock = pn_create_socket();
+  pn_socket_t sock = pni_create_socket();
   if (sock == INVALID_SOCKET) {
-    pni_error_from_wsaerr(io->error, "pn_create_socket");
+    pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
+    freeaddrinfo(addr);
     return INVALID_SOCKET;
   }
 
+  ensure_unique(io, sock);
   pn_configure_sock(io, sock);
-
-  if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) {
-    if (WSAGetLastError() != WSAEWOULDBLOCK) {
-      pni_error_from_wsaerr(io->error, "connect");
-      freeaddrinfo(addr);
-      closesocket(sock);
-      return INVALID_SOCKET;
-    }
-  }
-
-  freeaddrinfo(addr);
-
-  return sock;
+  return pni_iocp_begin_connect(io->iocp, sock, addr, io->error);
 }
 
-pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size)
+/*
+ * Accept a connection on listen_sock.  Sockets registered with the
+ * completion port are served from completed AcceptEx() results via
+ * pni_iocp_end_accept(); any other socket falls back to a synchronous
+ * accept().  On success the peer's "host:serv" is written to name (at most
+ * size bytes) and the new socket is returned.  On failure INVALID_SOCKET is
+ * returned and pn_wouldblock() tells whether it was only a would-block.
+ */
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
 {
   struct sockaddr_in addr = {0};
   addr.sin_family = AF_INET;
   socklen_t addrlen = sizeof(addr);
-  pn_socket_t sock = accept(socket, (struct sockaddr *) &addr, &addrlen);
-  if (sock == INVALID_SOCKET) {
-    pni_error_from_wsaerr(io->error, "accept");
-    return sock;
+  iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
+  pn_socket_t accept_sock;
+
+  // Clear would-block state left by earlier calls; only a genuine "nothing
+  // to accept yet" outcome below sets it again.
+  io->wouldblock = false;
+  if (listend)
+    accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error);
+  else {
+    // User supplied socket: plain accept(), reported like pn_send/pn_recv.
+    accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
+    if (accept_sock == INVALID_SOCKET) {
+      int wsa_err = WSAGetLastError();
+      io->wouldblock = (wsa_err == WSAEWOULDBLOCK);
+      pni_win32_error(io->error, "sync accept", wsa_err);
+    }
+  }
+
+  if (accept_sock == INVALID_SOCKET)
+    return accept_sock;
+
+  int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
+                         io->serv, NI_MAXSERV, 0);
+  if (code)
+    code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
+                       io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+    pn_close(io, accept_sock);
+    return INVALID_SOCKET;
   } else {
-    int code;
-    if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, MAX_HOST, io->serv, MAX_SERV, 0))) {
-      pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
-      if (closesocket(sock) == -1)
-        pni_error_from_wsaerr(io->error, "closesocket");
-      return INVALID_SOCKET;
-    } else {
-      pn_configure_sock(io, sock);
-      snprintf(name, size, "%s:%s", io->host, io->serv);
-      return sock;
+    pn_configure_sock(io, accept_sock);
+    snprintf(name, size, "%s:%s", io->host, io->serv);
+    if (listend) {
+      pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock));
     }
+    return accept_sock;
   }
 }
 
-static inline pn_socket_t pn_create_socket(void) {
-  return socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
+/* Create an IPv4 TCP socket; returns INVALID_SOCKET on failure. */
+static inline pn_socket_t pni_create_socket() {
+  // IPPROTO_TCP instead of getprotobyname("tcp"), which can return NULL.
+  return socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
 }
 
 ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
-  ssize_t count = send(sockfd, (const char *) buf, len, 0);
-  io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  ssize_t count;
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
+  if (iocpd) {
+    count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
+  } else {
+    count = send(sockfd, (const char *) buf, len, 0);
+    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  }
   return count;
 }
 
 ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
 {
-  ssize_t count = recv(socket, (char *) buf, size, 0);
-  io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  ssize_t count;
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+  if (iocpd) {
+    count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
+  } else {
+    count = recv(socket, (char *) buf, size, 0);
+    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  }
   return count;
 }
 
@@ -257,7 +326,12 @@ ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
 
 void pn_close(pn_io_t *io, pn_socket_t socket)
 {
-  closesocket(socket);
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+  if (iocpd)
+    pni_iocp_begin_close(iocpd);
+  else {
+    closesocket(socket);
+  }
 }
 
 bool pn_wouldblock(pn_io_t *io)
@@ -265,8 +339,24 @@ bool pn_wouldblock(pn_io_t *io)
+  // Would-block flag left by the latest I/O call on io that records one
+  // (e.g. pn_send(), pn_recv()).
   return io->wouldblock;
 }
 
+/* Return io's selector, creating it on first use for io's completion port. */
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+  iocp_t *iocp = io->iocp;
+  if (!iocp->selector)
+    iocp->selector = pni_selector_create(iocp);
+  return iocp->selector;
+}
+
+/*
+ * Make one end of the surrogate pipe non-blocking and register it with the
+ * completion port.  If registration fails the socket stays unregistered and
+ * pn_send()/pn_recv() use plain send()/recv() on it.
+ */
+static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
+{
+  u_long nonblock = 1;
+  if (ioctlsocket(sock, FIONBIO, &nonblock))
+    perror("ioctlsocket");
+  ensure_unique(io, sock);
+  iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+  if (iocpd)
+    pni_iocpdesc_start(iocpd);
+}
 
-static int pni_socket_pair (SOCKET sv[2]) {
+
+/*
+ * pipe() emulation for pn_pipe(): fills sv[0]/sv[1] with two sockets.  On
+ * the success path both ends go through configure_pipe_socket() (made
+ * non-blocking and registered with the completion port), the intermediate
+ * socket `sock` is closed, and 0 is returned.
+ */
+static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
   // no socketpair on windows.  provide pipe() semantics using sockets
 
   SOCKET sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
@@ -330,9 +420,9 @@ static int pni_socket_pair (SOCKET sv[2]) {
     }
   }
 
-  u_long v = 1;
-  ioctlsocket (sv[0], FIONBIO, &v);
-  ioctlsocket (sv[1], FIONBIO, &v);
+  configure_pipe_socket(io, sv[0]);
+  configure_pipe_socket(io, sv[1]);
   closesocket(sock);
   return 0;
 }
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/windows/iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c
new file mode 100644
index 0000000..614b130
--- /dev/null
+++ b/proton-c/src/windows/iocp.c
@@ -0,0 +1,1138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+#define PN_WINAPI
+
+#include "../platform.h"
+#include <proton/object.h>
+#include <proton/io.h>
+#include <proton/selector.h>
+#include <proton/error.h>
+#include <proton/transport.h>
+#include "iocp.h"
+#include "util.h"
+#include <assert.h>
+
+/*
+ * Windows IO Completion Port support for Proton.
+ *
+ * Overlapped writes are used to avoid lengthy stalls between write
+ * completion and starting a new write.  Non-overlapped reads are used
+ * since Windows accumulates inbound traffic without stalling and
+ * managing read buffers would not avoid a memory copy at the pn_read
+ * boundary.
+ */
+
+// Max number of overlapped accept records per listener (see begin_accept()).
+#define IOCP_MAX_ACCEPTS 10
+
+// AcceptEx squishes the local and remote addresses and optional data
+// all together when accepting the connection. Reserve enough for
+// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
+// per address is required by AcceptEx.
+// Per-address slot size; passed as both address lengths to AcceptEx.
+#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
+// Whole AcceptEx output buffer: one local plus one remote address slot.
+#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
+
+/* printf-style diagnostic output to stderr, flushed immediately. */
+static void iocp_log(const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  vfprintf(stderr, fmt, args);
+  va_end(args);
+  fflush(stderr);
+}
+
+/*
+ * Set `error` to (code, <system message for status>), where status is a
+ * Win32 or WinSock error number.  If the system has no text for status, the
+ * numeric value is recorded instead, so `error` is always set on return.
+ */
+static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
+{
+  char buf[512];
+  // IGNORE_INSERTS: system messages may contain insert sequences such as %1
+  // and no arguments are supplied.  The A variant matches the char buffer.
+  DWORD len = FormatMessageA(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM |
+                             FORMAT_MESSAGE_IGNORE_INSERTS,
+                             0, status, 0, buf, sizeof(buf), 0);
+  if (len) {
+    pn_error_set(error, code, buf);
+  } else {
+    fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
+    pn_error_format(error, code, "Windows error %ld", (long) status);
+  }
+}
+
+// Forward declarations of static helpers used before their definitions.
+static void reap_check(iocpdesc_t *);
+static void bind_to_completion_port(iocpdesc_t *iocpd);
+static void iocp_shutdown(iocpdesc_t *iocpd);
+static void start_reading(iocpdesc_t *iocpd);
+static bool is_listener(iocpdesc_t *iocpd);
+static void release_sys_sendbuf(SOCKET s);
+
+/*
+ * Record a fatal error on iocpd and mark both directions closed.  A
+ * non-listener that is not yet write-closed and has no writes queued is shut
+ * down first.  PN_READABLE and PN_WRITABLE are then added to its events.
+ */
+static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
+{
+  pni_win32_error(iocpd->error, text, status);
+  if (iocpd->iocp->iocp_trace)
+    iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
+
+  bool needs_shutdown = !is_listener(iocpd) && !iocpd->write_closed &&
+                        pni_write_pipeline_size(iocpd->pipeline) == 0;
+  if (needs_shutdown)
+    iocp_shutdown(iocpd);
+  iocpd->read_closed = true;
+  iocpd->write_closed = true;
+  pni_events_update(iocpd, iocpd->events | PN_READABLE | PN_WRITABLE);
+}
+
+// Helper functions to use specialized IOCP AcceptEx() and ConnectEx().
+// WinSock exposes these extension functions only through
+// WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER).
+
+// Ask socket s for the extension function identified by guid, storing it in
+// the fn_size-byte object at fn_out.  Returns false if WSAIoctl fails.
+static bool lookup_extension_fn(SOCKET s, GUID guid, void *fn_out, DWORD fn_size)
+{
+  DWORD bytes = 0;
+  return WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+                  fn_out, fn_size, &bytes, NULL, NULL) != SOCKET_ERROR;
+}
+
+static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
+{
+  GUID guid = WSAID_ACCEPTEX;
+  LPFN_ACCEPTEX fn = NULL;
+  if (!lookup_extension_fn(s, guid, &fn, sizeof(fn)))
+    fn = NULL;  // never hand back an indeterminate pointer
+  assert(fn);
+  return fn;
+}
+
+static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
+{
+  GUID guid = WSAID_CONNECTEX;
+  LPFN_CONNECTEX fn = NULL;
+  if (!lookup_extension_fn(s, guid, &fn, sizeof(fn)))
+    fn = NULL;  // never hand back an indeterminate pointer
+  assert(fn);
+  return fn;
+}
+
+static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
+{
+  GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
+  LPFN_GETACCEPTEXSOCKADDRS fn = NULL;
+  if (!lookup_extension_fn(s, guid, &fn, sizeof(fn)))
+    fn = NULL;  // never hand back an indeterminate pointer
+  assert(fn);
+  return fn;
+}
+
+// Create the target socket for an AcceptEx() on listener iocpd: same address
+// family as the listener (SOCK_STREAM only), registered with the same iocp.
+// Returns NULL on failure, with the WinSock error left for the caller.
+static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
+{
+  sockaddr_storage sa;
+  socklen_t salen = sizeof(sa);
+  if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1)
+    return NULL;
+  SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+  if (s == INVALID_SOCKET)
+    return NULL;
+  iocpdesc_t *desc = pni_iocpdesc_create(iocpd->iocp, s, false);
+  if (!desc) {
+    // Don't leak the handle; keep the WinSock error for the caller to report.
+    int saved = WSAGetLastError();
+    closesocket(s);
+    WSASetLastError(saved);
+  }
+  return desc;
+}
+
+// True when iocpd is non-NULL and has an acceptor, i.e. it is a listener.
+static bool is_listener(iocpdesc_t *iocpd)
+{
+  if (iocpd == NULL)
+    return false;
+  return iocpd->acceptor != NULL;
+}
+
+// === Async accept processing
+
+// Completion record for one overlapped AcceptEx() on a listener.
+typedef struct {
+  iocp_result_t base;        // base.iocpd is the listener
+  iocpdesc_t *new_sock;      // socket the connection is accepted onto
+  char address_buffer[IOCP_SOCKADDRBUFLEN];  // AcceptEx local/remote address output
+  DWORD unused;              // AcceptEx bytes-received out-param (no data requested)
+} accept_result_t;
+
+// Allocate a zeroed accept completion record bound to listen_sock.
+// Returns NULL if allocation fails.
+static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
+  accept_result_t *result = (accept_result_t *) pn_new(sizeof(accept_result_t), 0);
+  if (result) {
+    // Clear only after the NULL check: memset on a failed allocation is UB.
+    memset(result, 0, sizeof(accept_result_t));
+    result->base.type = IOCP_ACCEPT;
+    result->base.iocpd = listen_sock;
+  }
+  return result;
+}
+
+// Prepare a recycled accept record for another AcceptEx(): clear the address
+// output buffer and the OVERLAPPED state.
+static void reset_accept_result(accept_result_t *result) {
+  memset(result->address_buffer, 0, sizeof(result->address_buffer));
+  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+}
+
+// Per-listener state for overlapped accepts.
+struct pni_acceptor_t {
+  int accept_queue_size;      // accept records created by begin_accept() (capped at IOCP_MAX_ACCEPTS)
+  pn_list_t *accepts;         // completed accept records awaiting pni_iocp_end_accept()
+  iocpdesc_t *listen_sock;    // the listener this acceptor serves
+  bool signalled;             // set by begin_accept() once a closing listener has no records left
+  LPFN_ACCEPTEX fn_accept_ex; // AcceptEx looked up on listen_sock
+  LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs;  // GetAcceptExSockaddrs for listen_sock
+};
+
+// PN_CLASS(pni_acceptor) needs no compare/inspect/hashcode hooks.
+#define pni_acceptor_compare NULL
+#define pni_acceptor_inspect NULL
+#define pni_acceptor_hashcode NULL
+
+// PN_CLASS initializer: create the list that holds completed accepts.
+static void pni_acceptor_initialize(void *object)
+{
+  pn_list_t *completed = pn_list(IOCP_MAX_ACCEPTS, 0);
+  ((pni_acceptor_t *) object)->accepts = completed;
+}
+
+// PN_CLASS finalizer: free every completed accept record still queued, then
+// the list itself.
+static void pni_acceptor_finalize(void *object)
+{
+  pni_acceptor_t *self = (pni_acceptor_t *) object;
+  size_t remaining = pn_list_size(self->accepts);
+  for (size_t idx = 0; idx != remaining; ++idx)
+    pn_free(pn_list_get(self->accepts, idx));
+  pn_free(self->accepts);
+}
+
+// Create the acceptor for listener iocpd and resolve its AcceptEx and
+// GetAcceptExSockaddrs extension functions.  Returns NULL if allocation
+// fails, matching the NULL checks in accept_result()/connect_result().
+static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
+{
+  static const pn_class_t clazz = PN_CLASS(pni_acceptor);
+  pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_new(sizeof(pni_acceptor_t), &clazz);
+  if (!acceptor)
+    return NULL;
+  acceptor->listen_sock = iocpd;
+  acceptor->accept_queue_size = 0;
+  acceptor->signalled = false;
+  pn_socket_t sock = iocpd->socket;
+  acceptor->fn_accept_ex = lookup_accept_ex(sock);
+  acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock);
+  return acceptor;
+}
+
+// Start (or restart) one overlapped AcceptEx() on the acceptor's listener.
+//  - result == NULL: allocate a new record, but only while fewer than
+//    IOCP_MAX_ACCEPTS exist and none of the existing ones is still pending.
+//  - result != NULL: reuse a record just consumed by pni_iocp_end_accept().
+// A synchronous AcceptEx failure is queued like a completion (carrying its
+// status) and the listener is flagged readable.
+// NOTE(review): accept_result() may return NULL and is dereferenced below
+// without a check; also, if create_same_type_socket() fails, `result` is
+// neither queued nor freed -- confirm whether that is intended.
+static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
+{
+  if (acceptor->listen_sock->closing) {
+    // Listener is closing: retire this record instead of reusing it.
+    if (result) {
+      pn_free(result);
+      acceptor->accept_queue_size--;
+    }
+    if (acceptor->accept_queue_size == 0)
+      acceptor->signalled = true;
+    return;
+  }
+
+  if (result) {
+    reset_accept_result(result);
+  } else {
+    if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS &&
+        pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) {
+      result = accept_result(acceptor->listen_sock);
+      acceptor->accept_queue_size++;
+    } else {
+      // an async accept is still pending or max concurrent accepts already hit
+      return;
+    }
+  }
+
+  result->new_sock = create_same_type_socket(acceptor->listen_sock);
+  if (result->new_sock) {
+    // Not yet connected.
+    result->new_sock->read_closed = true;
+    result->new_sock->write_closed = true;
+
+    bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket,
+                     result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+                     &result->unused, (LPOVERLAPPED) result);
+    if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
+      // Immediate failure: queue it so pni_iocp_end_accept() reports it.
+      result->base.status = WSAGetLastError();
+      pn_list_add(acceptor->accepts, result);
+      pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE);
+    } else {
+      acceptor->listen_sock->ops_in_progress++;
+      // This socket is equally involved in the async operation.
+      result->new_sock->ops_in_progress++;
+    }
+  } else {
+    iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket");
+  }
+}
+
+// Completion handler for AcceptEx().  While the listener is open the record
+// is queued for pni_iocp_end_accept() and the listener flagged readable;
+// otherwise the orphaned socket is closed and the record discarded.
+static void complete_accept(accept_result_t *result, HRESULT status)
+{
+  iocpdesc_t *listener = result->base.iocpd;
+  result->new_sock->ops_in_progress--;
+  if (!listener->read_closed) {
+    result->base.status = status;
+    pn_list_add(listener->acceptor->accepts, result);
+    pni_events_update(listener, listener->events | PN_READABLE);
+    return;
+  }
+  if (!result->new_sock->closing)
+    pni_iocp_begin_close(result->new_sock);
+  pn_free(result);
+  reap_check(listener);
+}
+
+// Hand the application the next completed accept on listener ld.
+// Returns the accepted socket, or INVALID_SOCKET when: ld is not a usable
+// listener (`error` set), no accept has completed yet (*would_block set to
+// true), or the completed AcceptEx failed (ld->error set).  On success the
+// remote address is copied into addr, truncated to *addrlen, when supplied.
+// Whenever a record is consumed it is recycled via begin_accept().
+// NOTE(review): *would_block is only ever set to true here, never cleared.
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error)
+{
+  if (!is_listener(ld)) {
+    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+    return INVALID_SOCKET;
+  }
+  if (ld->read_closed) {
+    set_iocp_error_status(error, PN_ERR, WSAENOTSOCK);
+    return INVALID_SOCKET;
+  }
+  if (pn_list_size(ld->acceptor->accepts) == 0) {
+    if (ld->events & PN_READABLE && ld->iocp->iocp_trace)
+      iocp_log("listen socket readable with no available accept completions\n");
+    *would_block = true;
+    return INVALID_SOCKET;
+  }
+
+  // Consume the oldest completed accept.
+  accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0);
+  pn_list_del(ld->acceptor->accepts, 0, 1);
+  if (!pn_list_size(ld->acceptor->accepts))
+    pni_events_update(ld, ld->events & ~PN_READABLE);  // No pending accepts
+
+  pn_socket_t accept_sock;
+  if (result->base.status) {
+    accept_sock = INVALID_SOCKET;
+    pni_win32_error(ld->error, "accept failure", result->base.status);
+    if (ld->iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(ld->error));
+    // App never sees this socket so close it here.
+    pni_iocp_begin_close(result->new_sock);
+  } else {
+    accept_sock = result->new_sock->socket;
+    // AcceptEx special setsockopt:
+    setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket,
+                  sizeof (SOCKET));
+    if (addr && addrlen && *addrlen > 0) {
+      // Unpack the addresses AcceptEx wrote into address_buffer.
+      sockaddr_storage *local_addr = NULL;
+      sockaddr_storage *remote_addr = NULL;
+      int local_addrlen, remote_addrlen;
+      LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs;
+      fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+         (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr,
+         &remote_addrlen);
+      *addrlen = pn_min(*addrlen, remote_addrlen);
+      memmove(addr, remote_addr, *addrlen);
+    }
+  }
+
+  if (accept_sock != INVALID_SOCKET) {
+    // Connected.
+    result->new_sock->read_closed = false;
+    result->new_sock->write_closed = false;
+  }
+
+  // Done with the completion result, so reuse it
+  result->new_sock = NULL;
+  begin_accept(ld->acceptor, result);
+  return accept_sock;
+}
+
+
+// === Async connect processing
+
+// Completion record for an overlapped ConnectEx().
+typedef struct {
+  iocp_result_t base;
+  char address_buffer[IOCP_SOCKADDRBUFLEN];  // NOTE(review): not used by the connect code shown
+  struct addrinfo *addrinfo;  // connect target; owned here, freed by connect_result_finalize()
+} connect_result_t;
+
+// PN_CLASS(connect_result) only needs a finalizer.
+#define connect_result_initialize NULL
+#define connect_result_compare NULL
+#define connect_result_inspect NULL
+#define connect_result_hashcode NULL
+
+// PN_CLASS finalizer.  The addrinfo must outlive the ConnectEx() call, so it
+// is released only here, together with the record.
+static void connect_result_finalize(void *object)
+{
+  struct addrinfo *ai = ((connect_result_t *) object)->addrinfo;
+  if (ai != NULL)
+    freeaddrinfo(ai);
+}
+
+// Allocate a zeroed connect completion record for iocpd that takes ownership
+// of addr.  Returns NULL if allocation fails (addr is then not taken).
+static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
+  static const pn_class_t clazz = PN_CLASS(connect_result);
+  connect_result_t *result = (connect_result_t *) pn_new(sizeof(connect_result_t), &clazz);
+  if (!result)
+    return NULL;
+  memset(result, 0, sizeof(connect_result_t));
+  result->base.type = IOCP_CONNECT;
+  result->base.iocpd = iocpd;
+  result->addrinfo = addr;
+  return result;
+}
+
+// Start an asynchronous connect of sock to addr using ConnectEx().
+// Ownership of addr passes here: it is freed on early failure, otherwise by
+// connect_result_finalize() when the connect record is released.
+// Returns sock, now registered with the completion port, or INVALID_SOCKET
+// with `error` set; on failure the socket is closed or closing.
+pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
+{
+  // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound:
+  sockaddr_storage sa;
+  memset(&sa, 0, sizeof(sa));
+  sa.ss_family = addr->ai_family;
+  if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) {
+    pni_win32_error(error, "begin async connection", WSAGetLastError());
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+    closesocket(sock);
+    freeaddrinfo(addr);
+    return INVALID_SOCKET;
+  }
+
+  iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false);
+  if (!iocpd) {
+    pn_error_set(error, PN_ERR, "register connecting socket");
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+    closesocket(sock);
+    freeaddrinfo(addr);
+    return INVALID_SOCKET;
+  }
+  bind_to_completion_port(iocpd);
+  LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket);
+  connect_result_t *result = connect_result(iocpd, addr);
+  if (!result) {
+    // connect_result() did not take ownership of addr.
+    pn_error_set(error, PN_ERR, "allocate connect result");
+    freeaddrinfo(addr);
+    iocpd->write_closed = true;
+    iocpd->read_closed = true;
+    pni_iocp_begin_close(iocpd);
+    return INVALID_SOCKET;
+  }
+  DWORD unused;
+  bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
+                               NULL, 0, &unused, (LPOVERLAPPED) result);
+  if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
+    pni_win32_error(error, "ConnectEx failure", WSAGetLastError());
+    pn_free(result);  // also frees addr via connect_result_finalize()
+    iocpd->write_closed = true;
+    iocpd->read_closed = true;
+    pni_iocp_begin_close(iocpd);
+    sock = INVALID_SOCKET;
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+  } else {
+    iocpd->ops_in_progress++;
+  }
+  return sock;
+}
+
+// Completion handler for ConnectEx().  If the descriptor is closing, the
+// record is dropped and the descriptor checked for reaping.  Otherwise a
+// failure fails the descriptor; success updates the connect context, sets
+// the events to PN_WRITABLE and starts reading.  The record is always freed.
+static void complete_connect(connect_result_t *result, HRESULT status)
+{
+  iocpdesc_t *iocpd = result->base.iocpd;
+  if (iocpd->closing) {
+    pn_free(result);
+    reap_check(iocpd);
+    return;
+  }
+
+  if (status != 0) {
+    iocpdesc_fail(iocpd, status, "Connect failure");
+    pn_free(result);
+    return;
+  }
+
+  release_sys_sendbuf(iocpd->socket);
+  // A ConnectEx() socket needs SO_UPDATE_CONNECT_CONTEXT before ordinary
+  // socket calls behave as on a connected socket.
+  if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,  NULL, 0)) {
+    iocpdesc_fail(iocpd, WSAGetLastError(), "Connect failure (update context)");
+  } else {
+    pni_events_update(iocpd, PN_WRITABLE);
+    start_reading(iocpd);
+  }
+  pn_free(result);
+}
+
+
+// === Async writes
+
+// True when iocpd's write pipeline is non-empty.
+static bool write_in_progress(iocpdesc_t *iocpd)
+{
+  size_t pending = pni_write_pipeline_size(iocpd->pipeline);
+  return pending != 0;
+}
+
+// Allocate a zeroed write completion record for iocpd describing the buffer
+// buf of buflen bytes.  Returns NULL if allocation fails.
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
+{
+  write_result_t *result = (write_result_t *) calloc(1, sizeof(write_result_t));
+  if (!result)
+    return NULL;
+  result->base.type = IOCP_WRITE;
+  result->base.iocpd = iocpd;
+  result->buffer.start = buf;
+  result->buffer.size = buflen;
+  return result;
+}
+
+// Issue one overlapped WSASend() of len bytes from buf, with result serving
+// as the OVERLAPPED.  Returns WSASend()'s result.
+static int submit_write(write_result_t *result, const void *buf, size_t len)
+{
+  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+  WSABUF chunk;
+  chunk.len = len;
+  chunk.buf = (char *) buf;
+  return WSASend(result->base.iocpd->socket, &chunk, 1, NULL, 0,
+                 (LPOVERLAPPED) result, 0);
+}
+
+// Copy up to len bytes from buf into the write pipeline and submit them as
+// overlapped writes on iocpd.  *would_block is cleared on entry.
+// Returns the number of bytes queued (0 when len == 0), or SOCKET_ERROR
+// with either *would_block set (not writable / no pipeline space) or
+// `error` set (listener, closing, or previously failed socket).  Returns
+// SOCKET_ERROR with iocpd failed if a submit fails.
+ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
+{
+  // Reset first so even a zero-length call never reports a stale would-block.
+  *would_block = false;
+  if (len == 0) return 0;
+  if (is_listener(iocpd)) {
+    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->closing) {
+    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->write_closed) {
+    assert(pn_error_code(iocpd->error));
+    pn_error_copy(error, iocpd->error);
+    if (iocpd->iocp->iocp_trace)
+      iocp_log("write error: %s\n", pn_error_text(error));
+    return SOCKET_ERROR;
+  }
+  if (!(iocpd->events & PN_WRITABLE)) {
+    *would_block = true;
+    return SOCKET_ERROR;
+  }
+
+  size_t written = 0;
+  const char *outgoing = (const char *) buf;
+  size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
+  if (!available) {
+    *would_block = true;
+    return SOCKET_ERROR;
+  }
+
+  for (size_t wr_count = 0; wr_count < available; wr_count++) {
+    write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
+    assert(result);
+    result->base.iocpd = iocpd;
+    ssize_t actual_len = pn_min(len, result->buffer.size);
+    result->requested = actual_len;
+    memmove((void *)result->buffer.start, outgoing, actual_len);
+    outgoing += actual_len;
+    written += actual_len;
+    len -= actual_len;
+
+    int werror = submit_write(result, result->buffer.start, actual_len);
+    if (werror) {
+      // Capture the error before any other call can overwrite it.
+      int wsa_err = WSAGetLastError();
+      if (wsa_err != ERROR_IO_PENDING) {
+        pni_write_pipeline_return(iocpd->pipeline, result);
+        iocpdesc_fail(iocpd, wsa_err, "overlapped send");
+        return SOCKET_ERROR;
+      }
+    }
+    iocpd->ops_in_progress++;
+  }
+
+  if (!pni_write_pipeline_writable(iocpd->pipeline))
+    pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
+  return written;
+}
+
+// Completion handler for one overlapped write.  The buffer always goes back
+// to the pipeline.  Then:
+//  - a closing socket may be shut down, and is checked for reaping;
+//  - a complete write may restore PN_WRITABLE;
+//  - a partial or failed write fails the descriptor.
+static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
+{
+  iocpdesc_t *iocpd = result->base.iocpd;
+  bool closing = iocpd->closing;
+  bool transferred = (status == 0 && xfer_count > 0);
+  bool partial = transferred && xfer_count != result->requested;
+  // Everything needed from result is read above, before it is handed back.
+  pni_write_pipeline_return(iocpd->pipeline, result);
+
+  if (closing) {
+    if (!iocpd->write_closed && !write_in_progress(iocpd))
+      iocp_shutdown(iocpd);
+    reap_check(iocpd);
+  } else if (partial) {
+    // Is this recoverable?  How to preserve order if multiple overlapped writes?
+    iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
+  } else if (transferred) {
+    if (pni_write_pipeline_writable(iocpd->pipeline))
+      pni_events_update(iocpd, iocpd->events | PN_WRITABLE);
+  } else {
+    // NOTE(review): also reached with status == 0 when xfer_count == 0.
+    iocpdesc_fail(iocpd, status, "IOCP async write error");
+  }
+}
+
+
+// === Async reads
+
+// Completion record for the zero-byte overlapped read that signals
+// readability (see begin_zero_byte_read()).
+struct read_result_t {
+  iocp_result_t base;
+  size_t drain_count;   // bytes discarded by drain_until_closed() while closing
+  char unused_buf[1];   // dummy target for the zero-length WSARecv()
+};
+
+// Allocate a zeroed read completion record for iocpd; NULL on allocation failure.
+static read_result_t *read_result(iocpdesc_t *iocpd)
+{
+  read_result_t *result = (read_result_t *) calloc(1, sizeof(read_result_t));
+  if (!result)
+    return NULL;
+  result->base.type = IOCP_READ;
+  result->base.iocpd = iocpd;
+  return result;
+}
+
+// Post a zero-byte overlapped WSARecv() so the completion port reports when
+// the socket is readable; the data itself is read later without overlap.
+// Does nothing if a read is already posted.  On a read-closed socket it
+// raises PN_READABLE instead.
+static void begin_zero_byte_read(iocpdesc_t *iocpd)
+{
+  if (iocpd->read_in_progress)
+    return;
+  if (iocpd->read_closed) {
+    pni_events_update(iocpd, iocpd->events | PN_READABLE);
+    return;
+  }
+
+  read_result_t *rr = iocpd->read_result;
+  memset(&rr->base.overlapped, 0, sizeof (OVERLAPPED));
+  WSABUF empty;
+  empty.len = 0;
+  empty.buf = rr->unused_buf;
+  DWORD flags = 0;
+  int err = WSARecv(iocpd->socket, &empty, 1, NULL, &flags,
+                    &rr->base.overlapped, 0);
+  if (err != 0 && WSAGetLastError() != ERROR_IO_PENDING) {
+    iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
+    return;
+  }
+  iocpd->ops_in_progress++;
+  iocpd->read_in_progress = true;
+}
+
+/*
+ * Called while closing.  Discard incoming bytes looking for the peer's EOF
+ * (recv() == 0) so the close can be graceful.
+ * - If no data is buffered yet (WSAEWOULDBLOCK), another zero-byte read is
+ *   posted to wait for more.
+ * - After 16 KiB of discarded data, or on any other error, the socket is
+ *   closed outright.
+ */
+static void drain_until_closed(iocpdesc_t *iocpd) {
+  const size_t max_drain = 16 * 1024;
+  char buf[512];
+  read_result_t *result = iocpd->read_result;
+  int last_error = 0;
+  while (result->drain_count < max_drain) {
+    int rv = recv(iocpd->socket, buf, (int) sizeof(buf), 0);
+    if (rv > 0) {
+      result->drain_count += rv;
+    } else if (rv == 0) {
+      // Peer finished sending: graceful close confirmed.
+      iocpd->read_closed = true;
+      return;
+    } else {
+      last_error = WSAGetLastError();   // capture before any other call
+      if (last_error == WSAEWOULDBLOCK) {
+        // wait a little longer
+        start_reading(iocpd);
+        return;
+      }
+      break;
+    }
+  }
+  // Graceful close indication unlikely, force the issue
+  if (iocpd->iocp->iocp_trace) {
+    if (result->drain_count >= max_drain)
+      iocp_log("graceful close on reader abandoned (too many chars)\n");
+    else
+      iocp_log("graceful close on reader abandoned: %d\n", last_error);
+  }
+  iocpd->read_closed = true;
+  closesocket(iocpd->socket);
+  iocpd->socket = INVALID_SOCKET;
+}
+
+
+/*
+ * Completion handler for the zero-byte read posted by begin_zero_byte_read().
+ * While closing, it keeps looking for the peer's EOF. Otherwise it reports
+ * PN_READABLE on success or fails the descriptor.
+ */
+static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
+{
+  iocpdesc_t *desc = result->base.iocpd;
+  desc->read_in_progress = false;
+
+  if (!desc->closing) {
+    if (status == 0 && xfer_count == 0)
+      pni_events_update(desc, desc->events | PN_READABLE);  // Success.
+    else
+      iocpdesc_fail(desc, status, "IOCP read complete error");
+    return;
+  }
+
+  // Application no longer reading, but we are looking for a zero length read
+  if (!desc->read_closed)
+    drain_until_closed(desc);
+  reap_check(desc);
+}
+
+/*
+ * Non-blocking receive on a connected (non-listening) descriptor.
+ *
+ * Returns:
+ * - the number of bytes read;
+ * - 0 when size == 0, or on orderly EOF (the read side is then marked closed);
+ * - SOCKET_ERROR otherwise. In that case either *would_block is set (no data
+ *   available yet) or error is filled in.
+ *
+ * *would_block is initialized on every path.
+ */
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
+{
+  *would_block = false;
+  if (size == 0) return 0;
+  if (is_listener(iocpd)) {
+    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->closing) {
+    // Previous call to pn_close()
+    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->read_closed) {
+    if (pn_error_code(iocpd->error))
+      pn_error_copy(error, iocpd->error);
+    else
+      set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
+    return SOCKET_ERROR;
+  }
+
+  // recv() returns an int.  Keep it signed: stored in a size_t, SOCKET_ERROR (-1)
+  // would look like a huge positive byte count and be treated as data.
+  int count = recv(iocpd->socket, (char *) buf, (int) size, 0);
+  if (count > 0) {
+    // Pending data consumed: clear readable and re-arm the zero-byte read.
+    pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
+    begin_zero_byte_read(iocpd);
+    return count;
+  } else if (count == 0) {
+    iocpd->read_closed = true;
+    return 0;
+  }
+  int err = WSAGetLastError();
+  if (err == WSAEWOULDBLOCK)
+    *would_block = true;
+  else
+    set_iocp_error_status(error, PN_ERR, err);
+  return SOCKET_ERROR;
+}
+
+/* Begin (or resume) watching the socket for readability. */
+static void start_reading(iocpdesc_t *iocpd)
+{
+  begin_zero_byte_read(iocpd);
+}
+
+
+// === The iocp descriptor
+
+/* pn_class_t hook: zero the descriptor and mark it as owning no socket. */
+static void pni_iocpdesc_initialize(void *object)
+{
+  iocpdesc_t *iocpd = (iocpdesc_t *) object;
+  memset(iocpd, 0, sizeof(iocpdesc_t));
+  iocpd->socket = INVALID_SOCKET;
+}
+
+/*
+ * pn_class_t hook: release what the descriptor owns.  A write pipeline or read
+ * result still used by an outstanding overlapped operation is deliberately
+ * leaked (and logged): iocp_result_t structs must not be freed until their
+ * operation completes or the completion port is closed.
+ */
+static void pni_iocpdesc_finalize(void *object)
+{
+  iocpdesc_t *iocpd = (iocpdesc_t *) object;
+  pn_free(iocpd->acceptor);
+  pn_error_free(iocpd->error);
+  if (iocpd->pipeline) {
+    if (write_in_progress(iocpd))
+      iocp_log("iocp descriptor write leak\n");
+    else
+      pn_free(iocpd->pipeline);
+  }
+  if (iocpd->read_in_progress)
+    iocp_log("iocp descriptor read leak\n");
+  else
+    free(iocpd->read_result);
+}
+
+/* pn_class_t hook: hash on the socket handle (also the iocpdesc map key). */
+static uintptr_t pni_iocpdesc_hashcode(void *object)
+{
+  iocpdesc_t *iocpd = (iocpdesc_t *) object;
+  return iocpd->socket;
+}
+
+#define pni_iocpdesc_compare NULL
+#define pni_iocpdesc_inspect NULL
+
+// Reference counted in the iocpdesc map, zombie_list, selector.
+// Allocates a descriptor for socket s (which must be valid).
+static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
+{
+  static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
+  assert(s != INVALID_SOCKET);
+  iocpdesc_t *desc = (iocpdesc_t *) pn_new(sizeof(iocpdesc_t), &clazz);
+  assert(desc);
+  desc->socket = s;
+  return desc;
+}
+
+/* True if s has SO_ACCEPTCONN set (is listening); false if the query fails. */
+static bool is_listener_socket(pn_socket_t s)
+{
+  BOOL accepting = FALSE;
+  int optlen = sizeof(accepting);
+  if (getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *) &accepting, &optlen) != 0)
+    return false;
+  return accepting != FALSE;
+}
+
+/*
+ * Create and register the descriptor for socket s, which must not already be
+ * in iocp's map.
+ * - Listening sockets get an acceptor.
+ * - Other sockets get a write pipeline and a read result.
+ * external marks sockets set up outside Proton.  The map holds the only
+ * reference on return.
+ */
+iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
+  assert(!pni_iocpdesc_map_get(iocp, s));
+  bool listening = is_listener_socket(s);
+  iocpdesc_t *iocpd = pni_iocpdesc(s);
+  if (iocpd) {
+    iocpd->iocp = iocp;   // only touch iocpd once it is known to be non-NULL
+    iocpd->external = external;
+    iocpd->error = pn_error();
+    if (listening) {
+      iocpd->acceptor = pni_acceptor(iocpd);
+    } else {
+      iocpd->pipeline = pni_write_pipeline(iocpd);
+      iocpd->read_result = read_result(iocpd);
+    }
+    pni_iocpdesc_map_push(iocpd);
+  }
+  return iocpd;
+}
+
+// === Fast lookup of a socket's iocpdesc_t
+
+/* Descriptor registered for s, or NULL. */
+iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) {
+  iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s);
+  return iocpd;
+}
+
+/* Register iocpd under its socket and hand the creation reference to the map. */
+void pni_iocpdesc_map_push(iocpdesc_t *iocpd) {
+  pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd);
+  pn_decref(iocpd);
+  assert(pn_refcount(iocpd) == 1);
+}
+
+/* Drop s from the map; this may release the last reference to its descriptor. */
+void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) {
+  pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s);
+}
+
+/* Associate the socket with the iocp's completion port (at most once).
+ * Failures are reported through iocpdesc_fail(). */
+static void bind_to_completion_port(iocpdesc_t *iocpd)
+{
+  if (iocpd->bound)
+    return;
+  HANDLE port = iocpd->iocp->completion_port;
+  if (port == NULL) {
+    iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
+    return;
+  }
+  if (!CreateIoCompletionPort((HANDLE) iocpd->socket, port, 0, 0)) {
+    iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
+    return;
+  }
+  iocpd->bound = true;
+}
+
+/* Set the socket's kernel send buffer size (SO_SNDBUF) to zero. */
+static void release_sys_sendbuf(SOCKET s)
+{
+  int zero = 0;
+  int rc = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *) &zero, sizeof(zero));
+  assert(rc == 0);
+  (void) rc;  // otherwise unused when NDEBUG is defined
+}
+
+/*
+ * Bind the descriptor to the completion port and start its first async
+ * operation. No-op if already bound.
+ * - Listeners begin accepting.
+ * - Connected sockets start out writable and post a zero-byte read.
+ */
+void pni_iocpdesc_start(iocpdesc_t *iocpd)
+{
+  if (iocpd->bound)
+    return;
+  bind_to_completion_port(iocpd);
+  // NOTE(review): the async op below is started even if binding failed;
+  // confirm iocpdesc_fail() leaves the descriptor in a state where that is harmless.
+  if (!is_listener(iocpd)) {
+    release_sys_sendbuf(iocpd->socket);
+    pni_events_update(iocpd, PN_WRITABLE);
+    start_reading(iocpd);
+    return;
+  }
+  begin_accept(iocpd->acceptor, NULL);
+}
+
+/*
+ * Route one dequeued completion packet to the handler for its operation type.
+ * success is the result of GetQueuedCompletionStatus. On failure the Win32
+ * error code is fetched here and passed on as the status.
+ */
+static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
+  result->iocpd->ops_in_progress--;   // this operation is no longer outstanding
+  DWORD status = 0;
+  if (!success)
+    status = GetLastError();
+
+  switch (result->type) {
+  case IOCP_READ:
+    complete_read((read_result_t *) result, num_transferred, status);
+    return;
+  case IOCP_WRITE:
+    complete_write((write_result_t *) result, num_transferred, status);
+    return;
+  case IOCP_CONNECT:
+    complete_connect((connect_result_t *) result, status);
+    return;
+  case IOCP_ACCEPT:
+    complete_accept((accept_result_t *) result, status);
+    return;
+  default:
+    assert(false);
+  }
+}
+
+/* Process every completion already queued on the port, without blocking. */
+void pni_iocp_drain_completions(iocp_t *iocp)
+{
+  for (;;) {
+    DWORD transferred = 0;
+    ULONG_PTR key = 0;
+    OVERLAPPED *ov = NULL;
+    BOOL ok = GetQueuedCompletionStatus(iocp->completion_port, &transferred,
+                                        &key, &ov, 0);
+    if (ov == NULL)
+      break;  // nothing dequeued (timed out)
+    complete((iocp_result_t *) ov, ok != FALSE, transferred);
+  }
+}
+
+// returns: -1 on error, 0 on timeout, 1 successful completion
+// A negative timeout waits forever.  error may be NULL; when given, it is
+// filled in on failure.
+int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
+  DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
+  DWORD num_transferred = 0;
+  ULONG_PTR completion_key = 0;
+  OVERLAPPED *overlapped = 0;
+
+  bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+                                            &completion_key, &overlapped, win_timeout);
+  if (!overlapped) {
+    // No packet dequeued: the wait timed out or the call itself failed.
+    DWORD err = GetLastError();
+    if (err == WAIT_TIMEOUT)
+      return 0;
+    if (error)
+      pni_win32_error(error, "GetQueuedCompletionStatus", err);
+    return -1;
+  }
+
+  iocp_result_t *result = (iocp_result_t *) overlapped;
+  complete(result, good_op, num_transferred);
+  return 1;
+}
+
+// === Close (graceful and otherwise)
+
+// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
+// and fully closed.
+
+/*
+ * Park a closing descriptor on the zombie list until its outstanding async
+ * ops finish, allowing 2 seconds for graceful shutdown.  With nothing
+ * outstanding there is no need for a zombie: the socket is closed now.
+ */
+static void zombie_list_add(iocpdesc_t *iocpd)
+{
+  assert(iocpd->closing);
+  if (iocpd->ops_in_progress) {
+    // Allow 2 seconds for graceful shutdown before releasing socket resource.
+    iocpd->reap_time = pn_i_now() + 2000;
+    pn_list_add(iocpd->iocp->zombie_list, iocpd);
+    return;
+  }
+  if (iocpd->socket == INVALID_SOCKET)
+    return;
+  closesocket(iocpd->socket);
+  iocpd->socket = INVALID_SOCKET;
+  iocpd->read_closed = true;
+}
+
+/* Once a closing descriptor has no ops outstanding, close its socket and drop
+ * it from the zombie list. */
+static void reap_check(iocpdesc_t *iocpd)
+{
+  if (!iocpd->closing || iocpd->ops_in_progress)
+    return;
+  if (iocpd->socket != INVALID_SOCKET) {
+    closesocket(iocpd->socket);
+    iocpd->socket = INVALID_SOCKET;
+  }
+  pn_list_remove(iocpd->iocp->zombie_list, iocpd);
+  // iocpd is decref'ed and possibly released
+}
+
+/* Reap time of the first zombie in the list, or 0 when there are no zombies. */
+pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
+{
+  if (!pn_list_size(iocp->zombie_list))
+    return 0;
+  iocpdesc_t *first = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0);
+  return first->reap_time;
+}
+
+/*
+ * Hard-close zombies whose graceful-close window ended by `now`.
+ * The scan stops at the first zombie not yet due. This assumes the list is
+ * in reap_time order, since each zombie is appended with now + 2000.
+ */
+void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
+{
+  pn_list_t *zombies = iocp->zombie_list;
+  for (size_t i = 0; i < pn_list_size(zombies); i++) {
+    iocpdesc_t *z = (iocpdesc_t *) pn_list_get(zombies, i);
+    if (z->reap_time > now)
+      break;
+    if (z->socket == INVALID_SOCKET)
+      continue;   // already closed, waiting for its ops to complete
+    assert(z->ops_in_progress > 0);
+    if (iocp->iocp_trace)
+      iocp_log("async close: graceful close timeout exceeded\n");
+    closesocket(z->socket);
+    z->socket = INVALID_SOCKET;
+    z->read_closed = true;
+    // outstanding ops should complete immediately now
+  }
+}
+
+/*
+ * Shutdown helper.  With no more pn_selector_select() calls coming, keep
+ * processing completions so zombies can finish a graceful close, for at most
+ * 2 seconds in total.
+ */
+static void drain_zombie_completions(iocp_t *iocp)
+{
+  // No more pn_selector_select() from App, but zombies still need care and feeding
+  // until their outstanding async actions complete.
+  pni_iocp_drain_completions(iocp);
+
+  // Discard any that have no pending async IO
+  size_t sz = pn_list_size(iocp->zombie_list);
+  for (size_t idx = 0; idx < sz;) {
+    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
+    if (!iocpd->ops_in_progress) {
+      pn_list_del(iocp->zombie_list, idx, 1);
+      sz--;
+    } else {
+      idx++;
+    }
+  }
+
+  pn_timestamp_t now = pn_i_now();
+  pn_timestamp_t deadline = now + 2000;
+
+  while (pn_list_size(iocp->zombie_list) && now < deadline) {
+    int rv = pni_iocp_wait_one(iocp, (int) (deadline - now), NULL);
+    if (rv < 0) {
+      // GetLastError() returns a DWORD (unsigned long): print it with %lu.
+      iocp_log("unexpected IOCP failure on Proton IO shutdown %lu\n",
+               (unsigned long) GetLastError());
+      break;
+    }
+    now = pn_i_now();
+  }
+  if (now >= deadline && pn_list_size(iocp->zombie_list)) {
+    // Should only happen if really slow TCP handshakes, i.e. total network failure
+    iocp_log("network failure on Proton shutdown\n");
+  }
+}
+
+/*
+ * Detach one descriptor during iocp teardown.
+ * - Proton-owned descriptors are closed, which makes them zombies.
+ * - External (application-owned) descriptors are only removed from the map.
+ *   They are kept in `externals` while they still have async ops outstanding.
+ *   With mark_closed, Proton also stops consuming their read side and will
+ *   not shut down their write side.
+ */
+static void close_all_detach(iocp_t *iocp, iocpdesc_t *iocpd, pn_list_t *externals,
+                             bool mark_closed)
+{
+  if (!iocpd->external) {
+    // Make it a zombie.
+    pni_iocp_begin_close(iocpd);
+    return;
+  }
+  if (mark_closed) {
+    iocpd->read_closed = true;   // Do not consume from read side
+    iocpd->write_closed = true;  // Do not shutdown write side
+  }
+  // Owned by application, just keep a temporary reference to it.
+  // iocp_result_t structs must not be free'd until completed or
+  // the completion port is closed.
+  if (iocpd->ops_in_progress)
+    pn_list_add(externals, iocpd);
+  pni_iocpdesc_map_del(iocp, iocpd->socket);
+}
+
+/* Empty the descriptor map, listeners first.  Returns the external
+ * descriptors that still have async ops outstanding. */
+static pn_list_t *iocp_map_close_all(iocp_t *iocp)
+{
+  // Zombify stragglers, i.e. no pn_close() from the application.
+  pn_list_t *externals = pn_list(0, PN_REFCOUNT);
+  pn_hash_t *map = iocp->iocpdesc_map;
+
+  // NOTE(review): entries are deleted from the map while it is being iterated;
+  // confirm pn_hash_next() tolerates deletion of the current entry.
+  for (pn_handle_t entry = pn_hash_head(map); entry; entry = pn_hash_next(map, entry)) {
+    iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(map, entry);
+    if (is_listener(iocpd))   // Just listeners first.
+      close_all_detach(iocp, iocpd, externals, false);
+  }
+  pni_iocp_drain_completions(iocp);
+
+  for (pn_handle_t entry = pn_hash_head(map); entry; entry = pn_hash_next(map, entry)) {
+    iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(map, entry);
+    close_all_detach(iocp, iocpd, externals, true);
+  }
+  return externals;
+}
+
+/* Final teardown step: close every remaining zombie socket, reap whatever
+ * completes, and log any close operations that are still unfinished. */
+static void zombie_list_hard_close_all(iocp_t *iocp)
+{
+  pn_list_t *zombies = iocp->zombie_list;
+  pni_iocp_drain_completions(iocp);
+  size_t n = pn_list_size(zombies);
+  for (size_t i = 0; i < n; i++) {
+    iocpdesc_t *z = (iocpdesc_t *) pn_list_get(zombies, i);
+    if (z->socket == INVALID_SOCKET)
+      continue;
+    closesocket(z->socket);
+    z->socket = INVALID_SOCKET;
+    z->read_closed = true;
+    z->write_closed = true;
+  }
+  pni_iocp_drain_completions(iocp);
+
+  // Zombies should be all gone.  Do a sanity check.
+  size_t left = pn_list_size(zombies);
+  int remaining = (int) left;
+  int ops = 0;
+  for (size_t i = 0; i < left; i++)
+    ops += ((iocpdesc_t *) pn_list_get(zombies, i))->ops_in_progress;
+  if (remaining)
+    iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
+}
+
+/* Shut down the send direction (SD_SEND) and mark the write side closed.
+ * If the read side is already closed too, the socket itself is closed. */
+static void iocp_shutdown(iocpdesc_t *iocpd)
+{
+  if (shutdown(iocpd->socket, SD_SEND) != 0 && iocpd->iocp->iocp_trace)
+    iocp_log("socket shutdown failed %d\n", WSAGetLastError());
+  iocpd->write_closed = true;
+  if (!iocpd->read_closed)
+    return;
+  closesocket(iocpd->socket);
+  iocpd->socket = INVALID_SOCKET;
+}
+
+/*
+ * Start closing a descriptor (pn_close()).
+ * - Listeners are closed outright; pending accepts then complete.
+ * - Connected sockets get an SD_SEND shutdown now if no write is in flight.
+ *   Otherwise the shutdown is done by complete_write.
+ * Either way the descriptor joins the zombie list and leaves the map, which
+ * may free it.
+ */
+void pni_iocp_begin_close(iocpdesc_t *iocpd)
+{
+  assert(!iocpd->closing);
+  bool listener = is_listener(iocpd);
+  pn_socket_t old_sock = iocpd->socket;
+  iocpd->closing = true;
+  if (listener) {
+    // Listening socket is easy.  Close the socket which will cancel async ops.
+    iocpd->socket = INVALID_SOCKET;
+    iocpd->read_closed = true;
+    iocpd->write_closed = true;
+    closesocket(old_sock);
+    // Pending accepts will now complete.  Zombie can die when all consumed.
+  } else if (!iocpd->write_closed && !write_in_progress(iocpd)) {
+    // Continue async operation looking for graceful close confirmation or timeout.
+    iocp_shutdown(iocpd);
+  }
+  zombie_list_add(iocpd);
+  pni_iocpdesc_map_del(iocpd->iocp, old_sock);  // may pn_free *iocpd
+}
+
+
+// === iocp_t
+
+#define pni_iocp_hashcode NULL
+#define pni_iocp_compare NULL
+#define pni_iocp_inspect NULL
+
+/* pn_class_t hook: create the completion port, descriptor map, zombie list and
+ * shared pool (pni_shared_pool_create).  Tracing follows PN_TRACE_DRV. */
+void pni_iocp_initialize(void *obj)
+{
+  iocp_t *iocp = (iocp_t *) obj;
+  memset(iocp, 0, sizeof(iocp_t));
+  pni_shared_pool_create(iocp);
+  iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+  assert(iocp->completion_port != NULL);
+  iocp->iocpdesc_map = pn_hash(0, 0.75, PN_REFCOUNT);
+  iocp->zombie_list = pn_list(0, PN_REFCOUNT);
+  iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV");
+  iocp->selector = NULL;
+}
+
+/*
+ * pn_class_t hook: tear everything down, in this order:
+ * 1. Close descriptors; external ones are only detached.
+ * 2. Give zombies a last chance at a graceful close, then hard-close them.
+ * 3. Close the completion port.
+ * 4. Only then free memory that an async operation might still touch.
+ */
+void pni_iocp_finalize(void *obj)
+{
+  iocp_t *iocp = (iocp_t *) obj;
+  // Move sockets to closed state, except external sockets.
+  pn_list_t *externals = iocp_map_close_all(iocp);
+  // Now everything with ops_in_progress is in the zombie_list or the externals list.
+  assert(!pn_hash_head(iocp->iocpdesc_map));
+  pn_free(iocp->iocpdesc_map);
+
+  drain_zombie_completions(iocp);    // Last chance for graceful close
+  zombie_list_hard_close_all(iocp);
+  CloseHandle(iocp->completion_port);  // This cancels all our async ops
+  iocp->completion_port = NULL;
+
+  // pn_list_size() yields a size_t; cast it to match the %d conversion.
+  if (pn_list_size(externals) && iocp->iocp_trace)
+    iocp_log("%d external sockets not closed and removed from Proton IOCP control\n",
+             (int) pn_list_size(externals));
+
+  // Now safe to free everything that might be touched by a former async operation.
+  pn_free(externals);
+  pn_free(iocp->zombie_list);
+  pni_shared_pool_free(iocp);
+}
+
+/* Allocate a new iocp_t; fields are set up by pni_iocp_initialize(). */
+iocp_t *pni_iocp(void)
+{
+  static const pn_class_t clazz = PN_CLASS(pni_iocp);
+  iocp_t *iocp = (iocp_t *) pn_new(sizeof(iocp_t), &clazz);
+  return iocp;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/windows/iocp.h
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.h b/proton-c/src/windows/iocp.h
new file mode 100644
index 0000000..bc64dd0
--- /dev/null
+++ b/proton-c/src/windows/iocp.h
@@ -0,0 +1,141 @@
+#ifndef PROTON_SRC_IOCP_H
+#define PROTON_SRC_IOCP_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/selectable.h>
+#include <proton/type_compat.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct pni_acceptor_t pni_acceptor_t;
+typedef struct write_result_t write_result_t;
+typedef struct read_result_t read_result_t;
+typedef struct write_pipeline_t write_pipeline_t;
+typedef struct iocpdesc_t iocpdesc_t;
+
+
+// One per pn_io_t.
+
+struct iocp_t {
+  HANDLE completion_port;   // created in pni_iocp_initialize, closed in pni_iocp_finalize
+  pn_hash_t *iocpdesc_map;  // socket -> iocpdesc_t (refcounted)
+  pn_list_t *zombie_list;   // closing descriptors still waiting on async ops
+  int shared_pool_size;
+  char *shared_pool_memory;
+  write_result_t **shared_results;
+  write_result_t **available_results;
+  int shared_available_count;
+  size_t writer_count;
+  int loopback_bufsize;
+  bool iocp_trace;          // set from the PN_TRACE_DRV environment variable
+  pn_selector_t *selector;  // cleared by pn_selector_finalize
+};
+
+
+// One for each socket.
+// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list,
+// selector->iocp_descriptors list.  It should remain ref counted in the
+// zombie_list until ops_in_progress == 0 or the completion port is closed.
+
+struct iocpdesc_t {
+  pn_socket_t socket;
+  iocp_t *iocp;
+  pni_acceptor_t *acceptor;    // listening sockets only
+  pn_error_t *error;
+  int ops_in_progress;         // overlapped operations started but not yet completed
+  bool read_in_progress;       // zero-byte read outstanding
+  write_pipeline_t *pipeline;  // non-listening sockets only
+  read_result_t *read_result;  // non-listening sockets only
+  bool external;       // true if socket set up outside Proton
+  bool bound;          // associated with the completion port
+  bool closing;        // pn_close called
+  bool read_closed;    // EOF or read error
+  bool write_closed;   // shutdown sent or write error
+  pn_selector_t *selector;
+  pn_selectable_t *selectable;
+  int events;          // PN_* events currently available
+  int interests;       // PN_* events the selectable currently wants
+  pn_timestamp_t deadline;
+  iocpdesc_t *triggered_list_next;
+  iocpdesc_t *triggered_list_prev;
+  iocpdesc_t *deadlines_next;
+  iocpdesc_t *deadlines_prev;
+  pn_timestamp_t reap_time;  // zombie hard-close time (see zombie_list_add)
+};
+
+typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
+
+typedef struct {
+  OVERLAPPED overlapped;   // must be first: dequeued OVERLAPPED* is cast back to iocp_result_t*
+  iocp_type_t type;        // selects the completion handler in complete()
+  iocpdesc_t *iocpd;
+  HRESULT status;
+} iocp_result_t;
+
+struct write_result_t {
+  iocp_result_t base;      // must be first, for the same cast
+  size_t requested;        // bytes submitted; compared with the transferred count on completion
+  bool in_use;
+  pn_bytes_t buffer;
+};
+
+iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external);
+iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s);
+void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s);
+void pni_iocpdesc_map_push(iocpdesc_t *iocpd);
+void pni_iocpdesc_start(iocpdesc_t *iocpd);
+void pni_iocp_drain_completions(iocp_t *);
+int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
+void pni_iocp_start_accepting(iocpdesc_t *iocpd);
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error);
+pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error);
+ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *);
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error);
+void pni_iocp_begin_close(iocpdesc_t *iocpd);
+iocp_t *pni_iocp();
+
+void pni_events_update(iocpdesc_t *iocpd, int events);
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen);
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd);
+size_t pni_write_pipeline_size(write_pipeline_t *);
+bool pni_write_pipeline_writable(write_pipeline_t *);
+void pni_write_pipeline_return(write_pipeline_t *, write_result_t *);
+size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t);
+write_result_t *pni_write_pipeline_next(write_pipeline_t *);
+void pni_shared_pool_create(iocp_t *);
+void pni_shared_pool_free(iocp_t *);
+void pni_zombie_check(iocp_t *, pn_timestamp_t);
+pn_timestamp_t pni_zombie_deadline(iocp_t *);
+
+pn_selector_t *pni_selector_create(iocp_t *iocp);
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* iocp.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/windows/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/selector.c b/proton-c/src/windows/selector.c
index 2d1b855..b01c27a 100644
--- a/proton-c/src/windows/selector.c
+++ b/proton-c/src/windows/selector.c
@@ -19,21 +19,6 @@
  *
  */
 
-/*
- * Copy of posix poll-based selector with minimal changes to use
- * select().  TODO: fully native implementaton with I/O completion
- * ports.
- *
- * This implementation comments out the posix max_fds arg to select
- * which has no meaning on windows.  The number of fd_set slots are
- * configured at compile time via FD_SETSIZE, chosen "large enough"
- * for the limited scalability of select() at the expense of
- * 3*N*sizeof(unsigned int) bytes per driver instance.  select (and
- * associated macros like FD_ZERO) are otherwise unaffected
- * performance-wise by increasing FD_SETSIZE.
- */
-
-#define FD_SETSIZE 2048
 #ifndef _WIN32_WINNT
 #define _WIN32_WINNT 0x0501
 #endif
@@ -44,37 +29,53 @@
 #include <Ws2tcpip.h>
 #define PN_WINAPI
 
-#include "../platform.h"
+#include "platform.h"
+#include <proton/object.h>
 #include <proton/io.h>
 #include <proton/selector.h>
 #include <proton/error.h>
 #include <assert.h>
-#include "../selectable.h"
-#include "../util.h"
+#include "selectable.h"
+#include "util.h"
+#include "iocp.h"
+
+static void interests_update(iocpdesc_t *iocpd, int interests);
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t);
 
 struct pn_selector_t {
-  fd_set readfds;
-  fd_set writefds;
-  fd_set exceptfds;
+  iocp_t *iocp;  // set by pni_selector_create; NULL for a bare pni_selector()
   pn_timestamp_t *deadlines;
   size_t capacity;
   pn_list_t *selectables;
+  pn_list_t *iocp_descriptors;  // parallel to selectables; NULL entry when the selectable has no socket
   pn_timestamp_t deadline;
   size_t current;
+  iocpdesc_t *current_triggered;  // iteration cursor used by pn_selector_next
   pn_timestamp_t awoken;
   pn_error_t *error;
+  iocpdesc_t *triggered_list_head;  // descriptors whose events intersect their interests
+  iocpdesc_t *triggered_list_tail;
+  iocpdesc_t *deadlines_head;  // pn_selector_select reads the head as the earliest deadline
+  iocpdesc_t *deadlines_tail;
 };
 
 void pn_selector_initialize(void *obj)
 {
   pn_selector_t *selector = (pn_selector_t *) obj;
+  selector->iocp = NULL;  // assigned by pni_selector_create
   selector->deadlines = NULL;
   selector->capacity = 0;
   selector->selectables = pn_list(0, 0);
+  selector->iocp_descriptors = pn_list(0, PN_REFCOUNT);  // holds a reference to each iocpdesc_t
   selector->deadline = 0;
   selector->current = 0;
+  selector->current_triggered = NULL;
   selector->awoken = 0;
   selector->error = pn_error();
+  selector->triggered_list_head = NULL;
+  selector->triggered_list_tail = NULL;
+  selector->deadlines_head = NULL;
+  selector->deadlines_tail = NULL;
 }
 
 void pn_selector_finalize(void *obj)
@@ -82,28 +83,51 @@ void pn_selector_finalize(void *obj)
   pn_selector_t *selector = (pn_selector_t *) obj;
   free(selector->deadlines);
   pn_free(selector->selectables);
+  pn_free(selector->iocp_descriptors);
   pn_error_free(selector->error);
+  // A selector made by pni_selector() alone never had an iocp attached.
+  if (selector->iocp)
+    selector->iocp->selector = NULL;
 }
 
 #define pn_selector_hashcode NULL
 #define pn_selector_compare NULL
 #define pn_selector_inspect NULL
 
-pn_selector_t *pn_selector(void)
+// Allocate a bare selector with no iocp attached; see pni_selector_create().
+pn_selector_t *pni_selector(void)
 {
-  static pn_class_t clazz = PN_CLASS(pn_selector);
+  static const pn_class_t clazz = PN_CLASS(pn_selector);
   pn_selector_t *selector = (pn_selector_t *) pn_new(sizeof(pn_selector_t), &clazz);
   return selector;
 }
 
+// Allocate a selector that dispatches completions from iocp.
+pn_selector_t *pni_selector_create(iocp_t *iocp)
+{
+  pn_selector_t *selector = pni_selector();
+  selector->iocp = iocp;
+  return selector;
+}
+
 void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
 {
   assert(selector);
   assert(selectable);
   assert(pni_selectable_get_index(selectable) < 0);
+  pn_socket_t sock = pn_selectable_fd(selectable);
+
+  iocpdesc_t *iocpd = NULL;
+  if (sock != INVALID_SOCKET) {
+    iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
+    if (!iocpd) {
+      // Socket created outside proton.  Hook it up to iocp.
+      iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
+      pni_iocpdesc_start(iocpd);
+    } else {
+      assert(iocpd->iocp == selector->iocp);
+    }
+  }
 
   if (pni_selectable_get_index(selectable) < 0) {
     pn_list_add(selector->selectables, selectable);
+    pn_list_add(selector->iocp_descriptors, iocpd);
     size_t size = pn_list_size(selector->selectables);
 
     if (selector->capacity < size) {
@@ -112,6 +136,10 @@ void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
     }
 
     pni_selectable_set_index(selectable, size - 1);
+    if (iocpd) {
+      iocpd->selector = selector;
+      iocpd->selectable = selectable;
+    }
   }
 
   pn_selector_update(selector, selectable);
@@ -121,18 +149,22 @@ void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
 {
   int idx = pni_selectable_get_index(selectable);
   assert(idx >= 0);
- /*
-  selector->fds[idx].fd = pn_selectable_fd(selectable);
-  selector->fds[idx].events = 0;
-  selector->fds[idx].revents = 0;
-  if (pn_selectable_capacity(selectable) > 0) {
-    selector->fds[idx].events |= POLLIN;
-  }
-  if (pn_selectable_pending(selectable) > 0) {
-    selector->fds[idx].events |= POLLOUT;
-  }
- */
   selector->deadlines[idx] = pn_selectable_deadline(selectable);
+
+  pn_socket_t sock = pn_selectable_fd(selectable);
+  iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+  if (iocpd) {
+    assert(sock == iocpd->socket || iocpd->closing);
+    int interests = 0;
+    if (pn_selectable_capacity(selectable) > 0) {
+      interests |= PN_READABLE;
+    }
+    if (pn_selectable_pending(selectable) > 0) {
+      interests |= PN_WRITABLE;
+    }
+    interests_update(iocpd, interests);
+    deadlines_update(iocpd, selector->deadlines[idx]);
+  }
 }
 
 void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
@@ -142,107 +174,94 @@ void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
 
   int idx = pni_selectable_get_index(selectable);
   assert(idx >= 0);
+  iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+  if (iocpd) {
+    if (selector->current_triggered == iocpd)
+      selector->current_triggered = iocpd->triggered_list_next;
+    interests_update(iocpd, 0);
+    deadlines_update(iocpd, 0);
+    assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev);
+    assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev);
+    iocpd->selector = NULL;
+    iocpd->selectable = NULL;
+  }
   pn_list_del(selector->selectables, idx, 1);
+  pn_list_del(selector->iocp_descriptors, idx, 1);
   size_t size = pn_list_size(selector->selectables);
   for (size_t i = idx; i < size; i++) {
     pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
     pni_selectable_set_index(sel, i);
   }
-
   pni_selectable_set_index(selectable, -1);
 }
 
 int pn_selector_select(pn_selector_t *selector, int timeout)
 {
   assert(selector);
-
-  FD_ZERO(&selector->readfds);
-  FD_ZERO(&selector->writefds);
-  FD_ZERO(&selector->exceptfds);
-
-  size_t size = pn_list_size(selector->selectables);
-  if (size > FD_SETSIZE) {
-    // This Windows limitation will go away when switching to completion ports
-    pn_error_set(selector->error, PN_ERR, "maximum sockets exceeded for Windows selector");
-    return PN_ERR;
-  }
+  pn_error_clear(selector->error);
+  pn_timestamp_t deadline = 0;
+  pn_timestamp_t now = pn_i_now();
 
   if (timeout) {
-    pn_timestamp_t deadline = 0;
-    for (size_t i = 0; i < size; i++) {
-      pn_timestamp_t d = selector->deadlines[i];
-      if (d)
-        deadline = (deadline == 0) ? d : pn_min(deadline, d);
-    }
-
-    if (deadline) {
-      pn_timestamp_t now = pn_i_now();
-      int delta = selector->deadline - now;
-      if (delta < 0) {
-        timeout = 0;
-      } else if (delta < timeout) {
-        timeout = delta;
-      }
-    }
+    if (selector->deadlines_head)
+      deadline = selector->deadlines_head->deadline;
   }
+  // Shorten the timeout to the earliest selectable deadline, if any.
+  if (deadline) {
+    int delta = deadline - now;
+    if (delta < 0) {
+      delta = 0;
+    }
+    if (timeout < 0)
+      timeout = delta;
+    else if (timeout > delta)
+      timeout = delta;
+  }
+  deadline = (timeout >= 0) ? now + timeout : 0;
 
-  struct timeval to = {0};
-  struct timeval *to_arg = &to;
-  // block only if (timeout == 0) and (closed_count == 0)
-  if (timeout > 0) {
-    // convert millisecs to sec and usec:
-    to.tv_sec = timeout/1000;
-    to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
-  }
-  else if (timeout < 0) {
-    to_arg = NULL;
-  }
+  // Process all currently available completions, even if matched events available
+  pni_iocp_drain_completions(selector->iocp);
+  pni_zombie_check(selector->iocp, now);
+  // Loop until an interested event is matched, or until deadline
+  while (true) {
+    if (selector->triggered_list_head)
+      break;
+    if (deadline && deadline <= now)
+      break;
+    pn_timestamp_t completion_deadline = deadline;
+    pn_timestamp_t zd = pni_zombie_deadline(selector->iocp);
+    // A zombie whose reap time has already passed was hard-closed by
+    // pni_zombie_check().  Its stale deadline would give a negative timeout,
+    // which pni_iocp_wait_one() treats as INFINITE (ignoring our deadline).
+    if (zd && zd > now)
+      completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd;
 
-  for (size_t i = 0; i < size; i++) {
-    pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
-    pn_socket_t fd = pn_selectable_fd(sel);
-    if (pn_selectable_capacity(sel) > 0) {
-      FD_SET(fd, &selector->readfds);
-    }
-    if (pn_selectable_pending(sel) > 0) {
-      FD_SET(fd, &selector->writefds);
+    int completion_timeout = (!completion_deadline) ? -1 : completion_deadline - now;
+    int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error);
+    if (rv < 0)
+      return pn_error_code(selector->error);
+
+    now = pn_i_now();
+    if (zd && zd <= now) {
+      pni_zombie_check(selector->iocp, now);
     }
   }
 
-  int result = select(0 /* ignored in win32 */, &selector->readfds, &selector->writefds, &selector->exceptfds, to_arg);
-  if (result == -1) {
-    pn_i_error_from_errno(selector->error, "select");
-  } else {
-    selector->current = 0;
-    selector->awoken = pn_i_now();
+  selector->current = 0;
+  selector->awoken = now;
+  for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) {
+    if (iocpd->deadline <= now)
+      pni_events_update(iocpd, iocpd->events | PN_EXPIRED);
+    else
+      break;
   }
-
+  // Start iteration only after expiry updates, so descriptors triggered by
+  // PN_EXPIRED are visible to pn_selector_next() in this round.
+  selector->current_triggered = selector->triggered_list_head;
   return pn_error_code(selector->error);
 }
 
 pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events)
 {
-  pn_list_t *l = selector->selectables;
-  size_t size = pn_list_size(l);
-  while (selector->current < size) {
-    pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(l, selector->current);
-    pn_timestamp_t deadline = selector->deadlines[selector->current];
-    int ev = 0;
-    pn_socket_t fd = pn_selectable_fd(sel);
-    if (FD_ISSET(fd, &selector->readfds)) {
-      ev |= PN_READABLE;
-    }
-    if (FD_ISSET(fd, &selector->writefds)) {
-      ev |= PN_WRITABLE;
-    }
-    if (deadline && selector->awoken >= deadline) {
-      ev |= PN_EXPIRED;
-    }
-    selector->current++;
-    if (ev) {
-      *events = ev;
-      return sel;
-    }
+  if (selector->current_triggered) {  // walk the triggered list from the cursor set by pn_selector_select
+    iocpdesc_t *iocpd = selector->current_triggered;
+    *events = iocpd->interests & iocpd->events;  // report only events the selectable asked for
+    selector->current_triggered = iocpd->triggered_list_next;
+    return iocpd->selectable;
   }
   return NULL;
 }
@@ -252,3 +271,91 @@ void pn_selector_free(pn_selector_t *selector)
   assert(selector);
   pn_free(selector);
 }
+
+
+/* True when iocpd is currently linked into the selector's triggered list. */
+static int triggered_list_contains(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+  // A node is listed iff it has a predecessor or is itself the head.
+  return iocpd->triggered_list_prev != NULL || iocpd == selector->triggered_list_head;
+}
+
+/* Append iocpd to the selector's triggered list unless it is already a member. */
+static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+  if (!triggered_list_contains(selector, iocpd)) {
+    LL_ADD(selector, triggered_list, iocpd);
+  }
+}
+
+/* Unlink iocpd from the selector's triggered list; no-op when it is not a member. */
+static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+  if (triggered_list_contains(selector, iocpd)) {
+    LL_REMOVE(selector, triggered_list, iocpd);
+    // Clear both links so the membership test above sees a detached node.
+    iocpd->triggered_list_next = NULL;
+    iocpd->triggered_list_prev = NULL;
+  }
+}
+
+
+/*
+ * Keep iocpd's membership in its selector's triggered list consistent with
+ * (events & interests): listed when the intersection is non-empty, unlisted
+ * otherwise.  No-op when iocpd is not attached to a selector.
+ */
+static void triggered_list_sync(iocpdesc_t *iocpd)
+{
+  pn_selector_t *selector = iocpd->selector;
+  if (!selector)
+    return;
+  if (iocpd->events & iocpd->interests)
+    triggered_list_add(selector, iocpd);
+  else
+    triggered_list_remove(selector, iocpd);
+}
+
+/*
+ * Record the current event mask for iocpd (e.g. PN_EXPIRED is set here by the
+ * selector's deadline scan) and resync the triggered list.  Unchanged masks
+ * are ignored.
+ */
+void pni_events_update(iocpdesc_t *iocpd, int events)
+{
+  if (iocpd->events == events)
+    return;
+  iocpd->events = events;
+  triggered_list_sync(iocpd);
+}
+
+/*
+ * Record the event mask of interest for iocpd and resync the triggered list.
+ * Unchanged masks are ignored.
+ */
+static void interests_update(iocpdesc_t *iocpd, int interests)
+{
+  if (iocpd->interests == interests)
+    return;
+  iocpd->interests = interests;
+  triggered_list_sync(iocpd);
+}
+
+/* Unlink iocpd from the selector's deadline-ordered list; no-op when it is not a member. */
+static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+  int listed = iocpd->deadlines_prev != NULL || selector->deadlines_head == iocpd;
+  if (listed) {
+    LL_REMOVE(selector, deadlines, iocpd);
+    // Clear both links so later membership tests treat the node as detached.
+    iocpd->deadlines_next = NULL;
+    iocpd->deadlines_prev = NULL;
+  }
+}
+
+
+/*
+ * Set iocpd's deadline and keep the selector's deadline list sorted in
+ * ascending order.
+ *
+ * A zero deadline removes iocpd from the list and drops PN_EXPIRED from both
+ * its events and interests.  A non-zero deadline (re)inserts iocpd after every
+ * entry whose deadline is <= the new one and adds PN_EXPIRED to its interests.
+ */
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline)
+{
+  if (deadline == iocpd->deadline)
+    return;
+  iocpd->deadline = deadline;
+  pn_selector_t *selector = iocpd->selector;
+  if (!deadline) {
+    deadlines_remove(selector, iocpd);
+    pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+    interests_update(iocpd, iocpd->interests & ~PN_EXPIRED);
+  } else {
+    if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) {
+      // Already scheduled: unlink so it can be re-inserted at its new
+      // position, and clear any expiry left over from the old deadline.
+      deadlines_remove(selector, iocpd);
+      pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+    }
+    interests_update(iocpd, iocpd->interests | PN_EXPIRED);
+    // Find the first entry expiring strictly later; equal deadlines keep FIFO order.
+    iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines);
+    while (dl_iocpd && dl_iocpd->deadline <= deadline)
+      dl_iocpd = dl_iocpd->deadlines_next;
+    if (dl_iocpd) {
+      // Insert before dl_iocpd.  Both neighbours must point at the new node,
+      // otherwise forward traversal (the expiry scan) would skip it.
+      iocpd->deadlines_prev = dl_iocpd->deadlines_prev;
+      iocpd->deadlines_next = dl_iocpd;
+      if (iocpd->deadlines_prev)
+        iocpd->deadlines_prev->deadlines_next = iocpd;
+      dl_iocpd->deadlines_prev = iocpd;
+      if (selector->deadlines_head == dl_iocpd)
+        selector->deadlines_head = iocpd;
+    } else {
+      LL_ADD(selector, deadlines, iocpd);  // append: latest deadline so far
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/windows/write_pipeline.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/write_pipeline.c b/proton-c/src/windows/write_pipeline.c
new file mode 100644
index 0000000..3160fa8
--- /dev/null
+++ b/proton-c/src/windows/write_pipeline.c
@@ -0,0 +1,312 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * A simple write buffer pool.  Each socket has a dedicated "primary"
+ * buffer and can borrow from a shared pool with limited size tuning.
+ * Could enhance e.g. with separate pools per network interface and fancier
+ * memory tuning based on interface speed, system resources, and
+ * number of connections, etc.
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <Ws2tcpip.h>
+#define PN_WINAPI
+
+#include "platform.h"
+#include <proton/object.h>
+#include <proton/io.h>
+#include <proton/selector.h>
+#include <proton/error.h>
+#include <assert.h>
+#include "selectable.h"
+#include "util.h"
+#include "iocp.h"
+
+// Max overlapped writes per socket
+#define IOCP_MAX_OWRITES 16
+// Write buffer size
+#define IOCP_WBUFSIZE 16384
+
+/* Write a printf-style diagnostic to stderr and flush it immediately. */
+static void pipeline_log(const char *fmt, ...)
+{
+  va_list args;
+
+  va_start(args, fmt);
+  (void) vfprintf(stderr, fmt, args);
+  va_end(args);
+  (void) fflush(stderr);
+}
+
+/*
+ * Create the iocp's shared pool of write buffers.
+ *
+ * The pool defaults to 16 buffers of IOCP_WBUFSIZE bytes, carved out of one
+ * VirtualAlloc region.  Two internal debugging knobs are read from the
+ * environment:
+ *   PNI_WRITE_BUFFERS  pool size override, accepted in [0, 256)
+ *   PNI_LB_BUFSIZE     primary-buffer size for loopback sockets, [0, 128K]
+ *                      (0 = keep the default; applied by set_depth())
+ *
+ * On any allocation failure the pool is left empty: shared_pool_size and
+ * shared_available_count are 0 and the pool pointers are NULL.
+ */
+void pni_shared_pool_create(iocp_t *iocp)
+{
+  // TODO: more pools (or larger one) when using multiple non-loopback interfaces
+  iocp->shared_pool_size = 16;
+  char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz < 256) {
+      iocp->shared_pool_size = sz;
+    }
+  }
+  iocp->loopback_bufsize = 0;
+  env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz <= 128 * 1024) {
+      iocp->loopback_bufsize = sz;
+    }
+  }
+
+  // Start from a well-defined empty pool so every early return leaves
+  // consistent state for shared_pool_pop() and pni_shared_pool_free().
+  iocp->shared_pool_memory = NULL;
+  iocp->shared_results = NULL;
+  iocp->available_results = NULL;
+  iocp->shared_available_count = 0;
+  if (!iocp->shared_pool_size)
+    return;
+
+  iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+  if (!iocp->shared_pool_memory) {
+    // VirtualAlloc reports failure through GetLastError(), not errno, so
+    // perror() would print an unrelated message here.
+    pipeline_log("Proton write buffer pool allocation failure: error %lu\n", (unsigned long) GetLastError());
+    iocp->shared_pool_size = 0;
+    return;
+  }
+
+  iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+  iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+  if (!iocp->shared_results || !iocp->available_results) {
+    // Roll back to an empty pool rather than dereferencing NULL below.
+    pipeline_log("Proton write buffer pool allocation failure\n");
+    free(iocp->shared_results);
+    free(iocp->available_results);
+    iocp->shared_results = NULL;
+    iocp->available_results = NULL;
+    VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE);
+    iocp->shared_pool_memory = NULL;
+    iocp->shared_pool_size = 0;
+    return;
+  }
+
+  iocp->shared_available_count = iocp->shared_pool_size;
+  char *mem = iocp->shared_pool_memory;
+  for (int i = 0; i < iocp->shared_pool_size; i++) {
+    iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
+    mem += IOCP_WBUFSIZE;
+  }
+}
+
+/*
+ * Release the shared write-buffer pool created by pni_shared_pool_create().
+ *
+ * Results still marked in_use are reported as a leak and not freed.  The iocp
+ * is left with an empty pool (size/count 0, pointers NULL), so a repeated
+ * call is harmless.
+ */
+void pni_shared_pool_free(iocp_t *iocp)
+{
+  if (!iocp->shared_pool_size)
+    return;  // nothing allocated, or already released
+
+  if (iocp->shared_results) {
+    for (int i = 0; i < iocp->shared_pool_size; i++) {
+      write_result_t *result = iocp->shared_results[i];
+      if (result->in_use)
+        pipeline_log("Proton buffer pool leak\n");
+      else
+        free(result);
+    }
+  }
+  free(iocp->shared_results);
+  free(iocp->available_results);
+  iocp->shared_results = NULL;
+  iocp->available_results = NULL;
+  if (iocp->shared_pool_memory) {
+    // VirtualFree reports failure through GetLastError(), not errno.
+    if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE))
+      pipeline_log("write buffers release failed: error %lu\n", (unsigned long) GetLastError());
+    iocp->shared_pool_memory = NULL;
+  }
+  iocp->shared_pool_size = 0;
+  iocp->shared_available_count = 0;
+}
+
+/* Return a borrowed result to the free stack of the iocp that owns it. */
+static void shared_pool_push(write_result_t *result)
+{
+  iocp_t *owner = result->base.iocpd->iocp;
+  assert(owner->shared_available_count < owner->shared_pool_size);
+  owner->available_results[owner->shared_available_count] = result;
+  owner->shared_available_count++;
+}
+
+/* Pop a free result from the shared stack, or NULL when the pool is exhausted. */
+static write_result_t *shared_pool_pop(iocp_t *iocp)
+{
+  if (iocp->shared_available_count == 0)
+    return NULL;
+  iocp->shared_available_count--;
+  return iocp->available_results[iocp->shared_available_count];
+}
+
+/*
+ * Per-socket write pipeline: tracks the writes handed out for one iocpdesc
+ * and decides which buffer each new write uses, either the dedicated
+ * primary or one borrowed from the iocp's shared pool.
+ */
+struct write_pipeline_t {
+  iocpdesc_t *iocpd;          // owning descriptor (set by pni_write_pipeline)
+  size_t pending_count;       // results handed out by pni_write_pipeline_next() and not yet returned
+  write_result_t *primary;    // dedicated per-socket buffer, allocated at initialize time
+  size_t reserved_count;      // pni_write_pipeline_next() stops at this pending_count; reset to 0 on return
+  size_t next_primary_index;  // pending_count value at which the primary buffer is handed out
+  size_t depth;               // max concurrent writes; 0 until set_depth() runs
+  bool is_writer;             // whether this pipeline is counted in iocp->writer_count
+};
+
+// Object-class hooks, presumably consumed via token pasting by
+// PN_CLASS(write_pipeline); NULL means the hook is not provided.
+#define write_pipeline_compare NULL
+#define write_pipeline_inspect NULL
+#define write_pipeline_hashcode NULL
+
+/*
+ * pn_class initializer.  Puts every field into a defined state and gives the
+ * pipeline a dedicated primary buffer of IOCP_WBUFSIZE bytes.  iocpd is bound
+ * afterwards by pni_write_pipeline(); depth is chosen lazily by set_depth().
+ */
+static void write_pipeline_initialize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  pl->iocpd = NULL;
+  pl->pending_count = 0;
+  // Previously left uninitialized; pni_write_pipeline_next() compares against these.
+  pl->reserved_count = 0;
+  pl->next_primary_index = 0;
+  pl->depth = 0;
+  pl->is_writer = false;
+  // NOTE(review): a malloc failure is not handled; the NULL buffer would be
+  // passed unchecked to pni_write_result().
+  const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
+  pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
+}
+
+/* pn_class finalizer: release the primary buffer's storage, then its result record. */
+static void write_pipeline_finalize(void *object)
+{
+  write_pipeline_t *pipeline = (write_pipeline_t *) object;
+  write_result_t *primary = pipeline->primary;
+  void *storage = (void *) primary->buffer.start;
+
+  free(storage);
+  free(primary);
+}
+
+/* Allocate a write pipeline and bind both it and its primary result to iocpd. */
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
+{
+  static const pn_class_t clazz = PN_CLASS(write_pipeline);
+  write_pipeline_t *pl = (write_pipeline_t *) pn_new(sizeof(write_pipeline_t), &clazz);
+  write_result_t *primary = pl->primary;
+
+  primary->base.iocpd = iocpd;
+  pl->iocpd = iocpd;
+  return pl;
+}
+
+/*
+ * Count this pipeline (once) in iocp->writer_count, the divisor used by
+ * pni_write_pipeline_reserve() to share the pool among writers.
+ */
+static void confirm_as_writer(write_pipeline_t *pl)
+{
+  if (pl->is_writer)
+    return;
+  pl->is_writer = true;
+  pl->iocpd->iocp->writer_count++;
+}
+
+/* Undo confirm_as_writer(); no-op when the pipeline is not currently counted. */
+static void remove_as_writer(write_pipeline_t *pl)
+{
+  if (pl->is_writer) {
+    iocp_t *iocp = pl->iocpd->iocp;
+    assert(iocp->writer_count);
+    iocp->writer_count--;
+    pl->is_writer = false;
+  }
+}
+
+/*
+ * Optimal depth will depend on properties of the NIC, server, and driver.  For now,
+ * just distinguish between loopback interfaces and the rest.  Optimizations in the
+ * loopback stack allow decent performance with depth 1 and actually cause major
+ * performance hiccups if set to large values.
+ *
+ * Sets pl->depth to IOCP_MAX_OWRITES when the socket's local address is a
+ * non-loopback IPv4/IPv6 address, and to 1 otherwise (including when the
+ * address cannot be determined).  In the non-failure depth-1 case the primary
+ * buffer is resized to iocp->loopback_bufsize when that is non-zero.
+ */
+static void set_depth(write_pipeline_t *pl)
+{
+  pl->depth = 1;
+  sockaddr_storage sa;
+  socklen_t salen = sizeof(sa);
+  char host[INET6_ADDRSTRLEN];
+  DWORD hostlen = sizeof(host);
+
+  if (getsockname(pl->iocpd->socket, (sockaddr*) &sa, &salen) != 0)
+    return;
+  if (getnameinfo((sockaddr*) &sa, salen, host, hostlen, NULL, 0, NI_NUMERICHOST) != 0)
+    return;
+
+  bool v6_not_loopback = sa.ss_family == AF_INET6 && strcmp(host, "::1") != 0;
+  bool v4_not_loopback = sa.ss_family == AF_INET && strncmp(host, "127.", 4) != 0;
+  if (v6_not_loopback || v4_not_loopback) {
+    pl->depth = IOCP_MAX_OWRITES;
+    return;
+  }
+
+  // Loopback (or another address family): optionally resize the primary buffer.
+  iocp_t *iocp = pl->iocpd->iocp;
+  if (!iocp->loopback_bufsize)
+    return;
+  const char *resized = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
+  if (resized) {
+    pl->primary->buffer.start = resized;
+    pl->primary->buffer.size = iocp->loopback_bufsize;
+  }
+}
+
+// Reserve as many buffers as possible for count bytes.
+//
+// Returns how many further results pni_write_pipeline_next() may hand out, or
+// 0 while the primary buffer is still in use (the caller should treat this as
+// "would block").  Depth-1 pipelines only ever use the primary.  Deeper
+// pipelines register as a writer and take at most ceil(count / IOCP_WBUFSIZE)
+// buffers, limited by free pipeline slots (depth - pending) and by a rounded-up
+// fair share of the free shared buffers plus the primary.
+size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
+{
+  if (pl->primary->in_use)
+    return 0;  // I.e. io->wouldblock
+  if (!pl->depth)
+    set_depth(pl);
+  if (pl->depth == 1) {
+    // always use the primary
+    pl->reserved_count = 1;
+    pl->next_primary_index = 0;
+    return 1;
+  }
+
+  iocp_t *iocp = pl->iocpd->iocp;
+  confirm_as_writer(pl);
+  // Number of IOCP_WBUFSIZE buffers needed to hold count bytes (ceiling).
+  int wanted = (count / IOCP_WBUFSIZE);
+  if (count % IOCP_WBUFSIZE)
+    wanted++;
+  size_t pending = pl->pending_count;
+  assert(pending < pl->depth);
+  // NOTE(review): mixes int (wanted) with size_t (depth - pending); relies on
+  // pn_min's semantics for signed/unsigned operands.
+  int bufs = pn_min(wanted, pl->depth - pending);
+  // Can draw from shared pool or the primary... but share with others.
+  size_t writers = iocp->writer_count;
+  // Free shared buffers divided among active writers, rounded up
+  // (writers >= 1 here because confirm_as_writer() ran above).
+  int shared_count = (iocp->shared_available_count + writers - 1) / writers;
+  bufs = pn_min(bufs, shared_count + 1);
+  pl->reserved_count = pending + bufs;
+
+  // Decide at which pending_count position pni_write_pipeline_next() hands
+  // out the primary; every other position borrows from the shared pool.
+  if (bufs == wanted &&
+      pl->reserved_count < (pl->depth / 2) &&
+      iocp->shared_available_count > (2 * writers + bufs)) {
+    // No shortage: keep the primary as spare for future use
+    // (index == reserved_count is never reached by pni_write_pipeline_next()).
+    pl->next_primary_index = pl->reserved_count;
+  } else if (bufs == 1) {
+    // Single write: use the primary for it.
+    pl->next_primary_index = pending;
+  } else {
+    // let approx 1/3 drain before replenishing
+    pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
+    if (pl->next_primary_index < pending)
+      pl->next_primary_index = pending;
+  }
+  return bufs;
+}
+
+/*
+ * Hand out the result for the next reserved write, or NULL once the current
+ * reservation is used up.  The primary is handed out at position
+ * next_primary_index; every other position borrows from the shared pool.
+ */
+write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
+{
+  size_t position = pl->pending_count;
+  if (position >= pl->reserved_count)
+    return NULL;
+
+  write_result_t *result = pl->primary;
+  if (position != pl->next_primary_index) {
+    iocp_t *iocp = pl->iocpd->iocp;
+    assert(iocp->shared_available_count > 0);
+    result = shared_pool_pop(iocp);
+  }
+
+  result->in_use = true;
+  pl->pending_count = position + 1;
+  return result;
+}
+
+/*
+ * Take back a result whose write has completed.  Any outstanding reservation
+ * is cancelled, borrowed buffers go back to the shared pool, and the pipeline
+ * stops counting as a writer once nothing is pending.
+ */
+void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
+{
+  bool borrowed = (result != pl->primary);
+  result->in_use = false;
+  pl->reserved_count = 0;
+  if (borrowed)
+    shared_pool_push(result);
+  if (--pl->pending_count == 0)
+    remove_as_writer(pl);
+}
+
+/* Writable only while a slot is free and the primary buffer is available. */
+bool pni_write_pipeline_writable(write_pipeline_t *pl)
+{
+  // Only writable if not full and we can guarantee a buffer:
+  if (pl->primary->in_use)
+    return false;
+  return pl->pending_count < pl->depth;
+}
+
+/* Number of results handed out by pni_write_pipeline_next() and not yet returned. */
+size_t pni_write_pipeline_size(write_pipeline_t *pl)
+{
+  size_t outstanding = pl->pending_count;
+  return outstanding;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
index ceea5a8..39b04e5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
@@ -30,87 +30,58 @@ import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.Codec;
 import org.apache.qpid.proton.codec.Data;
-import org.apache.qpid.proton.codec.DataFactory;
 import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.DriverFactory;
+import org.apache.qpid.proton.engine.Engine;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
 import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.MessengerFactory;
-
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
 
 public final class Proton
 {
 
-    public static ProtonFactory.ImplementationType ANY = ProtonFactory.ImplementationType.ANY;
-    public static ProtonFactory.ImplementationType PROTON_C = ProtonFactory.ImplementationType.PROTON_C;
-    public static ProtonFactory.ImplementationType PROTON_J = ProtonFactory.ImplementationType.PROTON_J;
-
-
-    private static final MessengerFactory MESSENGER_FACTORY =
-            (new ProtonFactoryLoader<MessengerFactory>(MessengerFactory.class)).loadFactory();
-    private static final DriverFactory DRIVER_FACTORY =
-            (new ProtonFactoryLoader<DriverFactory>(DriverFactory.class)).loadFactory();
-    private static final MessageFactory MESSAGE_FACTORY =
-            (new ProtonFactoryLoader<MessageFactory>(MessageFactory.class)).loadFactory();
-    private static final DataFactory DATA_FACTORY =
-            (new ProtonFactoryLoader<DataFactory>(DataFactory.class)).loadFactory();
-    private static final EngineFactory ENGINE_FACTORY =
-            (new ProtonFactoryLoader<EngineFactory>(EngineFactory.class)).loadFactory();
-
-    private static final ProtonFactory.ImplementationType DEFAULT_IMPLEMENTATION =
-            ProtonFactoryLoader.getImpliedImplementationType();
-
+    /** Not instantiable: this class exposes static factory methods only. */
     private Proton()
     {
     }
 
-    public static ProtonFactory.ImplementationType getDefaultImplementationType()
-    {
-        return DEFAULT_IMPLEMENTATION;
-    }
-
+    /** @return a {@link Collector} obtained from {@link Engine#collector()} */
     public static Collector collector()
     {
-        return new CollectorImpl();
+        return Engine.collector();
     }
 
+    /** @return a {@link Connection} obtained from {@link Engine#connection()} */
     public static Connection connection()
     {
-        return ENGINE_FACTORY.createConnection();
+        return Engine.connection();
     }
 
+    /** @return a {@link Transport} obtained from {@link Engine#transport()} */
     public static Transport transport()
     {
-        return ENGINE_FACTORY.createTransport();
+        return Engine.transport();
     }
 
+    /** @return an {@link SslDomain} obtained from {@link Engine#sslDomain()} */
     public static SslDomain sslDomain()
     {
-        return ENGINE_FACTORY.createSslDomain();
+        return Engine.sslDomain();
     }
 
+    /**
+     * @param hostname passed through to {@code Engine.sslPeerDetails}
+     * @param port passed through to {@code Engine.sslPeerDetails}
+     * @return the {@link SslPeerDetails} built by {@code Engine.sslPeerDetails(hostname, port)}
+     */
     public static SslPeerDetails sslPeerDetails(String hostname, int port)
     {
-        return ENGINE_FACTORY.createSslPeerDetails(hostname, port);
+        return Engine.sslPeerDetails(hostname, port);
     }
 
+    /**
+     * @param capacity passed through to {@code Codec.data}
+     * @return a {@link Data} instance obtained from {@code Codec.data(capacity)}
+     */
     public static Data data(long capacity)
     {
-        return DATA_FACTORY.createData(capacity);
+        return Codec.data(capacity);
     }
 
+    /** @return a {@link Message} obtained from {@code Message.Factory.create()} */
     public static Message message()
     {
-        return MESSAGE_FACTORY.createMessage();
+        return Message.Factory.create();
     }
 
     public static Message message(Header header,
@@ -118,134 +89,25 @@ public final class Proton
                       Properties properties, ApplicationProperties applicationProperties,
                       Section body, Footer footer)
     {
-        return MESSAGE_FACTORY.createMessage(header, deliveryAnnotations,
-                                             messageAnnotations, properties,
-                                             applicationProperties, body, footer);
+        return Message.Factory.create(header, deliveryAnnotations,
+                                      messageAnnotations, properties,
+                                      applicationProperties, body, footer);
     }
 
 
+    /** @return a {@link Messenger} obtained from {@code Messenger.Factory.create()} */
     public static Messenger messenger()
     {
-        return MESSENGER_FACTORY.createMessenger();
+        return Messenger.Factory.create();
     }
 
+    /**
+     * @param name passed through to {@code Messenger.Factory.create(String)}
+     * @return the {@link Messenger} returned by that factory call
+     */
     public static Messenger messenger(String name)
     {
-        return MESSENGER_FACTORY.createMessenger(name);
+        return Messenger.Factory.create(name);
     }
 
+    /**
+     * @return a {@link Driver} obtained from {@code Driver.Factory.create()}
+     * @throws IOException as declared; presumably propagated from the factory call
+     */
     public static Driver driver() throws IOException
     {
-        return DRIVER_FACTORY.createDriver();
+        return Driver.Factory.create();
     }
 
-
-
-    public static Connection connection(ProtonFactory.ImplementationType implementation)
-    {
-        return getEngineFactory(implementation).createConnection();
-    }
-
-    public static Transport transport(ProtonFactory.ImplementationType implementation)
-    {
-        return getEngineFactory(implementation).createTransport();
-    }
-
-    public static SslDomain sslDomain(ProtonFactory.ImplementationType implementation)
-    {
-        return getEngineFactory(implementation).createSslDomain();
-    }
-
-    public static SslPeerDetails sslPeerDetails(ProtonFactory.ImplementationType implementation, String hostname, int port)
-    {
-        return getEngineFactory(implementation).createSslPeerDetails(hostname, port);
-    }
-
-    public static Data data(ProtonFactory.ImplementationType implementation, long capacity)
-    {
-        return getDataFactory(implementation).createData(capacity);
-    }
-
-    public static Message message(ProtonFactory.ImplementationType implementation)
-    {
-        return getMessageFactory(implementation).createMessage();
-    }
-
-    public static Message message(ProtonFactory.ImplementationType implementation, Header header,
-                      DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations,
-                      Properties properties, ApplicationProperties applicationProperties,
-                      Section body, Footer footer)
-    {
-        return getMessageFactory(implementation).createMessage(header, deliveryAnnotations,
-                                                               messageAnnotations, properties,
-                                                               applicationProperties, body, footer);
-    }
-
-
-    public static Messenger messenger(ProtonFactory.ImplementationType implementation)
-    {
-        return getMessengerFactory(implementation).createMessenger();
-    }
-
-    public static Messenger messenger(ProtonFactory.ImplementationType implementation, String name)
-    {
-        return getMessengerFactory(implementation).createMessenger(name);
-    }
-
-    public static Driver driver(ProtonFactory.ImplementationType implementation) throws IOException
-    {
-        return getDriverFactory(implementation).createDriver();
-    }
-
-
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, EngineFactory> _engineFactories =
-            new ConcurrentHashMap<ProtonFactory.ImplementationType, EngineFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, MessageFactory> _messageFactories =
-            new ConcurrentHashMap<ProtonFactory.ImplementationType, MessageFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, MessengerFactory> _messengerFactories =
-                new ConcurrentHashMap<ProtonFactory.ImplementationType, MessengerFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, DataFactory> _dataFactories =
-                new ConcurrentHashMap<ProtonFactory.ImplementationType, DataFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, DriverFactory> _driverFactories =
-                new ConcurrentHashMap<ProtonFactory.ImplementationType, DriverFactory>();
-
-    private static EngineFactory getEngineFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(EngineFactory.class, implementation, _engineFactories);
-    }
-
-    private static MessageFactory getMessageFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(MessageFactory.class, implementation, _messageFactories);
-    }
-
-    private static MessengerFactory getMessengerFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(MessengerFactory.class, implementation, _messengerFactories);
-    }
-
-    private static DriverFactory getDriverFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(DriverFactory.class, implementation, _driverFactories);
-    }
-
-    private static DataFactory getDataFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(DataFactory.class, implementation, _dataFactories);
-    }
-
-    private static <T extends ProtonFactory>  T getFactory(Class<T> factoryClass, ProtonFactory.ImplementationType implementation,
-                                                           ConcurrentMap<ProtonFactory.ImplementationType, T> factories)
-    {
-        T factory = factories.get(implementation);
-        if(factory == null)
-        {
-            factories.putIfAbsent(implementation, (new ProtonFactoryLoader<T>(factoryClass,implementation)).loadFactory());
-            factory = factories.get(implementation);
-
-        }
-        return factory;
-    }
-
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactory.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactory.java
deleted file mode 100644
index b855b0c..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/ProtonFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.proton;
-
-public interface ProtonFactory
-{
-    enum ImplementationType
-    {
-        PROTON_C,
-        PROTON_J,
-        ANY;
-    }
-
-    ImplementationType getImplementationType();
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message