qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [07/11] qpid-proton git commit: PROTON-1344: proactor batch events, rename connection_driver
Date Thu, 17 Nov 2016 18:18:54 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
deleted file mode 100644
index 5e6483f..0000000
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/io/connection_engine.hpp"
-
-#include "proton/event_loop.hpp"
-#include "proton/error.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/uuid.hpp"
-
-#include "contexts.hpp"
-#include "messaging_adapter.hpp"
-#include "msg.hpp"
-#include "proton_bits.hpp"
-#include "proton_event.hpp"
-
-#include <proton/connection.h>
-#include <proton/transport.h>
-#include <proton/event.h>
-
-#include <algorithm>
-
-
-namespace proton {
-namespace io {
-
-void connection_engine::init() {
-    if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) {
-        this->~connection_engine(); // Dtor won't be called on throw from ctor.
-        throw proton::error(std::string("connection_engine allocation failed"));
-    }
-}
-
-connection_engine::connection_engine() : handler_(0), container_(0) { init(); }
-
-connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
-    init();
-    connection_context& ctx = connection_context::get(connection());
-    ctx.container = container_;
-    ctx.event_loop.reset(loop);
-}
-
-connection_engine::~connection_engine() {
-    pn_connection_engine_destroy(&engine_);
-}
-
-void connection_engine::configure(const connection_options& opts, bool server) {
-    proton::connection c(connection());
-    opts.apply_unbound(c);
-    if (server) pn_transport_set_server(engine_.transport);
-    pn_connection_engine_bind(&engine_);
-    opts.apply_bound(c);
-    handler_ =  opts.handler();
-    connection_context::get(connection()).collector = engine_.collector;
-}
-
-void connection_engine::connect(const connection_options& opts) {
-    connection_options all;
-    if (container_) {
-        all.container_id(container_->id());
-        all.update(container_->client_connection_options());
-    }
-    all.update(opts);
-    configure(all, false);
-    connection().open();
-}
-
-void connection_engine::accept(const connection_options& opts) {
-    connection_options all;
-    if (container_) {
-        all.container_id(container_->id());
-        all.update(container_->server_connection_options());
-    }
-    all.update(opts);
-    configure(all, true);
-}
-
-bool connection_engine::dispatch() {
-    pn_event_t* c_event;
-    while ((c_event = pn_connection_engine_event(&engine_)) != NULL) {
-        proton_event cpp_event(c_event, container_);
-        try {
-            if (handler_ != 0) {
-                messaging_adapter adapter(*handler_);
-                cpp_event.dispatch(adapter);
-            }
-        } catch (const std::exception& e) {
-            pn_condition_t *cond = pn_transport_condition(engine_.transport);
-            if (!pn_condition_is_set(cond)) {
-                pn_condition_format(cond, "exception", "%s", e.what());
-            }
-        }
-        pn_connection_engine_pop_event(&engine_);
-    }
-    return !pn_connection_engine_finished(&engine_);
-}
-
-mutable_buffer connection_engine::read_buffer() {
-    pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_);
-    return mutable_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::read_done(size_t n) {
-    return pn_connection_engine_read_done(&engine_, n);
-}
-
-void connection_engine::read_close() {
-    pn_connection_engine_read_close(&engine_);
-}
-
-const_buffer connection_engine::write_buffer() {
-    pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_);
-    return const_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::write_done(size_t n) {
-    return pn_connection_engine_write_done(&engine_, n);
-}
-
-void connection_engine::write_close() {
-    pn_connection_engine_write_close(&engine_);
-}
-
-void connection_engine::disconnected(const proton::error_condition& err) {
-    pn_condition_t* condition = pn_transport_condition(engine_.transport);
-    if (!pn_condition_is_set(condition))  {
-        set_error_condition(err, condition);
-    }
-    pn_connection_engine_close(&engine_);
-}
-
-proton::connection connection_engine::connection() const {
-    return make_wrapper(engine_.connection);
-}
-
-proton::transport connection_engine::transport() const {
-    return make_wrapper(engine_.transport);
-}
-
-proton::container* connection_engine::container() const {
-    return container_;
-}
-
-}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index b84722c..e5ec55a 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -74,7 +74,7 @@ void receiver::drain() {
             // Create dummy flow event where "drain finish" can be detected.
             pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object()));
             connection_context& cctx = connection_context::get(pnc);
-            // connection_engine collector is per connection.  Reactor collector is global.
+            // connection_driver collector is per connection.  Reactor collector is global.
             pn_collector_t *coll = cctx.collector;
             if (!coll)
                 coll = pn_reactor_collector(pn_object_reactor(pnc));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/thread_safe_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/thread_safe_test.cpp b/proton-c/bindings/cpp/src/thread_safe_test.cpp
index f8dc3d8..5b5d487 100644
--- a/proton-c/bindings/cpp/src/thread_safe_test.cpp
+++ b/proton-c/bindings/cpp/src/thread_safe_test.cpp
@@ -24,7 +24,7 @@
 #include "proton_bits.hpp"
 
 #include "proton/thread_safe.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
 
 #include <proton/connection.h>
 
