ianh 2002/08/18 13:22:29 Modified: . CHANGES Added: misc apr_queue.c include apr_queue.h test testqueue.c Log: add a thread safe FIFO bounded buffer queue. still need to edit makefiles etc Revision Changes Path 1.74 +2 -0 apr-util/CHANGES Index: CHANGES =================================================================== RCS file: /home/cvs/apr-util/CHANGES,v retrieving revision 1.73 retrieving revision 1.74 diff -u -r1.73 -r1.74 --- CHANGES 16 Aug 2002 05:58:50 -0000 1.73 +++ CHANGES 18 Aug 2002 20:22:28 -0000 1.74 @@ -1,5 +1,7 @@ Changes with APR-util b1 + *) Added a Thread safe FIFO bounded buffer (apr_queue) [Ian Holsman] + *) Changed file_bucket_setaside() to use apr_file_setaside() instead of turning the file bucket into an mmap bucket. [Brian Pane] 1.1 apr-util/misc/apr_queue.c Index: apr_queue.c =================================================================== /* ==================================================================== * The Apache Software License, Version 1.1 * * Copyright (c) 2000-2002 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Apache" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact apache@apache.org. * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * . * * Portions of this software are based upon public domain software * originally written at the National Center for Supercomputing Applications, * University of Illinois, Urbana-Champaign. */ #if APR_HAVE_STDIO_H #include #endif #if APR_HAVE_STDLIB_H #include #endif #if APR_HAVE_UNISTD_H #include #endif #include "apr_portable.h" #include "apr_thread_mutex.h" #include "apr_thread_cond.h" #include "apr_errno.h" #include "apr_queue.h" /* * define this to get debug messages * */ #define QUEUE_DEBUG struct apr_queue_t { void **data; int nelts; /**< # elements */ int in; /**< next empty location */ int out; /**< next filled location */ int bounds;/**< max size of queue */ apr_thread_mutex_t *one_big_mutex; apr_thread_cond_t *not_empty; apr_thread_cond_t *not_full; int terminated; }; #ifdef QUEUE_DEBUG static void Q_DBG(char*msg, apr_queue_t *q) { fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n", apr_os_thread_current(), q->nelts, q->in, q->out, msg ); } #else #define Q_DBG(x,y) #endif /** * Detects when the apr_queue_t is full. This utility function is expected * to be called from within critical sections, and is not threadsafe. */ #define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds) /** * Detects when the apr_queue_t is empty. This utility function is expected * to be called from within critical sections, and is not threadsafe. */ #define apr_queue_empty(queue) ((queue)->nelts == 0) /** * Callback routine that is called to destroy this * apr_queue_t when its pool is destroyed. */ static apr_status_t queue_destroy(void *data) { apr_queue_t *queue = data; /* Ignore errors here, we can't do anything about them anyway. * XXX: We should at least try to signal an error here, it is * indicative of a programmer error. -aaron */ apr_thread_cond_destroy(queue->not_empty); apr_thread_cond_destroy(queue->not_full); apr_thread_mutex_destroy(queue->one_big_mutex); return APR_SUCCESS; } /** * Initialize the apr_queue_t. */ apr_status_t apr_queue_create(apr_queue_t **q, int queue_capacity, apr_pool_t *a) { apr_status_t rv; apr_queue_t *queue; queue = apr_palloc(a, sizeof( apr_queue_t )); *q = queue; rv = apr_thread_mutex_create(&queue->one_big_mutex, APR_THREAD_MUTEX_UNNESTED, /* nested doesn't work ;( */ a); if (rv != APR_SUCCESS) { return rv; } rv = apr_thread_cond_create(&queue->not_empty, a); if (rv!= APR_SUCCESS) { return rv; } rv = apr_thread_cond_create(&queue->not_full, a); if (rv != APR_SUCCESS) { return rv; } /* Set all the data in the queue to NULL */ queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*)); queue->bounds = queue_capacity; queue->nelts = 0; queue->in = 0; queue->out = 0; queue->terminated = 0; apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null); return APR_SUCCESS; } /** * Push new data onto the queue. Blocks if the queue is full. Once * the push operation has completed, it signals other threads waiting * in apr_queue_pop() that they may continue consuming sockets. */ apr_status_t apr_queue_push(apr_queue_t *queue, void *data) { apr_status_t rv; int need_signal=0; if (queue->terminated) { return APR_EOF; /* no more elements ever again */ } rv = apr_thread_mutex_lock(queue->one_big_mutex); if (rv != APR_SUCCESS) { return rv; } if (apr_queue_full(queue)) { if (!queue->terminated) { rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex); if (rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } } /* If we wake up and it's still empty, then we were interrupted */ if (apr_queue_full(queue)) { Q_DBG( "queue full (intr)", queue); rv = apr_thread_mutex_unlock(queue->one_big_mutex); if (rv != APR_SUCCESS) { return rv; } if (queue->terminated) { return APR_EOF; /* no more elements ever again */ } else { return APR_EINTR; } } } /* if we were empty then signal that we aren't */ if (apr_queue_empty(queue)) { need_signal=1; } queue->data[queue->in] = data; queue->in = (queue->in +1) % queue->bounds ; queue->nelts++; if ( need_signal == 1 ) { Q_DBG( "sig !empty", queue); rv = apr_thread_cond_signal(queue->not_empty); if ( rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } } rv = apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } /** * Push new data onto the queue. Blocks if the queue is full. Once * the push operation has completed, it signals other threads waiting * in apr_queue_pop() that they may continue consuming sockets. */ apr_status_t apr_queue_trypush(apr_queue_t *queue, void *data) { apr_status_t rv; int need_signal=0; if (queue->terminated) { return APR_EOF; /* no more elements ever again */ } rv = apr_thread_mutex_lock(queue->one_big_mutex); if (rv != APR_SUCCESS) { return rv; } if (apr_queue_full(queue)) { rv = apr_thread_mutex_unlock(queue->one_big_mutex); return APR_EAGAIN; } /* if we were empty then signal that we aren't */ if (apr_queue_empty(queue)) { need_signal=1; } queue->data[queue->in] = data; queue->in = (queue->in +1) % queue->bounds ; queue->nelts++; if ( need_signal == 1 ) { Q_DBG( "sig !empty", queue); rv = apr_thread_cond_signal(queue->not_empty); if ( rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } } rv = apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } /** * not thread safe */ int apr_queue_size(apr_queue_t *queue) { return queue->nelts; } /** * Retrieves the next item from the queue. If there are no * items available, it will block until one becomes available. * Once retrieved, the item is placed into the address specified by * 'data'. */ apr_status_t apr_queue_pop(apr_queue_t *queue, void **data) { apr_status_t rv; int need_signal=0; if (queue->terminated) { return APR_EOF; /* no more elements ever again */ } rv = apr_thread_mutex_lock(queue->one_big_mutex); if (rv != APR_SUCCESS) { return rv; } /* Keep waiting until we wake up and find that the queue is not empty. */ if (apr_queue_empty(queue)) { if (!queue->terminated) { rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); if (rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } } /* If we wake up and it's still empty, then we were interrupted */ if (apr_queue_empty(queue)) { Q_DBG( "queue empty (intr)", queue); rv = apr_thread_mutex_unlock(queue->one_big_mutex); if (rv != APR_SUCCESS) { return rv; } if (queue->terminated) { return APR_EOF; /* no more elements ever again */ } else { return APR_EINTR; } } } if (apr_queue_full(queue)) { need_signal =1; } *data = &queue->data[queue->out]; queue->nelts--; queue->out = (queue->out + 1) % queue->bounds; if ( need_signal == 1 ) { Q_DBG( "signal !full", queue); rv = apr_thread_cond_signal(queue->not_full); if ( rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } } rv = apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } /** * Retrieves the next item from the queue. If there are no * items available, it will block until one becomes available. * Once retrieved, the item is placed into the address specified by * 'data'. */ apr_status_t apr_queue_trypop(apr_queue_t *queue, void **data) { apr_status_t rv; int need_signal=0; if (queue->terminated) { return APR_EOF; /* no more elements ever again */ } rv = apr_thread_mutex_lock(queue->one_big_mutex); if (rv != APR_SUCCESS) { return rv; } /* Keep waiting until we wake up and find that the queue is not empty. */ if (apr_queue_empty(queue)) { rv = apr_thread_mutex_unlock(queue->one_big_mutex); return APR_EAGAIN; } if (apr_queue_full(queue)) { need_signal =1; } *data = &queue->data[queue->out]; queue->nelts--; queue->out = (queue->out + 1) % queue->bounds; if ( need_signal == 1 ) { Q_DBG( "signal !full", queue); rv = apr_thread_cond_signal(queue->not_full); if ( rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } } rv = apr_thread_mutex_unlock(queue->one_big_mutex); return rv; } apr_status_t apr_queue_interrupt_all(apr_queue_t *queue) { apr_status_t rv; Q_DBG( "intr all", queue); if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } apr_thread_cond_broadcast(queue->not_empty); apr_thread_cond_broadcast(queue->not_full); if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } return APR_SUCCESS; } apr_status_t apr_queue_term(apr_queue_t *queue) { apr_status_t rv; if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } /* we must hold one_big_mutex when setting this... otherwise, * we could end up setting it and waking everybody up just after a * would-be popper checks it but right before they block */ queue->terminated = 1; if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } return apr_queue_interrupt_all(queue); } 1.1 apr-util/include/apr_queue.h Index: apr_queue.h =================================================================== /* ==================================================================== * The Apache Software License, Version 1.1 * * Copyright (c) 2000-2002 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Apache" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact apache@apache.org. * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * . * * Portions of this software are based upon public domain software * originally written at the National Center for Supercomputing Applications, * University of Illinois, Urbana-Champaign. */ #ifndef APR_QUEUE_H #define APR_QUEUE_H /** * @file apr_queue.h * @brief Thread Safe FIFO bounded queue */ /** * @defgroup APR_Util_FIFO Thread Safe FIFO bounded queue * @ingroup APR_Util * @{ */ /** * opaque structure */ typedef struct apr_queue_t apr_queue_t; /** * create a FIFO queue * @param queue_capacity maximum size of the queue * @pool to use */ apr_status_t apr_queue_create(apr_queue_t **queue, int queue_capacity, apr_pool_t *a); /** * push/add a object to the queue, blocking if the queue is already full * * @param queue the queue * @param the data * @returns APR_EINTR the blocking was interrupted (try again) * @returns APR_EOF the queue has been terminated * @returns APR_SUCCESS on a successfull push */ apr_status_t apr_queue_push(apr_queue_t *queue, void *data); /** * pop/get an object from the queue, blocking if the queue is already empty * * @param queue the queue * @param the data * @returns APR_EINTR the blocking was interrupted (try again) * @returns APR_EOF if the queue has been terminated * @returns APR_SUCCESS on a successfull pop */ apr_status_t apr_queue_pop(apr_queue_t *queue, void **data); /** * push/add a object to the queue, returning immediatly if the queue is full * * @param queue the queue * @param the data * @returns APR_EINTR the blocking operation was interrupted (try again) * @returns APR_EAGAIN the queue is full * @returns APR_EOF the queue has been terminated * @returns APR_SUCCESS on a successfull push */ apr_status_t apr_queue_trypush(apr_queue_t *queue, void *data); /** * pop/get an object to the queue, returning immediatly if the queue is empty * * @param queue the queue * @param the data * @returns APR_EINTR the blocking operation was interrupted (try again) * @returns APR_EAGAIN the queue is empty * @returns APR_EOF the queue has been terminated * @returns APR_SUCCESS on a successfull push */ apr_status_t apr_queue_trypop(apr_queue_t *queue, void **data); /** * returns the size of the queue. * * @warning this is not threadsafe, and is intended for reporting/monitoring * of the queue. * @param queue the queue * @returns the size of the queue */ int apr_queue_size(apr_queue_t *queue); /** * interrupt all the threads blocking on this queue. * * @param queue the queue */ apr_status_t apr_queue_interrupt_all(apr_queue_t *queue); /** * terminate all queue, sendinging a interupt to all the * blocking threads * * @param queue the queue */ apr_status_t apr_queue_term(apr_queue_t *queue); #endif /* APRQUEUE_H */ 1.1 apr-util/test/testqueue.c Index: testqueue.c =================================================================== /* ==================================================================== * The Apache Software License, Version 1.1 * * Copyright (c) 2000-2002 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Apache" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact apache@apache.org. * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * . */ #include #include #include #include #include "errno.h" #include #include #include #if APR_HAVE_UNISTD_H #include #endif #include #include "apr_queue.h" #if !APR_HAS_THREADS int main(void) { fprintf(stderr, "This program won't work on this platform because there is no " "support for threads.\n"); return 0; } #else /* !APR_HAS_THREADS */ apr_pool_t *context; int consumer_activity=300; int producer_activity=200; int verbose=0; void * APR_THREAD_FUNC consumer(apr_thread_t *thd, void *data) { long sleeprate; apr_queue_t *q = (apr_queue_t*)data; apr_status_t rv; int val; void*v; sleeprate = 1000000/consumer_activity; apr_sleep( (rand() % 4 ) * 1000000 ); /* sleep random seconds */ while (1) { do { rv = apr_queue_pop(q, &v); if (rv == APR_EINTR) fprintf(stderr, "%ld\tconsumer intr\n",apr_os_thread_current()); } while (rv == APR_EINTR) ; if (rv != APR_SUCCESS) { if (rv == APR_EOF) { fprintf(stderr, "%ld\tconsumer:queue terminated APR_EOF\n", apr_os_thread_current()); rv=APR_SUCCESS; } else fprintf(stderr, "%ld\tconsumer thread exit rv %d\n", apr_os_thread_current(),rv); apr_thread_exit(thd, rv); return NULL; } val = **(int**)v; if (verbose) fprintf(stderr, "%ld\tpop %d\n", apr_os_thread_current(),val); apr_sleep( sleeprate ); /* sleep this long to acheive our rate */ } /* not reached */ return NULL; } void * APR_THREAD_FUNC producer(apr_thread_t *thd, void *data) { int i=0; long sleeprate; apr_queue_t *q = (apr_queue_t*)data; apr_status_t rv; int *val; sleeprate = 1000000/producer_activity; apr_sleep( (rand() % 4 ) * 1000000 ); /* sleep random seconds */ while(1) { val = apr_palloc(context, sizeof(int)); *val=i; if (verbose) fprintf(stderr, "%ld\tpush %d\n",apr_os_thread_current(),*val); do { rv = apr_queue_push(q, val); if (rv == APR_EINTR) fprintf(stderr, "%ld\tproducer intr\n",apr_os_thread_current()); } while (rv == APR_EINTR); if (rv != APR_SUCCESS) { if (rv == APR_EOF) { fprintf(stderr, "%ld\tproducer: queue terminated APR_EOF\n", apr_os_thread_current()); rv = APR_SUCCESS; } else fprintf(stderr, "%ld\tproducer thread exit rv %d\n", apr_os_thread_current(),rv); apr_thread_exit(thd, rv); return NULL; } i++; apr_sleep( sleeprate ); /* sleep this long to acheive our rate */ } /* not reached */ return NULL; } static void usage() { fprintf(stderr,"usage: testqueue -p n -P n -c n -C n -q n -s n "); fprintf(stderr,"-c # of consumer\n"); fprintf(stderr,"-C amount they consumer before dying\n"); fprintf(stderr,"-p # of producers\n"); fprintf(stderr,"-P amount they produce before dying\n"); fprintf(stderr,"-q queue size\n"); fprintf(stderr,"-s amount of time to sleep before killing it\n"); fprintf(stderr,"-v verbose\n"); } int main(int argc, const char* const argv[]) { apr_thread_t **t; apr_queue_t *queue; int i; apr_status_t rv; apr_getopt_t *opt; const char *optarg; char c; int numconsumers=3; int numproducers=4; int queuesize=100; int sleeptime=30; char errorbuf[200]; apr_initialize(); srand((unsigned int)apr_time_now()); printf("APR Queue Test\n======================\n\n"); printf("%-60s", "Initializing the context"); if (apr_pool_create(&context, NULL) != APR_SUCCESS) { fflush(stdout); fprintf(stderr, "Failed.\nCould not initialize\n"); exit(-1); } printf("OK\n"); apr_getopt_init(&opt, context, argc, argv); while ((rv = apr_getopt(opt, "p:c:P:C:q:s:v", &c, &optarg)) == APR_SUCCESS) { switch (c) { case 'c': numconsumers = atoi( optarg); break; case 'p': numproducers = atoi( optarg); break; case 'C': consumer_activity = atoi( optarg); break; case 'P': producer_activity = atoi( optarg); break; case 's': sleeptime= atoi(optarg); break; case 'q': queuesize = atoi(optarg); break; case 'v': verbose= 1; break; default: usage(); exit(-1); } } /* bad cmdline option? then we die */ if (rv != APR_EOF || opt->ind < opt->argc) { usage(); exit(-1); } printf("test stats %d consumers (rate %d/sec) %d producers (rate %d/sec) queue size %d sleep %d\n", numconsumers,consumer_activity, numproducers, producer_activity, queuesize,sleeptime); printf("%-60s", "Initializing the queue"); rv = apr_queue_create(&queue, queuesize, context); if (rv != APR_SUCCESS) { fflush(stdout); fprintf(stderr, "Failed\nCould not create queue %d\n",rv); apr_strerror(rv, errorbuf,200); fprintf(stderr,"%s\n",errorbuf); exit(-1); } printf("OK\n"); t = apr_palloc( context, sizeof(apr_thread_t*) * (numconsumers+numproducers)); printf("%-60s", "Starting consumers"); for (i=0;i