httpd-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rüdiger Plüm <ruediger.pl...@vodafone.com>
Subject Fwd: svn commit: r1202257 - in /httpd/httpd/trunk/server/mpm/event: config3.m4 equeue.c equeue.h event.c
Date Wed, 16 Nov 2011 09:20:30 GMT


-------- Original-Nachricht --------
Betreff: 	svn commit: r1202257 - in /httpd/httpd/trunk/server/mpm/event: config3.m4 equeue.c
equeue.h event.c
Datum: 	Tue, 15 Nov 2011 15:51:04 GMT
Von: 	pquerna@apache.org



Author: pquerna
Date: Tue Nov 15 15:51:03 2011
New Revision: 1202257

URL: http://svn.apache.org/viewvc?rev=1202257&view=rev
Log:
Create a new lock free circular queue, and use it in the EventMPM to remove the timeout mutex
that was wrapping both timeout queue operations and pollset operations.

Added:
     httpd/httpd/trunk/server/mpm/event/equeue.c   (with props)
     httpd/httpd/trunk/server/mpm/event/equeue.h   (with props)
Modified:
     httpd/httpd/trunk/server/mpm/event/config3.m4
     httpd/httpd/trunk/server/mpm/event/event.c

Modified: httpd/httpd/trunk/server/mpm/event/config3.m4
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/equeue.c?rev=1202257&view=auto
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/equeue.c (added)
+++ httpd/httpd/trunk/server/mpm/event/equeue.c Tue Nov 15 15:51:03 2011
@@ -0,0 +1,125 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "equeue.h"
+
+#include<apr_atomic.h>
+#include<sched.h>
+
+struct ap_equeue_t {
+    apr_uint32_t nelem;
+    apr_size_t elem_size;
+    uint8_t *bytes;
+    volatile apr_uint32_t writeCount;
+    volatile apr_uint32_t readCount;
+};
+
+
+static APR_INLINE apr_uint32_t count_to_index(ap_equeue_t *eq, apr_uint32_t count)
+{
+    return (count&  (eq->nelem - 1));
+}
+
+static APR_INLINE void* index_to_bytes(ap_equeue_t *eq, apr_uint32_t idx)
+{
+    apr_size_t offset = idx * eq->elem_size;
+    return (void*)&eq->bytes[offset];
+}
+
+static APR_INLINE apr_uint32_t nearest_power(apr_uint32_t num)
+{
+    apr_uint32_t n = 1;
+    while (n<  num) {
+        n<<= 1;
+    }
+
+    return n;
+}
+
+#if 0
+static void dump_queue(ap_equeue_t *eq)
+{
+    apr_uint32_t i;
+
+    fprintf(stderr, "dumping %p\n", eq);
+    fprintf(stderr, "  nelem:   %u\n", eq->nelem);
+    fprintf(stderr, "  esize:   %"APR_SIZE_T_FMT"\n", eq->elem_size);
+    fprintf(stderr, "  wcnt:    %u\n", eq->writeCount);
+    fprintf(stderr, "  rcnt:    %u\n", eq->writeCount);
+    fprintf(stderr, "  bytes:   %p\n", eq->bytes);
+    for (i = 0; i<  eq->nelem; i++) {
+        fprintf(stderr, "    [%u] = %p\n", i, index_to_bytes(eq, i));
+    }
+
+    fprintf(stderr, "\n");
+    fflush(stderr);
+}
+#endif
+
+apr_status_t
+ap_equeue_create(apr_pool_t *p, apr_uint32_t nelem, apr_size_t elem_size, ap_equeue_t **eqout)
+{
+    ap_equeue_t *eq;
+
+    *eqout = NULL;
+
+    eq = apr_palloc(p, sizeof(ap_equeue_t));
+    eq->bytes = apr_palloc(p, (1 + nelem) * elem_size);
+    eq->nelem = nearest_power(nelem);
+    eq->elem_size = elem_size;
+    eq->writeCount = 0;
+    eq->readCount = 0;
+    *eqout = eq;
+
+    return APR_SUCCESS;
+}
+
+void *
+ap_equeue_reader_next(ap_equeue_t *eq)
+{
+    if (apr_atomic_read32(&eq->writeCount) == eq->readCount) {
+        return NULL;
+    }
+    else {
+        apr_uint32_t idx = count_to_index(eq, apr_atomic_inc32(&eq->readCount));
+        return index_to_bytes(eq, idx);
+    }
+}
+
+void *
+ap_equeue_writer_value(ap_equeue_t *eq)
+{
+    apr_uint32_t idx;
+
+    while (1) {
+        apr_uint32_t readCount = apr_atomic_read32(&eq->readCount);
+
+        if (count_to_index(eq, eq->writeCount + 1) != count_to_index(eq, readCount)) {
+            break;
+        }
+        /* TODO: research if sched_yield is even worth doing  */
+        sched_yield();
+    }
+
+    idx = count_to_index(eq, eq->writeCount);
+    return index_to_bytes(eq, idx);
+}

I assume that only a single thread tries write operations on this queue, correct?
Otherwise it seems unsafe if another thread calls
ap_equeue_writer_value in parallel as it would return the same slot until
ap_equeue_writer_onward was called.




Mime
View raw message