@@ -37,7 +37,7 @@ void test_new() {
     pn_connection_t* c = 0;
     thread_safe<connection>* p = 0;
     {
-        io::connection_engine e;
+        io::connection_driver e;
         c = unwrap(e.connection());
         int r = pn_refcount(c);
         ASSERT(r >= 1); // engine may have internal refs (transport, collector).
@@ -54,7 +54,7 @@ void test_new() {
     {
         std::shared_ptr<thread_safe<connection> > sp;
         {
-            io::connection_engine e;
+            io::connection_driver e;
             c = unwrap(e.connection());
             sp = make_shared_thread_safe(e.connection());
         }
@@ -63,7 +63,7 @@ void test_new() {
     {
         std::unique_ptr<thread_safe<connection> > up;
         {
-            io::connection_engine e;
+            io::connection_driver e;
             c = unwrap(e.connection());
             up = make_unique_thread_safe(e.connection());
         }
@@ -78,7 +78,7 @@ void test_convert() {
     connection c;
     pn_connection_t* pc = 0;
     {
-        io::connection_engine eng;
+        io::connection_driver eng;
         c = eng.connection();
         pc = unwrap(c);         // Unwrap in separate scope to avoid confusion from temp values.
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/docs/api/index.md
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index ccd679d..9c6009f 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -5,35 +5,31 @@ Proton Documentation            {#index}
 
 The [Engine API](@ref engine) is a "pure AMQP" toolkit, it decodes AMQP bytes
 into proton [events](@ref event) and generates AMQP bytes from application
-calls.
+calls. There is no IO or threading code in this part of the library.
 
-The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
-out, event-driven interface so you can read AMQP data from any source, process
-the resulting [events](@ref event) and write AMQP output to any destination.
+## Proactive event-driven programming
 
-There is no IO or threading code in this part of the library, so it can be
-embedded in many different environments. The proton project provides language
-bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
-threading facilities of the bound language.
+The [Proactor API](@ref proactor) is a pro-active, asynchronous framework to
+build single or multi-threaded Proton C applications. It manages the IO
+transport layer so you can write portable, event-driven AMQP code using the @ref
+engine API.
 
-## Integrating with IO
+## IO Integration
 
-The [Proactor API](@ref proactor) is a pro-active, asynchronous framewokr to
-build single or multi-threaded Proton C applications.
+The [connection driver](@ref connection_driver) provides a simple bytes in/bytes
+out, event-driven interface so you can read AMQP data from any source, process
+the resulting [events](@ref event) and write AMQP output to any destination. It
+lets you use proton in alternate event loops, or for specialized embedded
+applications.
 
-For advanced use-cases it is possible to write your own implementation of the
-proactor API for an unusual IO or threading framework. Any proton application
+It is also possible to write your own implementation of the @ref proactor if you
+are dealing with an unusual IO or threading framework. Any proton application
 written to the proactor API will be able to use your implementation.
 
-## Messenger and Reactor APIs
-
-The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs were intended
-to be simple APIs that included IO support directly out of the box.
+## Messenger and Reactor APIs (deprecated)
 
-They both had good points but were both based on the assumption of a single-threaded
-environment using a POSIX-like poll() call. This was a problem for performance on some
-platforms and did not support multi-threaded applications.
+The [Messenger](@ref messenger) and [Reactor](@ref reactor) APIs are older APIs
+that were limited to single-threaded applications.
 
-Note however that application code which interacts with the AMQP @ref engine and
-processes AMQP @ref "events" event is the same for the proactor and reactor APIs,
-so is quite easy to convert. The main difference is in how connections are set up.
+Existing @ref reactor applications can be converted easily to use the @ref proactor,
+since they share the same @ref engine API and @ref event set.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index 0ed23b0..70fad73 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -38,7 +38,7 @@ extern "C" {
 /**
  * @file
  *
- * Connection API for the proton Engine.
+ * Connection API for the proton @ref engine
  *
  * @defgroup connection Connection
  * @ingroup engine

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection_driver.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_driver.h b/proton-c/include/proton/connection_driver.h
new file mode 100644
index 0000000..4fa3fb9
--- /dev/null
+++ b/proton-c/include/proton/connection_driver.h
@@ -0,0 +1,243 @@
+#ifndef PROTON_CONNECTION_DRIVER_H
+#define PROTON_CONNECTION_DRIVER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * @file
+ *
+ * @defgroup connection_driver Connection Driver
+ *
+ * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
+ * transports. Provides a single point of control for an AMQP connection and
+ * a simple bytes-in/bytes-out interface that lets you:
+ *
+ * - process AMQP-encoded bytes from some input byte stream
+ * - generate ::pn_event_t events for your application to handle
+ * - encode resulting AMQP output bytes for some output byte stream
+ *
+ * The pn_connection_driver_*() functions provide a simplified API and extra
+ * logic to use ::pn_connection_t and ::pn_transport_t as a unit.  You can also
+ * access them directly for features that are not exposed via the @ref
+ * connection_driver API.
+ *
+ * The driver buffers events and data; you should run it until
+ * pn_connection_driver_finished() is true, to ensure all reading, writing and
+ * event handling (including ERROR and FINAL events) is finished.
+ *
+ * ## Error handling
+ *
+ * The pn_connection_driver_*() functions do not return an error code. IO errors set
+ * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
+ * code can set errors using pn_connection_driver_errorf()
+ *
+ * ## IO patterns
+ *
+ * This API supports asynchronous, proactive, non-blocking and reactive IO. An
+ * integration does not have to follow a strict dispatch-read-write sequence,
+ * but note that you should handle all available events before calling
+ * pn_connection_driver_read_buffer() and check that `size` is non-zero before
+ * starting a blocking or asynchronous read call. A `read` started while there
+ * are unprocessed CLOSE events in the buffer may never complete.
+ *
+ * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
+ * an AMQP connection can close separately.
+ *
+ * ## Thread safety
+ *
+ * The @ref engine types are not thread safe, but each connection and its
+ * associated types forms an independent unit. Different connections can be
+ * processed concurrently by different threads.
+ *
+ * @{
+ */
+
+#include <proton/import_export.h>
+#include <proton/event.h>
+#include <proton/types.h>
+
+#include <stdarg.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Struct containing the 3 elements needed to drive AMQP IO and events, aggregated as a unit.
+ */
+typedef struct pn_connection_driver_t {
+  pn_connection_t *connection;
+  pn_transport_t *transport;
+  pn_event_batch_t batch;
+} pn_connection_driver_t;
+
+/**
+ * Set #connection and #transport to the provided values, or create a new
+ * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
+ * The provided values belong to the connection driver and will be freed by
+ * pn_connection_driver_destroy()
+ *
+ * The transport is bound automatically after the PN_CONNECTION_INIT has been
+ * handled by the application. It can be bound earlier with
+ * pn_connection_driver_bind().
+ *
+ * The following functions must be called before the transport is
+ * bound to have effect: pn_connection_set_username(), pn_connection_set_password(),
+ * pn_transport_set_server()
+ *
+ * @return PN_OUT_OF_MEMORY if any allocation fails.
+ */
+PN_EXTERN int pn_connection_driver_init(pn_connection_driver_t*, pn_connection_t*, pn_transport_t*);
+
+/** Force binding of the transport.
+ * This happens automatically after the PN_CONNECTION_INIT is processed.
+ *
+ * @return PN_STATE_ERR if the transport is already bound.
+ */
+PN_EXTERN int pn_connection_driver_bind(pn_connection_driver_t *d);
+
+/**
+ * Unbind, release and free #connection and #transport. Set all pointers to
+ * NULL.  Does not free the @ref pn_connection_driver_t struct itself.
+ */
+PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *);
+
+/**
+ * Get the read buffer.
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_driver_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *);
+
+/**
+ * Process the first n bytes of data in pn_connection_driver_read_buffer() and
+ * reclaim the buffer space.
+ */
+PN_EXTERN void pn_connection_driver_read_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the read side. Call when the IO can no longer be read.
+ */
+PN_EXTERN void pn_connection_driver_read_close(pn_connection_driver_t *);
+
+/**
+ * True if read side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_read_closed(pn_connection_driver_t *);
+
+/**
+ * Get the write buffer.
+ *
+ * Write data from buf.start to your IO destination, up to a max of buf.size.
+ * Call pn_connection_driver_write_done() when writing is complete.
+ *
+ * buf.size==0 means there is nothing to write.
+ */
+ PN_EXTERN pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *);
+
+/**
+ * Call when the first n bytes of pn_connection_driver_write_buffer() have been
+ * written to IO. Reclaims the buffer space and reset the write buffer.
+ */
+PN_EXTERN void pn_connection_driver_write_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the write side. Call when IO can no longer be written to.
+ */
+PN_EXTERN void pn_connection_driver_write_close(pn_connection_driver_t *);
+
+/**
+ * True if write side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_write_closed(pn_connection_driver_t *);
+
+/**
+ * Close both the read and write sides.
+ */
+PN_EXTERN void pn_connection_driver_close(pn_connection_driver_t * c);
+
+/**
+ * Get the next event to handle.
+ *
+ * @return pointer is valid until the next call of
+ * pn_connection_driver_next_event(). NULL if there are no more events available
+ * now; reading/writing may produce more.
+ */
+PN_EXTERN pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *);
+
+/**
+ * True if  pn_connection_driver_next_event() will return a non-NULL event.
+ */
+PN_EXTERN bool pn_connection_driver_has_event(pn_connection_driver_t *);
+
+/**
+ * Return true if the driver is closed for reading and writing and there are
+ * no more events.
+ *
+ * Call pn_connection_driver_destroy() to free all related memory.
+ */
+PN_EXTERN bool pn_connection_driver_finished(pn_connection_driver_t *);
+
+/**
+ * Set IO error information.
+ *
+ * The name and formatted description are set on the transport condition, and
+ * returned as a PN_TRANSPORT_ERROR event from pn_connection_driver_next_event().
+ *
+ * You must call this *before* pn_connection_driver_read_close() or
+ * pn_connection_driver_write_close() to ensure the error is processed.
+ */
+PN_EXTERN void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...);
+
+/**
+ * Set IO error information via a va_list, see pn_connection_driver_errorf()
+ */
+PN_EXTERN void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list);
+
+/**
+ * Log a string message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, char *fmt, ...);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap);
+
+/**
+ * If batch is part of a connection_driver, return the connection_driver address,
+ * else return NULL
+ */
+PN_EXTERN pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch);
+///@}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_CONNECTION_DRIVER_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
deleted file mode 100644
index b7022a9..0000000
--- a/proton-c/include/proton/connection_engine.h
+++ /dev/null
@@ -1,313 +0,0 @@
-#ifndef PROTON_CONNECTION_ENGINE_H
-#define PROTON_CONNECTION_ENGINE_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * @file
- *
- * **Experimental** The connection IO API is a set of functions to simplify
- * integrating proton with different IO and concurrency platforms. The portable
- * parts of a Proton application should use the @ref engine types.  We will
- * use "application" to mean the portable part of the application and
- * "integration" to mean code that integrates with a particular IO platform.
- *
- * The connection_engine functions take a @ref pn_connection_t\*, and perform common
- * tasks involving the @ref pn_connection_t and it's @ref pn_transport_t and
- * @ref pn_collector_t so you can treat them as a unit. You can also work with
- * these types directly for features not available via @ref connection_engine API.
- *
- * @defgroup connection_engine Connection Engine
- *
- * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
- * transports. Provides a single point of control for an AMQP connection and
- * a simple bytes-in/bytes-out interface that lets you:
- *
- * - process AMQP-encoded bytes from some input byte stream
- * - generate @ref pn_event_t events for your application to handle
- * - encode resulting AMQP output bytes for some output byte stream
- *
- * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref
- * pn_collector_t and provides functions to operate on all three as a unit for
- * IO integration. You can also use them directly for anything not covered by
- * this API
- *
- * For example a simple blocking IO integration with the imaginary "my_io" library:
- *
- *     pn_connection_engine_t ce;
- *     pn_connection_engine_init(&ce);
- *     while (!pn_connection_engine_finished(&ce) {
- *         // Dispatch events to be handled by the application.
- *         pn_event_t *e;
- *         while ((e = pn_connection_engine_event(&ce))!= NULL) {
- *             my_app_handle(e); // Pass to the application handler
- *             switch (pn_event_type(e)) {
- *                 case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce);
- *                 // Only for full-duplex IO where read/write can shutdown separately.
- *                 case PN_TRANSPORT_CLOSE_READ: my_io_shutdown_read(...); break;
- *                 case PN_TRANSPORT_CLOSE_WRITE: my_io_shutdown_write(...); break;
- *                 default: break;
- *             };
- *             e = pn_connection_engine_pop_event(&ce);
- *         }
- *         // Read from my_io into the connection buffer
- *         pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce);
- *         if (readbuf.size) {
- *             size_t n = my_io_read(readbuf.start, readbuf.size, ...);
- *             if (n > 0) {
- *                 pn_connection_engine_read_done(&ce, n);
- *             } else if (n < 0) {
- *                 pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...);
- *                 pn_connection_engine_read_close(&ce);
- *             }
- *         }
- *         // Write from connection buffer to my_io
- *         pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce);
- *         if (writebuf.size) {
- *             size_t n = my_io_write_data(writebuf.start, writebuf.size, ...);
- *             if (n < 0) {
- *                 pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", d, ...);
- *                 pn_connection_engine_write_close(&ce);
- *             } else {
- *                 pn_connection_engine_write_done(&ce, n);
- *             }
- *         }
- *     }
- *     // If my_io doesn't have separate read/write shutdown, then we should close it now.
- *     my_io_close(...);
- *
- * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
- * an AMQP connection can close separately, the example shows how to handle this
- * for full-duplex IO or IO with a simple close.
- *
- * The engine buffers events, you must keep processing till
- * pn_connection_engine_finished() is true, to ensure all reading, writing and event
- * handling (including ERROR and FINAL events) is completely finished.
- *
- * ## Error handling
- *
- * The pn_connection_engine_*() functions do not return an error code. IO errors set
- * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
- * code can set errors using pn_connection_engine_errorf()
- *
- * ## Other IO patterns
- *
- * This API supports asynchronous, proactive, non-blocking and reactive IO. An
- * integration does not have to follow the dispatch-read-write sequence above,
- * but note that you should handle all available events before calling
- * pn_connection_engine_read_buffer() and check that `size` is non-zero before
- * starting a blocking or asynchronous read call. A `read` started while there
- * are unprocessed CLOSE events in the buffer may never complete.
- *
- * ## Thread safety
- *
- * The @ref engine types are not thread safe, but each connection and its
- * associated types forms an independent unit. Different connections can be
- * processed concurrently by different threads.
- *
- * @defgroup connection_engine Connection IO
- * @{
- */
-
-#include <proton/import_export.h>
-#include <proton/event.h>
-#include <proton/types.h>
-
-#include <stdarg.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * Struct containing a connection, transport and collector. See
- * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine()
- */
-typedef struct pn_connection_engine_t {
-  pn_connection_t *connection;
-  pn_transport_t *transport;
-  pn_collector_t *collector;
-} pn_connection_engine_t;
-
-/**
- * Set #connection and #transport to the provided values, or create a new
- * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
- * The provided values belong to the connection engine and will be freed by
- * pn_connection_engine_destroy()
- *
- * Create a new @ref pn_collector_t and set as #collector.
- *
- * The transport and connection are *not* bound at this point. You should
- * configure them as needed and let the application handle the
- * PN_CONNECTION_INIT from pn_connection_engine_event() before calling
- * pn_connection_engine_bind().
- *
- * @return if any allocation fails call pn_connection_engine_destroy() and return PN_OUT_OF_MEMORY
- */
-PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*);
-
-/**
- * Bind the connection to the transport when the external IO is ready.
- *
- * The following functions (if called at all) must be called *before* bind:
- * pn_connection_set_username(), pn_connection_set_password(),  pn_transport_set_server()
- *
- * If there is an external IO error during setup, set a transport error, close
- * the transport and then bind. The error events are reported to the application
- * via pn_connection_engine_event().
- *
- * @return an error code if the bind fails.
- */
-PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *);
-
-/**
- * Unbind, release and free #connection, #transpot and #collector. Set all pointers to NULL.
- * Does not free the @ref pn_connection_engine_t struct itself.
- */
-PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *);
-
-/**
- * Get the read buffer.
- *
- * Copy data from your input byte source to buf.start, up to buf.size.
- * Call pn_connection_engine_read_done() when reading is complete.
- *
- * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
- */
-PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *);
-
-/**
- * Process the first n bytes of data in pn_connection_engine_read_buffer() and
- * reclaim the buffer space.
- */
-PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the read side. Call when the IO can no longer be read.
- */
-PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *);
-
-/**
- * True if read side is closed.
- */
-PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *);
-
-/**
- * Get the write buffer.
- *
- * Write data from buf.start to your IO destination, up to a max of buf.size.
- * Call pn_connection_engine_write_done() when writing is complete.
- *
- * buf.size==0 means there is nothing to write.
- */
- PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *);
-
-/**
- * Call when the first n bytes of pn_connection_engine_write_buffer() have been
- * written to IO. Reclaims the buffer space and reset the write buffer.
- */
-PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the write side. Call when IO can no longer be written to.
- */
-PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *);
-
-/**
- * True if write side is closed.
- */
-PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *);
-
-/**
- * Close both sides side.
- */
-PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c);
-
-/**
- * Get the current event. Call pn_connection_engine_done() when done handling it.
- * Note that if PN_TRACE_EVT is enabled this will log the event, so you should
- * avoid calling it more than once per event. Use pn_connection_engine_has_event()
- * to silently test if any events are available.
- *
- * @return NULL if there are no more events ready. Reading/writing data may produce more.
- */
-PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *);
-
-/**
- * True if  pn_connection_engine_event() will return a non-NULL event.
- */
-PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *);
-
-/**
- * Drop the current event, advance pn_connection_engine_event() to the next event.
- */
-PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *);
-
-/**
- * Return true if the the engine is closed for reading and writing and there are
- * no more events.
- *
- * Call pn_connection_engine_free() to free all related memory.
- */
-PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *);
-
-/**
- * Set IO error information.
- *
- * The name and formatted description are set on the transport condition, and
- * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event().
- *
- * You must call this *before* pn_connection_engine_read_close() or
- * pn_connection_engine_write_close() to ensure the error is processed.
- *
- * If there is already a transport condition set, this call does nothing.  For
- * more complex cases, you can work with the transport condition directly using:
- *
- *     pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn));
- */
-PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...);
-
-/**
- * Set IO error information via a va_list, see pn_connection_engine_errorf()
- */
-PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list);
-
-/**
- * Log a string message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap);
-
-///@}
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif // PROTON_CONNECTION_ENGINE_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/cproton.i
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index ffcf830..931437e 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -39,6 +39,9 @@ typedef unsigned long int uintptr_t;
 %ignore pn_bytes_t;
 %ignore pn_rwbytes_t;
 
+/* pn_event_batch_t is not used directly by bindings */
+%ignore pn_event_batch_t;
+
 /* There is no need to wrap pn_class_t aa it is an internal implementation detail and cannot be used outside the library */
 %ignore pn_class_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 4dca2d5..31d4bdd 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -428,6 +428,32 @@ PN_EXTERN pn_event_t *pn_collector_peek(pn_collector_t *collector);
 PN_EXTERN bool pn_collector_pop(pn_collector_t *collector);
 
 /**
+ * Return the next event to be handled.
+ *
+ * Returns the head event if it has not previously been returned by
+ * pn_collector_next(), otherwise does pn_collector_pop() and returns
+ * the new head event.
+ *
+ * The returned pointer is valid till the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free()
+ *
+ * @param[in] collector a collector object
+ * @return the next event.
+ */
+PN_EXTERN pn_event_t *pn_collector_next(pn_collector_t *collector);
+
+/**
+ * Return the same event as the previous call to pn_collector_next()
+ *
+ * The returned pointer is valid till the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free()
+ *
+ * @param[in] collector a collector object
+ * @return a pointer to the event returned by previous call to pn_collector_next()
+ */
+PN_EXTERN pn_event_t *pn_collector_prev(pn_collector_t *collector);
+
+/**
  * Check if there are more events after the current event. If this
  * returns true, then pn_collector_peek() will return an event even
  * after pn_collector_pop() is called.
@@ -506,6 +532,36 @@ PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event);
  */
 PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event);
 
