qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jr...@apache.org
Subject [1/2] qpid-proton git commit: PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level
Date Mon, 11 Sep 2017 23:18:16 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 6888ab5e5 -> 564e0ca4c


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
deleted file mode 100644
index 6fd74a5..0000000
--- a/examples/c/proactor/receive.c
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/condition.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-typedef struct app_data_t {
-  const char *host, *port;
-  const char *amqp_address;
-  const char *container_id;
-  int message_count;
-
-  pn_proactor_t *proactor;
-  int received;
-  bool finished;
-} app_data_t;
-
-static const int BATCH = 1000; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            pn_condition_get_name(cond), pn_condition_get_description(cond));
-    pn_connection_close(pn_event_connection(e));
-    exit_code = 1;
-  }
-}
-
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-  ssize_t len;
-  // try to decode the message body
-  if (pn_delivery_pending(dlv) < MAX_SIZE) {
-    // read in the raw data
-    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-    if (len > 0) {
-      // decode it into a proton message
-      pn_message_t *m = pn_message();
-      if (PN_OK == pn_message_decode(m, buffer, len)) {
-        pn_string_t *s = pn_string(NULL);
-        pn_inspect(pn_message_body(m), s);
-        printf("%s\n", pn_string_get(s));
-        pn_free(s);
-      }
-      pn_message_free(m);
-    }
-  }
-}
-
-/* Return true to continue, false to exit */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_CONNECTION_INIT: {
-     pn_connection_t* c = pn_event_connection(event);
-     pn_connection_set_container(c, app->container_id);
-     pn_connection_open(c);
-     pn_session_t* s = pn_session(c);
-     pn_session_open(s);
-     pn_link_t* l = pn_receiver(s, "my_receiver");
-     pn_terminus_set_address(pn_link_source(l), app->amqp_address);
-     pn_link_open(l);
-     /* cannot receive without granting credit: */
-     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
-   } break;
-
-   case PN_DELIVERY: {
-     /* A message has been received */
-     pn_link_t *link = NULL;
-     pn_delivery_t *dlv = pn_event_delivery(event);
-     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-       link = pn_delivery_link(dlv);
-       decode_message(dlv);
-       /* Accept the delivery */
-       pn_delivery_update(dlv, PN_ACCEPTED);
-       /* done with the delivery, move to the next and free it */
-       pn_link_advance(link);
-       pn_delivery_settle(dlv);  /* dlv is now freed */
-
-       if (app->message_count == 0) {
-         /* receive forever - see if more credit is needed */
-         if (pn_link_credit(link) < BATCH/2) {
-           /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(link, BATCH - pn_link_credit(link));
-         }
-       } else if (++app->received >= app->message_count) {
-         /* done receiving, close the endpoints */
-         printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(link);
-         pn_link_close(link);
-         pn_session_close(ssn);
-         pn_connection_close(pn_session_connection(ssn));
-       }
-     }
-   } break;
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH:
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_INACTIVE:
-    return false;
-    break;
-
-   default:
-    break;
-  }
-    return true;
-}
-
-void run(app_data_t *app) {
-  /* Loop and handle events */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
-    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
-      if (!handle(app, e)) {
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, events);
-  } while(true);
-}
-
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  app.host = (argc > 1) ? argv[i++] : "";
-  app.port = (argc > 1) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
deleted file mode 100644
index 43da8b0..0000000
--- a/examples/c/proactor/send.c
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/condition.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-typedef struct app_data_t {
-  const char *host, *port;
-  const char *amqp_address;
-  const char *container_id;
-  int message_count;
-
-  pn_proactor_t *proactor;
-  pn_rwbytes_t message_buffer;
-  int sent;
-  int acknowledged;
-} app_data_t;
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            pn_condition_get_name(cond), pn_condition_get_description(cond));
-    pn_connection_close(pn_event_connection(e));
-    exit_code = 1;
-  }
-}
-
-/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
-static pn_bytes_t encode_message(app_data_t* app) {
-  /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_map(body);
-  pn_data_enter(body);
-  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-  pn_data_put_int(body, app->sent); /* The sequence number */
-  pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf wil point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    app->message_buffer.size *= 2;
-    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
-    mbuf.size = app->message_buffer.size;
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-    exit(1);
-  }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-}
-
-/* Returns true to continue, false if finished */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_CONNECTION_INIT: {
-     pn_connection_t* c = pn_event_connection(event);
-     pn_connection_set_container(c, app->container_id);
-     pn_connection_open(c);
-     pn_session_t* s = pn_session(pn_event_connection(event));
-     pn_session_open(s);
-     pn_link_t* l = pn_sender(s, "my_sender");
-     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
-     pn_link_open(l);
-     break;
-   }
-
-   case PN_LINK_FLOW: {
-     /* The peer has given us some credit, now we can send messages */
-     pn_link_t *sender = pn_event_link(event);
-     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
-       ++app->sent;
-       // Use sent counter as unique delivery tag.
-       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-       pn_bytes_t msgbuf = encode_message(app);
-       pn_link_send(sender, msgbuf.start, msgbuf.size);
-       pn_link_advance(sender);
-     }
-     break;
-   }
-
-   case PN_DELIVERY: {
-     /* We received acknowledgedment from the peer that a message was delivered. */
-     pn_delivery_t* d = pn_event_delivery(event);
-     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
-       if (++app->acknowledged == app->message_count) {
-         printf("%d messages sent and acknowledged\n", app->acknowledged);
-         pn_connection_close(pn_event_connection(event));
-         /* Continue handling events till we receive TRANSPORT_CLOSED */
-       }
-     } else {
-       fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
-       pn_connection_close(pn_event_connection(event));
-       exit_code=1;
-     }
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH:
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_INACTIVE:
-    return false;
-
-   default: break;
-  }
-  return true;
-}
-
-void run(app_data_t *app) {
-  /* Loop and handle events */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
-    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
-      if (!handle(app, e)) {
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, events);
-  } while(true);
-}
-
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  app.host = (argc > 1) ? argv[i++] : "";
-  app.port = (argc > 1) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  app.proactor = pn_proactor();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/proactor/thread.h b/examples/c/proactor/thread.h
deleted file mode 100644
index 3b9f19e..0000000
--- a/examples/c/proactor/thread.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#ifndef _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
-#define _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
-
-#ifdef _WIN32
-
-#include <windows.h>
-#include <time.h>
-#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
-#include <process.h>
-#include <windows.h>
-
-#define pthread_function DWORD WINAPI
-#define pthread_function_return DWORD
-#define pthread_t HANDLE
-#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
-#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
-#define pthread_mutex_T HANDLE
-#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
-#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
-#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
-#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
-
-#else
-
-#include <pthread.h>
-
-#endif
-
-#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt
deleted file mode 100644
index bd6163f..0000000
--- a/examples/c/reactor/CMakeLists.txt
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED)
-
-set (reactor-examples
-  sender.c
-  receiver.c
-  )
-
-set_source_files_properties (
-  ${reactor-examples}
-  PROPERTIES
-  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}"
-  )
-
-if (BUILD_WITH_CXX)
-  set_source_files_properties (
-    ${reactor-examples}
-    PROPERTIES LANGUAGE CXX
-    )
-endif (BUILD_WITH_CXX)
-
-include_directories(${Proton_INCLUDE_DIRS})
-add_executable(sender sender.c)
-add_executable(receiver receiver.c)
-target_link_libraries(sender ${Proton_LIBRARIES})
-target_link_libraries(receiver ${Proton_LIBRARIES})
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/README
----------------------------------------------------------------------
diff --git a/examples/c/reactor/README b/examples/c/reactor/README
deleted file mode 100644
index 8d61893..0000000
--- a/examples/c/reactor/README
+++ /dev/null
@@ -1,30 +0,0 @@
-These example clients require a broker or similar intermediary that
-supports the AMQP 1.0 protocol, allows anonymous connections and
-accepts links to and from a node named 'examples'.
-
-------------------------------------------------------------------
-
-sender.c
-
-A simple message sending client.  This example sends all messages but
-the last as pre-settled (no ack required).  It then pends waiting for
-an ack for the last message sent before exiting.
-
-Use the '-h' command line option for a list of supported parameters.
-
-------------------------------------------------------------------
-
-receiver.c
-
-A simple message consuming client.  This example receives messages
-from a target (default 'examples').  Received messages are
-acknowledged if they are sent un-settled.  The client will try to
-decode the message payload assuming it has been generated by the
-sender example.
-
-Use the '-h' command line option for a list of supported parameters.
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/receiver.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c
deleted file mode 100644
index e72a6d9..0000000
--- a/examples/c/reactor/receiver.c
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-static int quiet = 0;
-
-// Credit batch if unlimited receive (-c 0)
-static const int CAPACITY = 100;
-#define MAX_SIZE 512
-
-// Example application data.  This data will be instantiated in the event
-// handler, and is available during event processing.  In this example it
-// holds configuration and state information.
-//
-typedef struct {
-    int count;          // # of messages to receive before exiting
-    const char *source;     // name of the source node to receive from
-    pn_message_t *message;  // holds the received message
-} app_data_t;
-
-// helper to pull pointer to app_data_t instance out of the pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Called when reactor exits to clean up app_data
-//
-static void delete_handler(pn_handler_t *handler)
-{
-    app_data_t *d = GET_APP_DATA(handler);
-    if (d->message) {
-        pn_decref(d->message);
-        d->message = NULL;
-    }
-}
-
-
-/* Process each event posted by the reactor.
- */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t type)
-{
-    app_data_t *data = GET_APP_DATA(handler);
-
-    switch (type) {
-
-    case PN_CONNECTION_INIT: {
-        // Create and open all the endpoints needed to send a message
-        //
-        pn_connection_t *conn;
-        pn_session_t *ssn;
-        pn_link_t *receiver;
-
-        conn = pn_event_connection(event);
-        pn_connection_open(conn);
-        ssn = pn_session(conn);
-        pn_session_open(ssn);
-        receiver = pn_receiver(ssn, "MyReceiver");
-        pn_terminus_set_address(pn_link_source(receiver), data->source);
-        pn_link_open(receiver);
-        // cannot receive without granting credit:
-        pn_link_flow(receiver, data->count ? data->count : CAPACITY);
-    } break;
-
-    case PN_DELIVERY: {
-        // A message has been received
-        //
-        pn_link_t *link = NULL;
-        pn_delivery_t *dlv = pn_event_delivery(event);
-        if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-            // A full message has arrived
-            if (!quiet) {
-                ssize_t len;
-                pn_bytes_t bytes;
-                bool found = false;
-
-                // try to decode the message body
-                if (pn_delivery_pending(dlv) < MAX_SIZE) {
-                    static char buffer[MAX_SIZE];
-                    // read in the raw data
-                    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-                    if (len > 0) {
-                        // decode it into a proton message
-                        pn_message_clear(data->message);
-                        if (PN_OK == pn_message_decode(data->message, buffer,
-                                                       len)) {
-                            // Assuming the message came from the sender
-                            // example, try to parse out a single string from
-                            // the payload
-                            //
-                            pn_data_scan(pn_message_body(data->message), "?S",
-                                         &found, &bytes);
-                        }
-                    }
-                }
-                if (found) {
-                    fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size,
-                            bytes.start);
-                } else {
-                    fprintf(stdout, "Message received!\n");
-                }
-            }
-
-            link = pn_delivery_link(dlv);
-
-            if (!pn_delivery_settled(dlv)) {
-                // remote has not settled, so it is tracking the delivery.  Ack
-                // it.
-                pn_delivery_update(dlv, PN_ACCEPTED);
-            }
-
-            // done with the delivery, move to the next and free it
-            pn_link_advance(link);
-            pn_delivery_settle(dlv);  // dlv is now freed
-
-            if (data->count == 0) {
-                // send forever - see if more credit is needed
-                if (pn_link_credit(link) < CAPACITY/2) {
-                    // Grant enough credit to bring it up to CAPACITY:
-                    pn_link_flow(link, CAPACITY - pn_link_credit(link));
-                }
-            } else if (--data->count == 0) {
-                // done receiving, close the endpoints
-                pn_session_t *ssn = pn_link_session(link);
-				pn_link_close(link);
-                pn_session_close(ssn);
-                pn_connection_close(pn_session_connection(ssn));
-            }
-        }
-    } break;
-
-    case PN_TRANSPORT_ERROR: {
-        // The connection to the peer failed.
-        //
-        pn_transport_t *tport = pn_event_transport(event);
-        pn_condition_t *cond = pn_transport_condition(tport);
-        fprintf(stderr, "Network transport failed!\n");
-        if (pn_condition_is_set(cond)) {
-            const char *name = pn_condition_get_name(cond);
-            const char *desc = pn_condition_get_description(cond);
-            fprintf(stderr, "    Error: %s  Description: %s\n",
-                    (name) ? name : "<error name not provided>",
-                    (desc) ? desc : "<no description provided>");
-        }
-        // pn_reactor_process() will exit with a false return value, stopping
-        // the main loop.
-    } break;
-
-    default:
-        break;
-    }
-}
-
-static void usage(void)
-{
-  printf("Usage: receiver <options>\n");
-  printf("-a      \tThe host address [localhost:5672]\n");
-  printf("-c      \t# of messages to receive, 0=receive forever [1]\n");
-  printf("-s      \tSource address [examples]\n");
-  printf("-i      \tContainer name [ReceiveExample]\n");
-  printf("-q      \tQuiet - turn off stdout\n");
-  exit(1);
-}
-
-int main(int argc, char** argv)
-{
-    const char *address = "localhost";
-    const char *container = "ReceiveExample";
-    int c;
-    pn_reactor_t *reactor = NULL;
-    pn_url_t *url = NULL;
-    pn_connection_t *conn = NULL;
-
-    /* create a handler for the connection's events.
-     * event_handler will be called for each event.  The handler will allocate
-     * a app_data_t instance which can be accessed when the event_handler is
-     * called.
-     */
-    pn_handler_t *handler = pn_handler_new(event_handler,
-                                           sizeof(app_data_t),
-                                           delete_handler);
-
-    /* set up the application data with defaults */
-    app_data_t *app_data = GET_APP_DATA(handler);
-    memset(app_data, 0, sizeof(app_data_t));
-    app_data->count = 1;
-    app_data->source = "examples";
-    app_data->message = pn_message();
-
-    /* Attach the pn_handshaker() handler.  This handler deals with endpoint
-     * events from the peer so we don't have to.
-     */
-    {
-        pn_handler_t *handshaker = pn_handshaker();
-        pn_handler_add(handler, handshaker);
-        pn_decref(handshaker);
-    }
-
-    /* command line options */
-    opterr = 0;
-    while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) {
-        switch(c) {
-        case 'h': usage(); break;
-        case 'a': address = optarg; break;
-        case 'c':
-            app_data->count = atoi(optarg);
-            if (app_data->count < 0) usage();
-            break;
-        case 's': app_data->source = optarg; break;
-        case 'i': container = optarg; break;
-        case 'q': quiet = 1; break;
-        default:
-            usage();
-            break;
-        }
-    }
-
-    reactor = pn_reactor();
-
-    url = pn_url_parse(address);
-    if (url == NULL) {
-        fprintf(stderr, "Invalid host address %s\n", address);
-        exit(1);
-    }
-    conn = pn_reactor_connection_to_host(reactor,
-                                         pn_url_get_host(url),
-                                         pn_url_get_port(url),
-                                         handler);
-    pn_decref(url);
-    pn_decref(handler);
-
-    // the container name should be unique for each client
-    pn_connection_set_container(conn, container);
-
-    // wait up to 5 seconds for activity before returning from
-    // pn_reactor_process()
-    pn_reactor_set_timeout(reactor, 5000);
-
-    pn_reactor_start(reactor);
-
-    while (pn_reactor_process(reactor)) {
-        /* Returns 'true' until the connection is shut down.
-         * pn_reactor_process() will return true at least once every 5 seconds
-         * (due to the timeout).  If no timeout was configured,
-         * pn_reactor_process() returns as soon as it finishes processing all
-         * pending I/O and events. Once the connection has closed,
-         * pn_reactor_process() will return false.
-         */
-    }
-    pn_decref(reactor);
-
-    return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/reactor/sender.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/sender.c b/examples/c/reactor/sender.c
deleted file mode 100644
index 6c3cdb3..0000000
--- a/examples/c/reactor/sender.c
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-
-static int quiet = 0;
-
-// Example application data.  This data will be instantiated in the event
-// handler, and is available during event processing.  In this example it
-// holds configuration and state information.
-//
-typedef struct {
-    int count;           // # messages to send
-    int anon;            // use anonymous link if true
-    const char *target;  // name of destination target
-    char *msg_data;      // pre-encoded outbound message
-    size_t msg_len;      // bytes in msg_data
-} app_data_t;
-
-// helper to pull pointer to app_data_t instance out of the pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Called when reactor exits to clean up app_data
-//
-static void delete_handler(pn_handler_t *handler)
-{
-    app_data_t *d = GET_APP_DATA(handler);
-    if (d->msg_data) {
-        free(d->msg_data);
-        d->msg_data = NULL;
-    }
-}
-
-/* Process each event posted by the reactor.
- */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t type)
-{
-    app_data_t *data = GET_APP_DATA(handler);
-
-    switch (type) {
-
-    case PN_CONNECTION_INIT: {
-        // Create and open all the endpoints needed to send a message
-        //
-        pn_connection_t *conn;
-        pn_session_t *ssn;
-        pn_link_t *sender;
-
-        conn = pn_event_connection(event);
-        pn_connection_open(conn);
-        ssn = pn_session(conn);
-        pn_session_open(ssn);
-        sender = pn_sender(ssn, "MySender");
-        // we do not wait for ack until the last message
-        pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
-        if (!data->anon) {
-            pn_terminus_set_address(pn_link_target(sender), data->target);
-        }
-        pn_link_open(sender);
-    } break;
-
-    case PN_LINK_FLOW: {
-        // the remote has given us some credit, now we can send messages
-        //
-        static long tag = 0;  // a simple tag generator
-        pn_delivery_t *delivery;
-        pn_link_t *sender = pn_event_link(event);
-        int credit = pn_link_credit(sender);
-        while (credit > 0 && data->count > 0) {
-            --credit;
-            --data->count;
-            ++tag;
-            delivery = pn_delivery(sender,
-                                   pn_dtag((const char *)&tag, sizeof(tag)));
-            pn_link_send(sender, data->msg_data, data->msg_len);
-            pn_link_advance(sender);
-            if (data->count > 0) {
-                // send pre-settled until the last one, then wait for an ack on
-                // the last sent message. This allows the sender to send
-                // messages as fast as possible and then exit when the consumer
-                // has dealt with the last one.
-                //
-                pn_delivery_settle(delivery);
-            }
-        }
-    } break;
-
-    case PN_DELIVERY: {
-        // Since the example sends all messages but the last pre-settled
-        // (pre-acked), only the last message's delivery will get updated with
-        // the remote state (acked/nacked).
-        //
-        pn_delivery_t *dlv = pn_event_delivery(event);
-        if (pn_delivery_updated(dlv) && pn_delivery_remote_state(dlv)) {
-            uint64_t rs = pn_delivery_remote_state(dlv);
-            int done = 1;
-            switch (rs) {
-            case PN_RECEIVED:
-                // This is not a terminal state - it is informational, and the
-                // peer is still processing the message.
-                done = 0;
-                break;
-            case PN_ACCEPTED:
-                pn_delivery_settle(dlv);
-                if (!quiet) fprintf(stdout, "Send complete!\n");
-                break;
-            case PN_REJECTED:
-            case PN_RELEASED:
-            case PN_MODIFIED:
-                pn_delivery_settle(dlv);
-                fprintf(stderr, "Message not accepted - code:%lu\n", (unsigned long)rs);
-                break;
-            default:
-                // ??? no other terminal states defined, so ignore anything else
-                pn_delivery_settle(dlv);
-                fprintf(stderr, "Unknown delivery failure - code=%lu\n", (unsigned long)rs);
-                break;
-            }
-
-            if (done) {
-                // initiate clean shutdown of the endpoints
-                pn_link_t *link = pn_delivery_link(dlv);
-                pn_session_t *ssn = pn_link_session(link);
-                pn_link_close(link);
-                pn_session_close(ssn);
-                pn_connection_close(pn_session_connection(ssn));
-            }
-        }
-    } break;
-
-    case PN_TRANSPORT_ERROR: {
-        // The connection to the peer failed.
-        //
-        pn_transport_t *tport = pn_event_transport(event);
-        pn_condition_t *cond = pn_transport_condition(tport);
-        fprintf(stderr, "Network transport failed!\n");
-        if (pn_condition_is_set(cond)) {
-            const char *name = pn_condition_get_name(cond);
-            const char *desc = pn_condition_get_description(cond);
-            fprintf(stderr, "    Error: %s  Description: %s\n",
-                    (name) ? name : "<error name not provided>",
-                    (desc) ? desc : "<no description provided>");
-        }
-        // pn_reactor_process() will exit with a false return value, stopping
-        // the main loop.
-    } break;
-
-    default:
-        break;
-    }
-}
-
-static void usage(void)
-{
-  printf("Usage: send <options> <message>\n");
-  printf("-a      \tThe host address [localhost:5672]\n");
-  printf("-c      \t# of messages to send [1]\n");
-  printf("-t      \tTarget address [examples]\n");
-  printf("-n      \tUse an anonymous link [off]\n");
-  printf("-i      \tContainer name [SendExample]\n");
-  printf("-q      \tQuiet - turn off stdout\n");
-  printf("message \tA text string to send.\n");
-  exit(1);
-}
-
-int main(int argc, char** argv)
-{
-    const char *address = "localhost";
-    const char *msgtext = "Hello World!";
-    const char *container = "SendExample";
-    int c;
-    pn_message_t *message = NULL;
-    pn_data_t *body = NULL;
-    pn_reactor_t *reactor = NULL;
-    pn_url_t *url = NULL;
-    pn_connection_t *conn = NULL;
-
-    /* Create a handler for the connection's events.  event_handler() will be
-     * called for each event and delete_handler will be called when the
-     * connection is released.  The handler will allocate an app_data_t
-     * instance which can be accessed when the event_handler is called.
-     */
-    pn_handler_t *handler = pn_handler_new(event_handler,
-                                           sizeof(app_data_t),
-                                           delete_handler);
-
-    /* set up the application data with defaults */
-    app_data_t *app_data = GET_APP_DATA(handler);
-    memset(app_data, 0, sizeof(app_data_t));
-    app_data->count = 1;
-    app_data->target = "examples";
-
-    /* Attach the pn_handshaker() handler.  This handler deals with endpoint
-     * events from the peer so we don't have to.
-     */
-    {
-        pn_handler_t *handshaker = pn_handshaker();
-        pn_handler_add(handler, handshaker);
-        pn_decref(handshaker);
-    }
-
-    /* command line options */
-    opterr = 0;
-    while((c = getopt(argc, argv, "i:a:c:t:nhq")) != -1) {
-        switch(c) {
-        case 'h': usage(); break;
-        case 'a': address = optarg; break;
-        case 'c':
-            app_data->count = atoi(optarg);
-            if (app_data->count < 1) usage();
-            break;
-        case 't': app_data->target = optarg; break;
-        case 'n': app_data->anon = 1; break;
-        case 'i': container = optarg; break;
-        case 'q': quiet = 1; break;
-        default:
-            usage();
-            break;
-        }
-    }
-    if (optind < argc) msgtext = argv[optind];
-
-
-    // create a single message and pre-encode it so we only have to do that
-    // once.  All transmits will use the same pre-encoded message simply for
-    // speed.
-    //
-    message = pn_message();
-    pn_message_set_address(message, app_data->target);
-    body = pn_message_body(message);
-    pn_data_clear(body);
-
-    // This message's body contains a single string
-    if (pn_data_fill(body, "S", msgtext)) {
-        fprintf(stderr, "Error building message!\n");
-        exit(1);
-    }
-    pn_data_rewind(body);
-    {
-        // encode the message, expanding the encode buffer as needed
-        //
-        size_t len = 128;
-        char *buf = (char *)malloc(len);
-        int rc = 0;
-        do {
-            rc = pn_message_encode(message, buf, &len);
-            if (rc == PN_OVERFLOW) {
-                free(buf);
-                len *= 2;
-                buf = (char *)malloc(len);
-            }
-        } while (rc == PN_OVERFLOW);
-        app_data->msg_len = len;
-        app_data->msg_data = buf;
-    }
-    pn_decref(message);   // message no longer needed
-
-    reactor = pn_reactor();
-
-    url = pn_url_parse(address);
-    if (url == NULL) {
-        fprintf(stderr, "Invalid host address %s\n", address);
-        exit(1);
-    }
-    conn = pn_reactor_connection_to_host(reactor,
-                                         pn_url_get_host(url),
-                                         pn_url_get_port(url),
-                                         handler);
-    pn_decref(url);
-    pn_decref(handler);
-
-    // the container name should be unique for each client
-    pn_connection_set_container(conn, container);
-
-    // wait up to 5 seconds for activity before returning from
-    // pn_reactor_process()
-    pn_reactor_set_timeout(reactor, 5000);
-
-    pn_reactor_start(reactor);
-
-    while (pn_reactor_process(reactor)) {
-        /* Returns 'true' until the connection is shut down.
-         * pn_reactor_process() will return true at least once every 5 seconds
-         * (due to the timeout).  If no timeout was configured,
-         * pn_reactor_process() returns as soon as it finishes processing all
-         * pending I/O and events. Once the connection has closed,
-         * pn_reactor_process() will return false.
-         */
-    }
-    pn_decref(reactor);
-
-    return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
new file mode 100644
index 0000000..6fd74a5
--- /dev/null
+++ b/examples/c/receive.c
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/condition.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+  const char *container_id;
+  int message_count;
+
+  pn_proactor_t *proactor;
+  int received;
+  bool finished;
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
+  }
+}
+
+#define MAX_SIZE 1024
+
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+  ssize_t len;
+  // try to decode the message body
+  if (pn_delivery_pending(dlv) < MAX_SIZE) {
+    // read in the raw data
+    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+    if (len > 0) {
+      // decode it into a proton message
+      pn_message_t *m = pn_message();
+      if (PN_OK == pn_message_decode(m, buffer, len)) {
+        pn_string_t *s = pn_string(NULL);
+        pn_inspect(pn_message_body(m), s);
+        printf("%s\n", pn_string_get(s));
+        pn_free(s);
+      }
+      pn_message_free(m);
+    }
+  }
+}
+
+/* Return true to continue, false to exit */
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_CONNECTION_INIT: {
+     pn_connection_t* c = pn_event_connection(event);
+     pn_connection_set_container(c, app->container_id);
+     pn_connection_open(c);
+     pn_session_t* s = pn_session(c);
+     pn_session_open(s);
+     pn_link_t* l = pn_receiver(s, "my_receiver");
+     pn_terminus_set_address(pn_link_source(l), app->amqp_address);
+     pn_link_open(l);
+     /* cannot receive without granting credit: */
+     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+   } break;
+
+   case PN_DELIVERY: {
+     /* A message has been received */
+     pn_link_t *link = NULL;
+     pn_delivery_t *dlv = pn_event_delivery(event);
+     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+       link = pn_delivery_link(dlv);
+       decode_message(dlv);
+       /* Accept the delivery */
+       pn_delivery_update(dlv, PN_ACCEPTED);
+       /* done with the delivery, move to the next and free it */
+       pn_link_advance(link);
+       pn_delivery_settle(dlv);  /* dlv is now freed */
+
+       if (app->message_count == 0) {
+         /* receive forever - see if more credit is needed */
+         if (pn_link_credit(link) < BATCH/2) {
+           /* Grant enough credit to bring it up to BATCH: */
+           pn_link_flow(link, BATCH - pn_link_credit(link));
+         }
+       } else if (++app->received >= app->message_count) {
+         /* done receiving, close the endpoints */
+         printf("%d messages received\n", app->received);
+         pn_session_t *ssn = pn_link_session(link);
+         pn_link_close(link);
+         pn_session_close(ssn);
+         pn_connection_close(pn_session_connection(ssn));
+       }
+     }
+   } break;
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_INACTIVE:
+    return false;
+    break;
+
+   default:
+    break;
+  }
+    return true;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++];   /* Should be unique */
+  app.host = (argc > 1) ? argv[i++] : "";
+  app.port = (argc > 1) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/send.c
----------------------------------------------------------------------
diff --git a/examples/c/send.c b/examples/c/send.c
new file mode 100644
index 0000000..43da8b0
--- /dev/null
+++ b/examples/c/send.c
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/condition.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+  const char *container_id;
+  int message_count;
+
+  pn_proactor_t *proactor;
+  pn_rwbytes_t message_buffer;
+  int sent;
+  int acknowledged;
+} app_data_t;
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
+  }
+}
+
+/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf wil point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    app->message_buffer.size *= 2;
+    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+    mbuf.size = app->message_buffer.size;
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+/* Returns true to continue, false if finished */
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_CONNECTION_INIT: {
+     pn_connection_t* c = pn_event_connection(event);
+     pn_connection_set_container(c, app->container_id);
+     pn_connection_open(c);
+     pn_session_t* s = pn_session(pn_event_connection(event));
+     pn_session_open(s);
+     pn_link_t* l = pn_sender(s, "my_sender");
+     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
+     pn_link_open(l);
+     break;
+   }
+
+   case PN_LINK_FLOW: {
+     /* The peer has given us some credit, now we can send messages */
+     pn_link_t *sender = pn_event_link(event);
+     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+       ++app->sent;
+       // Use sent counter as unique delivery tag.
+       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+       pn_bytes_t msgbuf = encode_message(app);
+       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       pn_link_advance(sender);
+     }
+     break;
+   }
+
+   case PN_DELIVERY: {
+     /* We received acknowledgedment from the peer that a message was delivered. */
+     pn_delivery_t* d = pn_event_delivery(event);
+     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+       if (++app->acknowledged == app->message_count) {
+         printf("%d messages sent and acknowledged\n", app->acknowledged);
+         pn_connection_close(pn_event_connection(event));
+         /* Continue handling events till we receive TRANSPORT_CLOSED */
+       }
+     } else {
+       fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
+       pn_connection_close(pn_event_connection(event));
+       exit_code=1;
+     }
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_INACTIVE:
+    return false;
+
+   default: break;
+  }
+  return true;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++];   /* Should be unique */
+  app.host = (argc > 1) ? argv[i++] : "";
+  app.port = (argc > 1) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+
+  app.proactor = pn_proactor();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/thread.h b/examples/c/thread.h
new file mode 100644
index 0000000..1bd5595
--- /dev/null
+++ b/examples/c/thread.h
@@ -0,0 +1,49 @@
+#ifndef _PROTON_EXAMPLES_C_THREADS_H
+#define _PROTON_EXAMPLES_C_THREADS_H 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
+
+#ifdef _WIN32
+
+#include <windows.h>
+#include <time.h>
+#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
+#include <process.h>
+#include <windows.h>
+
+#define pthread_function DWORD WINAPI
+#define pthread_function_return DWORD
+#define pthread_t HANDLE
+#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
+#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
+#define pthread_mutex_T HANDLE
+#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
+#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
+#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
+#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
+
+#else
+
+#include <pthread.h>
+
+#endif
+
+#endif /* thread.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 50ea677..c562025 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -63,9 +63,9 @@ if(HAS_PROACTOR)
   endif(WIN32)
 
   if(WIN32)
-    # set(path "$<TARGET_FILE_DIR:proactor-broker>;$<TARGET_FILE_DIR:qpid-proton>")
+    # set(path "$<TARGET_FILE_DIR:c-broker>;$<TARGET_FILE_DIR:qpid-proton>")
   else(WIN32)
-    set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c/proactor:$ENV{PATH}")
+    set(path "${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/examples/c:$ENV{PATH}")
   endif(WIN32)
   # Add the tools directory for the 'proctest' module
   set_search_path(pypath "${CMAKE_SOURCE_DIR}/tools/py" "$ENV{PYTHON_PATH}")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message