couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1171341 - in /couchdb/branches/1.2.x: src/couchdb/couch_work_queue.erl test/etap/042-work-queue.t test/etap/Makefile.am
Date Fri, 16 Sep 2011 00:21:45 GMT
Author: fdmanana
Date: Fri Sep 16 00:21:44 2011
New Revision: 1171341

URL: http://svn.apache.org/viewvc?rev=1171341&view=rev
Log:
Add test test/etap/042-work-queue.t

So far the couch_work_queue module had no unit tests at
all. This module is important for the view updater and
the replicator.

This is a backport of revision 1171340 from trunk.

Added:
    couchdb/branches/1.2.x/test/etap/042-work-queue.t   (with props)
Modified:
    couchdb/branches/1.2.x/src/couchdb/couch_work_queue.erl
    couchdb/branches/1.2.x/test/etap/Makefile.am

Modified: couchdb/branches/1.2.x/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_work_queue.erl?rev=1171341&r1=1171340&r2=1171341&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_work_queue.erl Fri Sep 16 00:21:44 2011
@@ -16,7 +16,7 @@
 -include("couch_db.hrl").
 
 % public API
--export([new/1, queue/2, dequeue/1, dequeue/2, close/1]).
+-export([new/1, queue/2, dequeue/1, dequeue/2, close/1, item_count/1, size/1]).
 
 % gen_server callbacks
 -export([init/1, terminate/2]).
@@ -57,6 +57,22 @@ dequeue(Wq, MaxItems) ->
     end.
 
 
+item_count(Wq) ->
+    try
+        gen_server:call(Wq, item_count, infinity)
+    catch
+        _:_ -> closed
+    end.
+
+
+size(Wq) ->
+    try
+        gen_server:call(Wq, size, infinity)
+    catch
+        _:_ -> closed
+    end.
+
+
 close(Wq) ->
     gen_server:cast(Wq, close).
     
@@ -104,7 +120,13 @@ handle_call({dequeue, Max}, From, Q) ->
         C when C > 0 ->
             deliver_queue_items(Max, Q)
         end