+/**
+ * **Experimental**: A batch of events to handle. Call pn_event_batch_next() in
+ * a loop until it returns NULL to handle them.
+ */
+typedef struct pn_event_batch_t pn_event_batch_t;
+
+/* NOTE: there is deliberately no peek(), more() or other look-ahead on an event
+ * batch. We want to know exactly which events have been handled, next() only
+ * allows the user to get each event exactly once, in order.
+ */
+
+/**
+ * **Experimental**: Remove the next event from the batch and return it. NULL
+ *  means the batch is empty. The returned event pointer is valid until
+ *  pn_event_batch_next() is called again on the same batch.
+ */
+PN_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch);
+
+/**
+ *@cond INTERNAL
+ * pn_event_batch_next() can be re-implemented for different behaviors in different contexts.
+ */
+struct pn_event_batch_t {
+  pn_event_t *(*next_event)(pn_event_batch_t *batch);
+};
+
+/**
+ *@endcond
+ */
+
 #ifdef __cplusplus
 }
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 49d7b6a..e23a24f 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -33,30 +33,27 @@ typedef struct pn_condition_t pn_condition_t;
 /**
  * @file
  *
- * **Experimental**: Proactor API for the the proton @ref engine.
+ * **Experimental**: Proactor API for the proton @ref engine
  *
  * @defgroup proactor Proactor
  *
  * **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications.
  *
- * The proactor establishes and listens for connections. It creates the @ref
- * "transport" transport that sends and receives data over the network and
- * delivers @ref "events" event to application threads for processing.
+ * The proactor establishes and listens for connections. It creates
+ * the @ref transport that sends and receives data over the network and
+ * delivers @ref event to application threads for handling.
  *
- * ## Multi-threading
- *
- * The @ref proactor is thread-safe, but the @ref "protocol engine" is not.  The
- * proactor ensures that each @ref "connection" connection and its associated
- * values (@ref session, @ref link etc.) is processed sequentially, even if there
- * are multiple application threads. See pn_proactor_wait()
+ * **Multi-threading**:
+ * The @ref proactor is thread-safe, but the @ref engine is not.  The proactor
+ * ensures that each @ref connection and its associated values (@ref session,
+ * @ref link etc.) is handled sequentially, even if there are multiple
+ * application threads. See pn_proactor_wait()
  *
  * @{
  */
 
 /**
- * The proactor creates and manage @ref "transports" transport and delivers @ref
- * "event" events to the application.
- *
+ * The proactor.
  */
 typedef struct pn_proactor_t pn_proactor_t;
 