-    end.
+    end;
+
+handle_call(item_count, _From, Q) ->
+    {reply, Q#q.items, Q};
+
+handle_call(size, _From, Q) ->
+    {reply, Q#q.size, Q}.
 
 
 deliver_queue_items(Max, Q) ->

Added: couchdb/branches/1.2.x/test/etap/042-work-queue.t
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/test/etap/042-work-queue.t?rev=1171341&view=auto
==============================================================================
--- couchdb/branches/1.2.x/test/etap/042-work-queue.t (added)
+++ couchdb/branches/1.2.x/test/etap/042-work-queue.t Fri Sep 16 00:21:44 2011
@@ -0,0 +1,500 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+main(_) ->
+    test_util:init_code_path(),
+
+    etap:plan(155),
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    ok.
+
+
+test() ->
+    ok = crypto:start(),
+    test_single_consumer_max_item_count(),
+    test_single_consumer_max_size(),
+    test_single_consumer_max_item_count_and_size(),
+    test_multiple_consumers(),
+    ok.
+
+
+test_single_consumer_max_item_count() ->
+    etap:diag("Spawning a queue with 3 max items, 1 producer and 1 consumer"),
+
+    {ok, Q} = couch_work_queue:new([{max_items, 3}]),
+    Producer = spawn_producer(Q),
+    Consumer = spawn_consumer(Q),
+
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+
+    consume(Consumer, 1),
+    etap:is(ping(Consumer), timeout,
+        "Consumer blocked when attempting to dequeue 1 item from empty queue"),
+
+    Item1 = produce(Producer, 10),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+
+    etap:is(ping(Consumer), ok, "Consumer unblocked"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item1]},
+        "Consumer received the right item"),
+
+    Item2 = produce(Producer, 20),
+    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
+    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
+
+    Item3 = produce(Producer, 15),
+    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+
+    Item4 = produce(Producer, 3),
+    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
+    etap:is(ping(Producer), timeout, "Producer blocked with full queue"),
+
+    consume(Consumer, 2),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue 2 items from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item2, Item3]},
+        "Consumer received the right items"),
+    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
+
+    consume(Consumer, 2),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue 2 items from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item4]},
+        "Consumer received the right item"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+
+    consume(Consumer, 100),
+    etap:is(ping(Consumer), timeout,
+        "Consumer blocked when attempting to dequeue 100 items from empty queue"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+
+    Item5 = produce(Producer, 11),
+    etap:is(ping(Producer), ok, "Producer not blocked with empty queue"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+
+    Item6 = produce(Producer, 19),
+    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
+    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
+
+    Item7 = produce(Producer, 2),
+    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+
+    Item8 = produce(Producer, 33),
+    etap:is(ping(Producer), timeout, "Producer blocked with full queue"),
+    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
+
+    etap:is(ping(Consumer), ok, "Consumer unblocked"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item5]},
+        "Consumer received the first queued item"),
+    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
+
+    consume(Consumer, all),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue all items from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item6, Item7, Item8]},
+        "Consumer received all queued items"),
+
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+
+    etap:is(close_queue(Q), ok, "Closed queue"),
+    consume(Consumer, 1),
+    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
+    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
+    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
+
+    stop(Producer, "producer"),
+    stop(Consumer, "consumer").
+
+
+
+test_single_consumer_max_size() ->
+    etap:diag("Spawning a queue with max size of 160 bytes, "
+        "1 producer and 1 consumer"),
+
+    {ok, Q} = couch_work_queue:new([{max_size, 160}]),
+    Producer = spawn_producer(Q),
+    Consumer = spawn_consumer(Q),
+
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    consume(Consumer, 1),
+    etap:is(ping(Consumer), timeout,
+        "Consumer blocked when attempting to dequeue 1 item from empty queue"),
+
+    Item1 = produce(Producer, 50),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+
+    etap:is(ping(Consumer), ok, "Consumer unblocked"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item1]},
+        "Consumer received the right item"),
+
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    Item2 = produce(Producer, 50),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
+    etap:is(couch_work_queue:size(Q), 50, "Queue size is 50 bytes"),
+
+    Item3 = produce(Producer, 50),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+    etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),
+
+    Item4 = produce(Producer, 61),
+    etap:is(ping(Producer), timeout, "Producer blocked"),
+    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
+    etap:is(couch_work_queue:size(Q), 161, "Queue size is 161 bytes"),
+
+    consume(Consumer, 1),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue 1 item from full queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item2]},
+        "Consumer received the right item"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+    etap:is(couch_work_queue:size(Q), 111, "Queue size is 111 bytes"),
+
+    Item5 = produce(Producer, 20),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
+    etap:is(couch_work_queue:size(Q), 131, "Queue size is 131 bytes"),
+
+    Item6 = produce(Producer, 40),
+    etap:is(ping(Producer), timeout, "Producer blocked"),
+    etap:is(couch_work_queue:item_count(Q), 4, "Queue item count is 4"),
+    etap:is(couch_work_queue:size(Q), 171, "Queue size is 171 bytes"),
+
+    etap:is(close_queue(Q), timeout,
+        "Timeout when trying to close non-empty queue"),
+
+    consume(Consumer, 2),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue 2 items from full queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item3, Item4]},
+        "Consumer received the right items"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+    etap:is(couch_work_queue:size(Q), 60, "Queue size is 60 bytes"),
+
+    etap:is(close_queue(Q), timeout,
+        "Timeout when trying to close non-empty queue"),
+
+    consume(Consumer, all),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue all items from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
+        "Consumer received the right items"),
+
+    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
+    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
+
+    consume(Consumer, all),
+    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
+
+    stop(Producer, "producer"),
+    stop(Consumer, "consumer").
+
+
+test_single_consumer_max_item_count_and_size() ->
+    etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
+        "1 producer and 1 consumer"),
+
+    {ok, Q} = couch_work_queue:new([{max_items, 3}, {max_size, 200}]),
+    Producer = spawn_producer(Q),
+    Consumer = spawn_consumer(Q),
+
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    Item1 = produce(Producer, 100),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
+    etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),
+
+    Item2 = produce(Producer, 110),
+    etap:is(ping(Producer), timeout,
+        "Producer blocked when queue size >= max_size"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+    etap:is(couch_work_queue:size(Q), 210, "Queue size is 210 bytes"),
+
+    consume(Consumer, all),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue all items from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item1, Item2]},
+        "Consumer received the right items"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    etap:is(ping(Producer), ok, "Producer not blocked anymore"),
+
+    Item3 = produce(Producer, 10),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
+    etap:is(couch_work_queue:size(Q), 10, "Queue size is 10 bytes"),
+
+    Item4 = produce(Producer, 4),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+    etap:is(couch_work_queue:size(Q), 14, "Queue size is 14 bytes"),
+
+    Item5 = produce(Producer, 2),
+    etap:is(ping(Producer), timeout,
+        "Producer blocked when queue item count = max_items"),
+    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
+    etap:is(couch_work_queue:size(Q), 16, "Queue size is 16 bytes"),
+
+    consume(Consumer, 1),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue 1 item from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item3]},
+       "Consumer received 1 item"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+    etap:is(couch_work_queue:size(Q), 6, "Queue size is 6 bytes"),
+
+    etap:is(close_queue(Q), timeout,
+        "Timeout when trying to close non-empty queue"),
+
+    consume(Consumer, 1),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue 1 item from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item4]},
+       "Consumer received 1 item"),
+    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
+    etap:is(couch_work_queue:size(Q), 2, "Queue size is 2 bytes"),
+
+    Item6 = produce(Producer, 50),
+    etap:is(ping(Producer), ok,
+        "Producer not blocked when queue is not full and already received"
+        " a close request"),
+    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
+    etap:is(couch_work_queue:size(Q), 52, "Queue size is 52 bytes"),
+
+    consume(Consumer, all),
+    etap:is(ping(Consumer), ok,
+        "Consumer not blocked when attempting to dequeue all items from queue"),
+    etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
+       "Consumer received all queued items"),
+
+    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
+    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
+
+    consume(Consumer, 1),
+    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
+
+    stop(Producer, "producer"),
+    stop(Consumer, "consumer").
+
+
+test_multiple_consumers() ->
+    etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
+        "1 producer and 3 consumers"),
+
+    {ok, Q} = couch_work_queue:new(
+        [{max_items, 3}, {max_size, 200}, {multi_workers, true}]),
+    Producer = spawn_producer(Q),
+    Consumer1 = spawn_consumer(Q),
+    Consumer2 = spawn_consumer(Q),
+    Consumer3 = spawn_consumer(Q),
+
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    consume(Consumer1, 1),
+    etap:is(ping(Consumer1), timeout,
+        "Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
+    consume(Consumer2, 2),
+    etap:is(ping(Consumer2), timeout,
+        "Consumer 2 blocked when attempting to dequeue 2 items from empty queue"),
+    consume(Consumer3, 1),
+    etap:is(ping(Consumer3), timeout,
+        "Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),
+
+    Item1 = produce(Producer, 50),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    Item2 = produce(Producer, 50),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    Item3 = produce(Producer, 50),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
+    etap:is(last_consumer_items(Consumer1), {ok, [Item1]},
+       "Consumer 1 received 1 item"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
+    etap:is(last_consumer_items(Consumer2), {ok, [Item2]},
+       "Consumer 2 received 1 item"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
+    etap:is(last_consumer_items(Consumer3), {ok, [Item3]},
+       "Consumer 3 received 1 item"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    consume(Consumer1, 1),
+    etap:is(ping(Consumer1), timeout,
+        "Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
+    consume(Consumer2, 2),
+    etap:is(ping(Consumer2), timeout,
+        "Consumer 2 blocked when attempting to dequeue 1 item from empty queue"),
+    consume(Consumer3, 1),
+    etap:is(ping(Consumer3), timeout,
+        "Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),
+
+    Item4 = produce(Producer, 50),
+    etap:is(ping(Producer), ok, "Producer not blocked"),
+    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
+    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
+
+    etap:is(close_queue(Q), ok, "Closed queue"),
+
+    etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
+    etap:is(last_consumer_items(Consumer1), {ok, [Item4]},
+       "Consumer 1 received 1 item"),
+
+    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
+    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
+
+    etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
+    etap:is(last_consumer_items(Consumer2), closed,
+        "Consumer 2 received 'closed' atom"),
+
+    etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
+    etap:is(last_consumer_items(Consumer3), closed,
+        "Consumer 3 received 'closed' atom"),
+
+    stop(Producer, "producer"),
+    stop(Consumer1, "consumer 1"),
+    stop(Consumer2, "consumer 2"),
+    stop(Consumer3, "consumer 3").
+
+
+close_queue(Q) ->
+    ok = couch_work_queue:close(Q),
+    MonRef = erlang:monitor(process, Q),
+    receive
+    {'DOWN', MonRef, process, Q, _Reason} ->
+         etap:diag("Queue closed")
+    after 3000 ->
+         erlang:demonitor(MonRef),
+         timeout
+    end.
+
+
+spawn_consumer(Q) ->
+    Parent = self(),
+    spawn(fun() -> consumer_loop(Parent, Q, nil) end).
+
+
+consumer_loop(Parent, Q, PrevItem) ->
+    receive
+    {stop, Ref} ->
+        Parent ! {ok, Ref};
+    {ping, Ref} ->
+        Parent ! {pong, Ref},
+        consumer_loop(Parent, Q, PrevItem);
+    {last_item, Ref} ->
+        Parent ! {item, Ref, PrevItem},
+        consumer_loop(Parent, Q, PrevItem);
+    {consume, N} ->
+        Result = couch_work_queue:dequeue(Q, N),
+        consumer_loop(Parent, Q, Result)
+    end.
+
+
+spawn_producer(Q) ->
+    Parent = self(),
+    spawn(fun() -> producer_loop(Parent, Q) end).
+
+
+producer_loop(Parent, Q) ->
+    receive
+    {stop, Ref} ->
+        Parent ! {ok, Ref};
+    {ping, Ref} ->
+        Parent ! {pong, Ref},
+        producer_loop(Parent, Q);
+    {produce, Ref, Size} ->
+        Item = crypto:rand_bytes(Size),
+        Parent ! {item, Ref, Item},
+        ok = couch_work_queue:queue(Q, Item),
+        producer_loop(Parent, Q)
+    end.
+
+
+consume(Consumer, N) ->
+    Consumer ! {consume, N}.
+
+
+last_consumer_items(Consumer) ->
+    Ref = make_ref(),
+    Consumer ! {last_item, Ref},
+    receive
+    {item, Ref, Items} ->
+        Items
+    after 3000 ->
+        timeout
+    end.
+
+
+produce(Producer, Size) ->
+    Ref = make_ref(),
+    Producer ! {produce, Ref, Size},
+    receive
+    {item, Ref, Item} ->
+        Item
+    after 3000 ->
+        etap:bail("Timeout asking producer to produce an item")
+    end.
+
+
+ping(Pid) ->
+    Ref = make_ref(),
+    Pid ! {ping, Ref},
+    receive
+    {pong, Ref} ->
+        ok
+    after 3000 ->
+        timeout
+    end.
+
+
+stop(Pid, Name) ->
+    Ref = make_ref(),
+    Pid ! {stop, Ref},
+    receive
+    {ok, Ref} ->
+        etap:diag("Stopped " ++ Name)
+    after 3000 ->
+        etap:bail("Timeout stopping " ++ Name)
+    end.

Propchange: couchdb/branches/1.2.x/test/etap/042-work-queue.t
------------------------------------------------------------------------------
    svn:executable = *

Modified: couchdb/branches/1.2.x/test/etap/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/test/etap/Makefile.am?rev=1171341&r1=1171340&r2=1171341&view=diff
==============================================================================
--- couchdb/branches/1.2.x/test/etap/Makefile.am (original)
+++ couchdb/branches/1.2.x/test/etap/Makefile.am Fri Sep 16 00:21:44 2011
@@ -45,6 +45,7 @@ EXTRA_DIST = \
     041-uuid-gen-seq.ini \
     041-uuid-gen-utc.ini \
     041-uuid-gen.t \
+    042-work-queue.t \
     050-stream.t \
     060-kt-merging.t \
     061-kt-missing-leaves.t \



Mime
View raw message