@@ -70,13 +67,6 @@ pn_proactor_t *pn_proactor(void);
  */
 void pn_proactor_free(pn_proactor_t*);
 
-/* FIXME aconway 2016-11-12: connect and listen need options to enable
-   things like websockets, alternate encryption or other features.
-   The "extra" parameter will be replaced by an "options" parameter
-   that will include providing extra data and other manipulators
-   to affect how the connection is processed.
-*/
-
 /**
  * Asynchronous connect: a connection and transport will be created, the
  * relevant events will be returned by pn_proactor_wait()
@@ -104,13 +94,27 @@ int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_b
 pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
 
 /**
- * Wait for an event. Can be called in multiple threads concurrently.
- * You must call pn_event_done() when the event has been handled.
+ * Wait for events to handle. Call pn_proactor_done() after handling events.
+ *
+ * Thread safe: pn_proactor_wait() can be called concurrently, but the events in
+ * the returned ::pn_event_batch_t must be handled sequentially.
+ *
+ * The proactor always returns events that must be handled sequentially in the
+ * same batch or sequentially in a later batch after pn_proactor_done(). Any
+ * events returned concurrently by pn_proactor_wait() are safe to handle
+ * concurrently.
+ */
+pn_event_batch_t *pn_proactor_wait(pn_proactor_t* d);
+
+/**
+ * Call when done handling events.
  *
- * The proactor ensures that events that cannot be handled concurrently
- * (e.g. events for for the same connection) are never returned concurrently.
+ * It is generally most efficient to handle the entire batch in the thread
+ * that calls pn_proactor_wait(), then call pn_proactor_done(). If you call
+ * pn_proactor_done() earlier, the remaining events will be returned again by
+ * pn_proactor_wait(), possibly to another thread.
  */
-pn_event_t *pn_proactor_wait(pn_proactor_t* d);
+void pn_proactor_done(pn_proactor_t* d, pn_event_batch_t *events);
 
 /**
  * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait()
@@ -146,14 +150,6 @@ void pn_connection_wake(pn_connection_t *c);
 pn_proactor_t *pn_connection_proactor(pn_connection_t *c);
 
 /**
- * Call when a proactor event has been handled. Does nothing if not a proactor event.
- *
- * Thread safe: May be called from any thread but must be called exactly once
- * for each event returned by pn_proactor_wait()
- */
-void pn_event_done(pn_event_t *);
-
-/**
  * Get the proactor that created the event or NULL.
  */
 pn_proactor_t *pn_event_proactor(pn_event_t *);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index f31ddb0..3393e64 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -20,144 +20,149 @@
 #include "engine-internal.h"
 #include <proton/condition.h>
 #include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/transport.h>
 #include <string.h>
 
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
-  ce->connection = c ? c : pn_connection();
-  ce->transport = t ? t : pn_transport();
-  ce->collector = pn_collector();
-  if (!ce->connection || !ce->transport || !ce->collector) {
-    pn_connection_engine_destroy(ce);
+struct driver_batch {
+  pn_event_batch_t batch;
+};
+
+static pn_event_t *batch_next(pn_event_batch_t *batch) {
+  pn_connection_driver_t *d =
+    (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch));
+  pn_collector_t *collector = pn_connection_collector(d->connection);
+  pn_event_t *handled = pn_collector_prev(collector);
+  if (handled && pn_event_type(handled) == PN_CONNECTION_INIT) {
+      pn_transport_bind(d->transport, d->connection); /* Init event handled, auto-bind */
+  }
+  pn_event_t *next = pn_collector_next(collector);
+  if (next && d->transport->trace & PN_TRACE_EVT) {
+    pn_string_clear(d->transport->scratch);
+    pn_inspect(next, d->transport->scratch);
+    pn_transport_log(d->transport, pn_string_get(d->transport->scratch));
+  }
+  return next;
+}
+
+int pn_connection_driver_init(pn_connection_driver_t* d, pn_connection_t *c, pn_transport_t *t) {
+  memset(d, 0, sizeof(*d));
+  d->batch.next_event = &batch_next;
+  d->connection = c ? c : pn_connection();
+  d->transport = t ? t : pn_transport();
+  pn_collector_t *collector = pn_collector();
+  if (!d->connection || !d->transport || !collector) {
+    if (collector) pn_collector_free(collector);
+    pn_connection_driver_destroy(d);
     return PN_OUT_OF_MEMORY;
   }
-  pn_connection_collect(ce->connection, ce->collector);
+  pn_connection_collect(d->connection, collector);
   return 0;
 }
 
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
-  return pn_transport_bind(ce->transport, ce->connection);
+int pn_connection_driver_bind(pn_connection_driver_t *d) {
+  return pn_transport_bind(d->transport, d->connection);
 }
 
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
-  if (ce->transport) {
-    pn_transport_unbind(ce->transport);
-    pn_transport_free(ce->transport);
+void pn_connection_driver_destroy(pn_connection_driver_t *d) {
+  if (d->transport) {
+    pn_transport_unbind(d->transport);
+    pn_transport_free(d->transport);
+  }
+  if (d->connection) {
+    pn_collector_t *collector = pn_connection_collector(d->connection);
+    pn_connection_free(d->connection);
+    pn_collector_free(collector);
   }
-  if (ce->collector) pn_collector_free(ce->collector);
-  if (ce->connection) pn_connection_free(ce->connection);
-  memset(ce, 0, sizeof(*ce));
+  memset(d, 0, sizeof(*d));
 }
 
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
-  ssize_t cap = pn_transport_capacity(ce->transport);
-  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
+pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) {
+  ssize_t cap = pn_transport_capacity(d->transport);
+  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
 }
 
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
-  if (n > 0) pn_transport_process(ce->transport, n);
+void pn_connection_driver_read_done(pn_connection_driver_t *d, size_t n) {
+  if (n > 0) pn_transport_process(d->transport, n);
 }
 
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
-  return pn_transport_capacity(ce->transport) < 0;
+bool pn_connection_driver_read_closed(pn_connection_driver_t *d) {
+  return pn_transport_capacity(d->transport) < 0;
 }
 
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_read_closed(ce)) {
-    pn_transport_close_tail(ce->transport);
+void pn_connection_driver_read_close(pn_connection_driver_t *d) {
+  if (!pn_connection_driver_read_closed(d)) {
+    pn_transport_close_tail(d->transport);
   }
 }
 
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
-  ssize_t pending = pn_transport_pending(ce->transport);
+pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *d) {
+  ssize_t pending = pn_transport_pending(d->transport);
   return (pending > 0) ?
-    pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
+    pn_bytes(pending, pn_transport_head(d->transport)) : pn_bytes_null;
 }
 
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+void pn_connection_driver_write_done(pn_connection_driver_t *d, size_t n) {
   if (n > 0)
-    pn_transport_pop(ce->transport, n);
+    pn_transport_pop(d->transport, n);
 }
 
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
-  return pn_transport_pending(ce->transport) < 0;
+bool pn_connection_driver_write_closed(pn_connection_driver_t *d) {
+  return pn_transport_pending(d->transport) < 0;
 }
 
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_write_closed(ce)) {
-    pn_transport_close_head(ce->transport);
+void pn_connection_driver_write_close(pn_connection_driver_t *d) {
+  if (!pn_connection_driver_write_closed(d)) {
+    pn_transport_close_head(d->transport);
   }
 }
 
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
-  pn_connection_engine_read_close(ce);
-  pn_connection_engine_write_close(ce);
-}
-
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
-  pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
-  if (e) {
-    pn_transport_t *t = ce->transport;
-    if (t && t->trace & PN_TRACE_EVT) {
-      /* This can log the same event twice if pn_connection_engine_event is called
-       * twice but for debugging it is much better to log before handling than after.
-       */
-      pn_string_clear(t->scratch);
-      pn_inspect(e, t->scratch);
-      pn_transport_log(t, pn_string_get(t->scratch));
-    }
-  }
-  return e;
+void pn_connection_driver_close(pn_connection_driver_t *d) {
+  pn_connection_driver_read_close(d);
+  pn_connection_driver_write_close(d);
 }
 
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
-  return ce->collector && pn_collector_peek(ce->collector);
+pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
+  return pn_event_batch_next(&d->batch);
 }
 
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
-  if (ce->collector) {
-    pn_event_t *e = pn_collector_peek(ce->collector);
-    if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
-      /* Events can accumulate behind the TRANSPORT_CLOSED before the
-       * PN_TRANSPORT_CLOSED event is handled. They can never be processed
-       * so release them.
-       */
-      pn_collector_release(ce->collector);
-    } else {
-      pn_collector_pop(ce->collector);
-    }
-
-  }
+bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
+  return pn_collector_peek(pn_connection_collector(d->connection));
 }
 
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
-  return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
+bool pn_connection_driver_finished(pn_connection_driver_t *d) {
+  return pn_transport_closed(d->transport) && !pn_connection_driver_has_event(d);
 }
 
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
-  pn_transport_t *t = ce->transport;
+void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list ap) {
+  pn_transport_t *t = d->transport;
   pn_condition_t *cond = pn_transport_condition(t);
   pn_string_vformat(t->scratch, fmt, ap);
   pn_condition_set_name(cond, name);
   pn_condition_set_description(cond, pn_string_get(t->scratch));
 }
 
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
+void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...) {
   va_list ap;
   va_start(ap, fmt);
-  pn_connection_engine_verrorf(ce, name, fmt, ap);
+  pn_connection_driver_verrorf(d, name, fmt, ap);
   va_end(ap);
 }
 
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
+void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg) {
+  pn_transport_log(d->transport, msg);
+}
+
+void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap) {
+  pn_transport_vlogf(d->transport, fmt, ap);
 }
 
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
-  pn_transport_vlogf(ce->transport, fmt, ap);
+void pn_connection_driver_vlog(pn_connection_driver_t *d, const char *msg) {
+  pn_transport_log(d->transport, msg);
 }
 
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
+pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch) {
+  return (batch->next_event == batch_next) ?
+    (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch)) :
+    NULL;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/connection_engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_engine.c b/proton-c/src/core/connection_engine.c
deleted file mode 100644
index f31ddb0..0000000
--- a/proton-c/src/core/connection_engine.c
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "engine-internal.h"
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/connection_engine.h>
-#include <proton/transport.h>
-#include <string.h>
-
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
-  ce->connection = c ? c : pn_connection();
-  ce->transport = t ? t : pn_transport();
-  ce->collector = pn_collector();
-  if (!ce->connection || !ce->transport || !ce->collector) {
-    pn_connection_engine_destroy(ce);
-    return PN_OUT_OF_MEMORY;
-  }
-  pn_connection_collect(ce->connection, ce->collector);
-  return 0;
-}
-
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
-  return pn_transport_bind(ce->transport, ce->connection);
-}
-
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
-  if (ce->transport) {
-    pn_transport_unbind(ce->transport);
-    pn_transport_free(ce->transport);
-  }
-  if (ce->collector) pn_collector_free(ce->collector);
-  if (ce->connection) pn_connection_free(ce->connection);
-  memset(ce, 0, sizeof(*ce));
-}
-
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
-  ssize_t cap = pn_transport_capacity(ce->transport);
-  return (cap > 0) ?  pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
-}
-
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
-  if (n > 0) pn_transport_process(ce->transport, n);
-}
-
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
-  return pn_transport_capacity(ce->transport) < 0;
-}
-
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_read_closed(ce)) {
-    pn_transport_close_tail(ce->transport);
-  }
-}
-
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
-  ssize_t pending = pn_transport_pending(ce->transport);
-  return (pending > 0) ?
-    pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
-}
-
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
-  if (n > 0)
-    pn_transport_pop(ce->transport, n);
-}
-
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
-  return pn_transport_pending(ce->transport) < 0;
-}
-
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
-  if (!pn_connection_engine_write_closed(ce)) {
-    pn_transport_close_head(ce->transport);
-  }
-}
-
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
-  pn_connection_engine_read_close(ce);
-  pn_connection_engine_write_close(ce);
-}
-
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
-  pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
-  if (e) {
-    pn_transport_t *t = ce->transport;
-    if (t && t->trace & PN_TRACE_EVT) {
-      /* This can log the same event twice if pn_connection_engine_event is called
-       * twice but for debugging it is much better to log before handling than after.
-       */
-      pn_string_clear(t->scratch);
-      pn_inspect(e, t->scratch);
-      pn_transport_log(t, pn_string_get(t->scratch));
-    }
-  }
-  return e;
-}
-
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
-  return ce->collector && pn_collector_peek(ce->collector);
-}
-
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
-  if (ce->collector) {
-    pn_event_t *e = pn_collector_peek(ce->collector);
-    if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
-      /* Events can accumulate behind the TRANSPORT_CLOSED before the
-       * PN_TRANSPORT_CLOSED event is handled. They can never be processed
-       * so release them.
-       */
-      pn_collector_release(ce->collector);
-    } else {
-      pn_collector_pop(ce->collector);
-    }
-
-  }
-}
-
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
-  return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
-}
-
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
-  pn_transport_t *t = ce->transport;
-  pn_condition_t *cond = pn_transport_condition(t);
-  pn_string_vformat(t->scratch, fmt, ap);
-  pn_condition_set_name(cond, name);
-  pn_condition_set_description(cond, pn_string_get(t->scratch));
-}
-
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
-  va_list ap;
-  va_start(ap, fmt);
-  pn_connection_engine_verrorf(ce, name, fmt, ap);
-  va_end(ap);
-}
-
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
-}
-
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
-  pn_transport_vlogf(ce->transport, fmt, ap);
-}
-
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
-  pn_transport_log(ce->transport, msg);
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 7882327..2a0a5cf 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -28,7 +28,8 @@ struct pn_collector_t {
   pn_list_t *pool;
   pn_event_t *head;
   pn_event_t *tail;
-  bool freed;
+  bool freed:1;
+  bool head_returned:1;         /* Head has been returned by pn_collector_next() */
 };
 
 struct pn_event_t {
@@ -51,11 +52,8 @@ static void pn_collector_initialize(pn_collector_t *collector)
 static void pn_collector_drain(pn_collector_t *collector)
 {
   assert(collector);
-
-  while (pn_collector_peek(collector)) {
-    pn_collector_pop(collector);
-  }
-
+  while (pn_collector_next(collector))
+    ;
   assert(!collector->head);
   assert(!collector->tail);
 }
@@ -175,6 +173,7 @@ pn_event_t *pn_collector_peek(pn_collector_t *collector)
 
 bool pn_collector_pop(pn_collector_t *collector)
 {
+  collector->head_returned = false;
   pn_event_t *event = collector->head;
   if (event) {
     collector->head = event->next;
@@ -190,6 +189,19 @@ bool pn_collector_pop(pn_collector_t *collector)
   return true;
 }
 
+pn_event_t *pn_collector_next(pn_collector_t *collector)
+{
+  if (collector->head_returned) {
+    pn_collector_pop(collector);
+  }
+  collector->head_returned = collector->head;
+  return collector->head;
+}
+
+pn_event_t *pn_collector_prev(pn_collector_t *collector) {
+  return collector->head_returned ? collector->head : NULL;
+}
+
 bool pn_collector_more(pn_collector_t *collector)
 {
   assert(collector);
@@ -386,3 +398,7 @@ const char *pn_event_type_name(pn_event_type_t type)
   }
   return NULL;
 }
+
+pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) {
+  return batch->next_event(batch);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/tests/refcount.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c
index a36d01c..267c861 100644
--- a/proton-c/src/tests/refcount.c
+++ b/proton-c/src/tests/refcount.c
@@ -313,7 +313,8 @@ static void test_transport_connection(void) {
 }
 
 static void drain(pn_collector_t *collector) {
-  while (pn_collector_peek(collector)) { pn_collector_pop(collector); }
+  while (pn_collector_next(collector))
+    ;
 }
 
 static void test_collector_connection_transport(void) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/qpid-proton-cpp.syms
----------------------------------------------------------------------
diff --git a/qpid-proton-cpp.syms b/qpid-proton-cpp.syms
index 7a25651..c24e898 100644
--- a/qpid-proton-cpp.syms
+++ b/qpid-proton-cpp.syms
@@ -30,24 +30,24 @@ proton::connection::transport() const
 proton::connection::user(std::string const&)
 proton::connection::~connection()
 
-proton::connection_engine::can_read() const
-proton::connection_engine::can_write() const
-proton::connection_engine::closed() const
-proton::connection_engine::connection() const
-proton::connection_engine::connection_engine(proton::handler&, proton::connection_options const&)
-proton::connection_engine::container::container(std::string const&)
-proton::connection_engine::container::id() const
-proton::connection_engine::container::make_options()
-proton::connection_engine::container::options(proton::connection_options const&)
-proton::connection_engine::container::~container()
-proton::connection_engine::dispatch()
-proton::connection_engine::io_error::io_error(std::string const&)
-proton::connection_engine::io_error::~io_error()
-proton::connection_engine::no_opts
-proton::connection_engine::process(int)
-proton::connection_engine::try_read()
-proton::connection_engine::try_write()
-proton::connection_engine::~connection_engine()
+proton::connection_driver::can_read() const
+proton::connection_driver::can_write() const
+proton::connection_driver::closed() const
+proton::connection_driver::connection() const
+proton::connection_driver::connection_driver(proton::handler&, proton::connection_options const&)
+proton::connection_driver::container::container(std::string const&)
+proton::connection_driver::container::id() const
+proton::connection_driver::container::make_options()
+proton::connection_driver::container::options(proton::connection_options const&)
+proton::connection_driver::container::~container()
+proton::connection_driver::dispatch()
+proton::connection_driver::io_error::io_error(std::string const&)
+proton::connection_driver::io_error::~io_error()
+proton::connection_driver::no_opts
+proton::connection_driver::process(int)
+proton::connection_driver::try_read()
+proton::connection_driver::try_write()
+proton::connection_driver::~connection_driver()
 
 proton::connection_options::connection_options()
 proton::connection_options::connection_options(proton::connection_options const&)
@@ -587,8 +587,8 @@ proton::value::value(proton::value const&)
 # Only types with the following info can be thrown across shared abject boundary
 # Or correctly dynamically cast by user
 typeinfo for proton::connection
-typeinfo for proton::connection_engine
-typeinfo for proton::connection_engine::io_error
+typeinfo for proton::connection_driver
+typeinfo for proton::connection_driver::io_error
 typeinfo for proton::conversion_error
 typeinfo for proton::endpoint
 typeinfo for proton::error
@@ -600,8 +600,8 @@ typeinfo for proton::session
 typeinfo for proton::timeout_error
 typeinfo for proton::url_error
 typeinfo name for proton::connection
-typeinfo name for proton::connection_engine
-typeinfo name for proton::connection_engine::io_error
+typeinfo name for proton::connection_driver
+typeinfo name for proton::connection_driver::io_error
 typeinfo name for proton::conversion_error
 typeinfo name for proton::endpoint
 typeinfo name for proton::error
@@ -613,8 +613,8 @@ typeinfo name for proton::session
 typeinfo name for proton::timeout_error
 typeinfo name for proton::url_error
 vtable for proton::connection
-vtable for proton::connection_engine
-vtable for proton::connection_engine::io_error
+vtable for proton::connection_driver
+vtable for proton::connection_driver::io_error
 vtable for proton::conversion_error
 vtable for proton::endpoint
 vtable for proton::error


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message