couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dam...@apache.org
Subject svn commit: r946803 - in /couchdb/trunk: share/www/script/test/changes.js src/couchdb/couch_changes.erl src/couchdb/couch_db.erl src/couchdb/couch_doc.erl src/couchdb/couch_httpd_db.erl src/couchdb/couch_rep_writer.erl src/couchdb/couch_work_queue.erl
Date Thu, 20 May 2010 21:47:52 GMT
Author: damien
Date: Thu May 20 21:47:51 2010
New Revision: 946803

URL: http://svn.apache.org/viewvc?rev=946803&view=rev
Log:
Refactoring of various internal APIs, particularly those dealing with replicating documents
with attachments.

Modified:
    couchdb/trunk/share/www/script/test/changes.js
    couchdb/trunk/src/couchdb/couch_changes.erl
    couchdb/trunk/src/couchdb/couch_db.erl
    couchdb/trunk/src/couchdb/couch_doc.erl
    couchdb/trunk/src/couchdb/couch_httpd_db.erl
    couchdb/trunk/src/couchdb/couch_rep_writer.erl
    couchdb/trunk/src/couchdb/couch_work_queue.erl

Modified: couchdb/trunk/share/www/script/test/changes.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/changes.js?rev=946803&r1=946802&r2=946803&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/test/changes.js (original)
+++ couchdb/trunk/share/www/script/test/changes.js Thu May 20 21:47:51 2010
@@ -220,7 +220,7 @@ couchTests.changes = function(debug) {
   
   req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/dynamic&field=bop");
   resp = JSON.parse(req.responseText);
-  T(resp.results.length == 1);
+  T(resp.results.length == 1, "changes_filter/dynamic&field=bop");
 
   if (!is_safari && xhr) { // full test requires parallel connections
     // filter with longpoll
@@ -352,7 +352,7 @@ couchTests.changes = function(debug) {
 
   req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/conflicted");
   resp = JSON.parse(req.responseText);
-  T(resp.results.length == 1);
+  T(resp.results.length == 1, "filter=changes_filter/conflicted");
 
   // test with erlang filter function
   run_on_modified_server([{

Modified: couchdb/trunk/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_changes.erl?rev=946803&r1=946802&r2=946803&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_changes.erl (original)
+++ couchdb/trunk/src/couchdb/couch_changes.erl Thu May 20 21:47:51 2010
@@ -16,8 +16,9 @@
 -export([handle_changes/3]).
 
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
-handle_changes(#changes_args{}=Args1, Req, Db) ->
-    Args = Args1#changes_args{filter=make_filter_fun(Args1#changes_args.filter, Req, Db)},
+handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
+    Args = Args1#changes_args{filter=
+            make_filter_fun(Args1#changes_args.filter, Style, Req, Db)},
     StartSeq = case Args#changes_args.dir of
     rev ->
         couch_db:get_update_seq(Db);
@@ -72,14 +73,18 @@ handle_changes(#changes_args{}=Args1, Re
     end.
 
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
-make_filter_fun(FilterName, Req, Db) ->
+make_filter_fun(FilterName, Style, Req, Db) ->
     case [list_to_binary(couch_httpd:unquote(Part))
             || Part <- string:tokens(FilterName, "/")] of
     [] ->
-        fun(DocInfos) ->
-        % doing this as a batch is more efficient for external filters
-            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} ||
-                #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos]
+        fun(#doc_info{revs=[#rev_info{rev=Rev}|_]=Revs}) ->
+            case Style of
+            main_only ->
+                [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+            all_docs ->
+                [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} 
+                        || #rev_info{rev=R} <- Revs]
+            end
         end;
     [DName, FName] ->
         DesignId = <<"_design/", DName/binary>>,
@@ -87,17 +92,23 @@ make_filter_fun(FilterName, Req, Db) ->
         % validate that the ddoc has the filter fun
         #doc{body={Props}} = DDoc,
         couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
-        fun(DocInfos) ->
+        fun(DocInfo) ->
+            DocInfos = 
+            case Style of
+            main_only ->
+                [DocInfo];
+            all_docs ->
+                [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
+            end,
             Docs = [Doc || {ok, Doc} <- [
-                {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts])
-                || DInfo <- DocInfos]],
+                    couch_db:open_doc(Db, DocInfo2, [deleted, conflicts])
+                        || DocInfo2 <- DocInfos]],
             {ok, Passes} = couch_query_servers:filter_docs(
                 Req, Db, DDoc, FName, Docs
             ),
-            % ?LOG_INFO("filtering ~p ~w",[FName, [DI#doc_info.high_seq || DI <- DocInfos]]),
-            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
-                || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos,
-                Pass <- Passes, Pass == true]
+            [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
+                || {Pass, #doc{revs={RevPos,[RevId|_]}}}
+                <- lists:zip(Passes, Docs), Pass == true]
         end;
     _Else ->
         throw({bad_request,
@@ -195,12 +206,12 @@ keep_sending_changes(Args, Callback, Db,
 end_sending_changes(Callback, EndSeq, ResponseType) ->
     Callback({stop, EndSeq}, ResponseType).
 
-changes_enumerator(DocInfos, {Db, _, _, FilterFun, Callback, "continuous",
+changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous",
     Limit, IncludeDocs}) ->
 
-    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
-        = DocInfos,
-    Results0 = FilterFun(DocInfos),
+    #doc_info{id=Id, high_seq=Seq,
+            revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo,
+    Results0 = FilterFun(DocInfo),
     Results = [Result || Result <- Results0, Result /= null],
     Go = if Limit =< 1 -> stop; true -> ok end,
     case Results of
@@ -215,12 +226,12 @@ changes_enumerator(DocInfos, {Db, _, _, 
                 IncludeDocs}
         }
     end;
-changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Callback, ResponseType,
+changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType,
     Limit, IncludeDocs}) ->
 
-    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
-        = DocInfos,
-    Results0 = FilterFun(DocInfos),
+    #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
+        = DocInfo,
+    Results0 = FilterFun(DocInfo),
     Results = [Result || Result <- Results0, Result /= null],
     Go = if Limit =< 1 -> stop; true -> ok end,
     case Results of

Modified: couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.erl?rev=946803&r1=946802&r2=946803&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db.erl Thu May 20 21:47:51 2010
@@ -119,18 +119,47 @@ open_doc(Db, Id, Options) ->
     {ok, #doc{deleted=true}=Doc} ->
         case lists:member(deleted, Options) of
         true ->
-            {ok, Doc};
+            apply_open_options({ok, Doc},Options);
         false ->
             {not_found, deleted}
         end;
     Else ->
-        Else
+        apply_open_options(Else,Options)
+    end.
+
+apply_open_options({ok, Doc},Options) ->
+    apply_open_options2(Doc,Options);
+apply_open_options(Else,_Options) ->
+    Else.
+    
+apply_open_options2(Doc,[]) ->
+    {ok, Doc};
+apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
+        [{atts_since, PossibleAncestors}|Rest]) ->
+    RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
+    apply_open_options2(Doc#doc{atts=[A#att{data=
+        if AttPos>RevPos -> Data; true -> stub end} 
+        || #att{revpos=AttPos,data=Data}=A <- Atts]}, Rest);
+apply_open_options2(Doc,[_|Rest]) ->
+    apply_open_options2(Doc,Rest).
+
+
+find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
+    0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+    0;
+find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
+    case lists:member({RevPos, RevId}, AttsSinceRevs) of
+    true ->
+        RevPos;
+    false ->
+        find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
     end.
 
 open_doc_revs(Db, Id, Revs, Options) ->
     couch_stats_collector:increment({couchdb, database_reads}),
-    [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
-    Result.
+    [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
+    {ok, [apply_open_options(Result, Options) || Result <- Results]}.
 
 % Each returned result is a list of tuples:
 % {Id, MissingRevs, PossibleAncestors}
@@ -437,7 +466,7 @@ prep_and_validate_updates(Db, [DocBucket
         fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->       
             case couch_doc:has_stubs(Doc) of
             true ->
-                couch_doc:merge_doc(Doc, #doc{}); % will throw exception
+                couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
             false -> ok
             end,
             case Revs of
@@ -892,15 +921,16 @@ changes_since(Db, Style, StartSeq, Fun, 
 changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
     Wrapper = fun(DocInfo, _Offset, Acc2) ->
             #doc_info{revs=Revs} = DocInfo,
+            DocInfo2 =
             case Style of
-            main_only ->
-                Infos = [DocInfo];
+            main_only ->    
+                DocInfo;
             all_docs ->
-                % make each rev it's own doc info
-                Infos = [DocInfo#doc_info{revs=[RevInfo]} ||
-                    #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]
+                % remove revs before the seq
+                DocInfo#doc_info{revs=[RevInfo ||
+                    #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]}
             end,
-            Fun(Infos, Acc2)
+            Fun(DocInfo2, Acc2)
         end,
     {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, 
         Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),

Modified: couchdb/trunk/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_doc.erl?rev=946803&r1=946802&r2=946803&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_doc.erl (original)
+++ couchdb/trunk/src/couchdb/couch_doc.erl Thu May 20 21:47:51 2010
@@ -17,7 +17,7 @@
 -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
 -export([validate_docid/1]).
 -export([doc_from_multi_part_stream/2]).
--export([doc_to_multi_part_stream/6, len_doc_to_multi_part_stream/5]).
+-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
 
 -include("couch_db.hrl").
 
@@ -73,31 +73,25 @@ to_json_meta(Meta) ->
         end, Meta).
 
 to_json_attachments(Attachments, Options) ->
-    RevPos = case lists:member(attachments, Options) of
-    true -> % return all the binaries
-        0;
-    false ->
-        % note the default is [], because this sorts higher than all numbers.
-        % and will return all the binaries.
-        couch_util:get_value(atts_after_revpos, Options, [])
-    end,
     to_json_attachments(
         Attachments,
-        RevPos,
+        lists:member(attachments, Options),
         lists:member(follows, Options),
         lists:member(att_encoding_info, Options)
     ).
 
-to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowEncInfo) ->
+to_json_attachments([], _OutputData, _DataToFollow, _ShowEncInfo) ->
     [];
-to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowEncInfo) ->
+to_json_attachments(Atts, OutputData, DataToFollow, ShowEncInfo) ->
     AttProps = lists:map(
         fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
             {Att#att.name, {[
                 {<<"content_type">>, Att#att.type},
                 {<<"revpos">>, Att#att.revpos}
                 ] ++
-                if Att#att.revpos > RevPosIncludeAfter ->    
+                if not OutputData orelse Att#att.data == stub ->
+                    [{<<"length">>, DiskLen}, {<<"stub">>, true}];
+                true ->
                     if DataToFollow ->
                         [{<<"length">>, DiskLen}, {<<"follows">>,
true}];
                     true ->
@@ -108,9 +102,7 @@ to_json_attachments(Atts, RevPosIncludeA
                             att_to_bin(Att)
                         end,
                         [{<<"data">>, base64:encode(AttData)}]
-                    end;
-                true ->
-                    [{<<"length">>, DiskLen}, {<<"stub">>, true}]
+                    end
                 end ++
                     case {ShowEncInfo, Enc} of
                     {false, _} ->
@@ -383,16 +375,13 @@ fold_streamed_data(RcvFun, LenLeft, Fun,
     ResultAcc = Fun(Bin, Acc),
     fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
 
-len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
+len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts,
     SendEncodedAtts) ->
-    2 + % "--"
-    size(Boundary) +
-    36 + % "\r\ncontent-type: application/json\r\n\r\n"
-    iolist_size(JsonBytes) +
-    4 + % "\r\n--"
-    size(Boundary) +
-    + lists:foldl(fun(#att{revpos=RevPos} = Att, AccAttsSize) ->
-            if RevPos > AttsSinceRevPos ->
+    AttsSize = lists:foldl(fun(#att{data=Data} = Att, AccAttsSize) ->
+            case Data of
+            stub ->
+                AccAttsSize;
+            _ ->
                 AccAttsSize +  
                 4 + % "\r\n\r\n"
                 case SendEncodedAtts of
@@ -402,24 +391,41 @@ len_doc_to_multi_part_stream(Boundary, J
                     Att#att.disk_len
                 end +
                 4 + % "\r\n--"
-                size(Boundary);
-            true ->
-                AccAttsSize
+                size(Boundary)
             end
-        end, 0, Atts) +
-    2. % "--"
+        end, 0, Atts),
+    if AttsSize == 0 ->
+        iolist_size(JsonBytes);
+    true ->
+        2 + % "--"
+        size(Boundary) +
+        36 + % "\r\ncontent-type: application/json\r\n\r\n"
+        iolist_size(JsonBytes) +
+        4 + % "\r\n--"
+        size(Boundary) +
+        + AttsSize +
+        2 % "--"
+    end.
 
-doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun,
+doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun,
     SendEncodedAtts) ->
-    WriteFun([<<"--", Boundary/binary,
-            "\r\ncontent-type: application/json\r\n\r\n">>,
-            JsonBytes, <<"\r\n--", Boundary/binary>>]),
-    atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts).
+    case lists:any(fun(#att{data=Data})-> Data /= stub end, Atts) of
+    true ->
+        WriteFun([<<"--", Boundary/binary,
+                "\r\ncontent-type: application/json\r\n\r\n">>,
+                JsonBytes, <<"\r\n--", Boundary/binary>>]),
+        atts_to_mp(Atts, Boundary, WriteFun, SendEncodedAtts);
+    false ->
+        WriteFun(JsonBytes)
+    end.
 
-atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendEncAtts) ->
+atts_to_mp([], _Boundary, WriteFun, _SendEncAtts) ->
     WriteFun(<<"--">>);
-atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, 
-        AttsSinceRevPos, SendEncodedAtts) when RevPos > AttsSinceRevPos ->
+atts_to_mp([#att{data=stub} | RestAtts], Boundary, WriteFun,
+        SendEncodedAtts) ->
+    atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts);
+atts_to_mp([Att | RestAtts], Boundary, WriteFun, 
+        SendEncodedAtts)  ->
     WriteFun(<<"\r\n\r\n">>),
     AttFun = case SendEncodedAtts of
     false ->
@@ -429,16 +435,15 @@ atts_to_mp([#att{revpos=RevPos} = Att | 
     end,
     AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
     WriteFun(<<"\r\n--", Boundary/binary>>),
-    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts);
-atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos,
-    SendEncodedAtts) ->
-    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts).
+    atts_to_mp(RestAtts, Boundary, WriteFun, SendEncodedAtts).
 
 
 doc_from_multi_part_stream(ContentType, DataFun) ->
+    Self = self(),
     Parser = spawn_link(fun() -> 
         couch_httpd:parse_multipart_request(ContentType, DataFun,
-                fun(Next)-> mp_parse_doc(Next, []) end)
+                fun(Next)-> mp_parse_doc(Next, []) end),
+        unlink(Self)
         end),
     Parser ! {get_doc_bytes, self()},
     receive {doc_bytes, DocBytes} -> ok end,

Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=946803&r1=946802&r2=946803&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Thu May 20 21:47:51 2010
@@ -266,7 +266,6 @@ db_req(#httpd{method='POST',path_parts=[
             throw({bad_request,
                 "can't do a full commit ahead of current update_seq"});
         RequiredSeq > CommittedSeq ->
-            % user asked for an explicit sequence, don't commit any batches
             couch_db:ensure_full_commit(Db);
         true ->
             {ok, Db#db.instance_start_time}
@@ -564,14 +563,13 @@ db_doc_req(#httpd{method='GET'}=Req, Db,
         atts_since = AttsSince
     } = parse_doc_query(Req),
     case Revs of
-    [] ->
-        Doc = couch_doc_open(Db, DocId, Rev, Options),
+    [] ->    
         Options2 =
         if AttsSince /= nil ->
-            RevPos = find_ancestor_rev_pos(Doc#doc.revs, AttsSince),
-            [{atts_after_revpos, RevPos} | Options];
+            [{atts_since, AttsSince}, attachments | Options];
         true -> Options
         end,
+        Doc = couch_doc_open(Db, DocId, Rev, Options2),
         send_doc(Req, Doc, Options2);
     _ ->
         {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options),
@@ -699,17 +697,6 @@ db_doc_req(#httpd{method='COPY'}=Req, Db
 db_doc_req(Req, _Db, _DocId) ->
     send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY").
 
-find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
-    0;
-find_ancestor_rev_pos(_DocRevs, []) ->
-    0;
-find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
-    case lists:member({RevPos, RevId}, AttsSinceRevs) of
-    true ->
-        RevPos;
-    false ->
-        find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
-    end.
 
 send_doc(Req, Doc, Options) ->
     case Doc#doc.meta of
@@ -727,8 +714,7 @@ send_doc(Req, Doc, Options) ->
 send_doc_efficiently(Req, #doc{atts=[]}=Doc, Headers, Options) ->
         send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
 send_doc_efficiently(Req, #doc{atts=Atts}=Doc, Headers, Options) ->
-    case lists:member(attachments, Options) orelse 
-        proplists:is_defined(atts_after_revpos, Options) of
+    case lists:member(attachments, Options) of
     true ->
         AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of
             undefined       -> [];
@@ -740,14 +726,12 @@ send_doc_efficiently(Req, #doc{atts=Atts
         true ->
             Boundary = couch_uuids:random(),
             JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])),
-            AttsSinceRevPos = couch_util:get_value(atts_after_revpos, Options, 0),
-            Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
-                    AttsSinceRevPos,false),
+            Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,
+                    Atts,false),
             CType = {<<"Content-Type">>, 
                     <<"multipart/related; boundary=\"", Boundary/binary, "\"">>},
             {ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len),
             couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
-                    AttsSinceRevPos,
                     fun(Data) -> couch_httpd:send(Resp, Data) end, false)
         end;
     false ->

Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=946803&r1=946802&r2=946803&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Thu May 20 21:47:51 2010
@@ -90,12 +90,12 @@ write_multi_part_doc(#http_db{headers=He
     JsonBytes = ?JSON_ENCODE(
         couch_doc:to_json_obj(
             Doc,
-            [follows, att_encoding_info, {atts_after_revpos, 0}]
+            [follows, att_encoding_info, attachments]
         )
     ),
     Boundary = couch_uuids:random(),
     Len = couch_doc:len_doc_to_multi_part_stream(
-        Boundary, JsonBytes, Atts, 0, true
+        Boundary, JsonBytes, Atts, true
     ),
     {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
     _StreamerPid = spawn_link(
@@ -104,7 +104,6 @@ write_multi_part_doc(#http_db{headers=He
                 Boundary,
                 JsonBytes,
                 Atts,
-                0,
                 fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
                 true
             ),
@@ -116,7 +115,7 @@ write_multi_part_doc(#http_db{headers=He
         closed ->
             eof;
         {ok, Data} ->
-            {ok, iolist_to_binary(lists:reverse(Data)), Acc}
+            {ok, iolist_to_binary(Data), Acc}
         end
     end,
     Request = Db#http_db{

Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=946803&r1=946802&r2=946803&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Thu May 20 21:47:51 2010
@@ -13,11 +13,11 @@
 -module(couch_work_queue).
 -behaviour(gen_server).
 
--export([new/2,queue/2,dequeue/1,close/1]).
+-export([new/2,queue/2,dequeue/1,dequeue/2,close/1]).
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
 
 -record(q, {
-    buffer=[],
+    queue=queue:new(),
     blocked=[],
     max_size,
     max_items,
@@ -34,7 +34,10 @@ queue(Wq, Item) ->
     gen_server:call(Wq, {queue, Item}, infinity).
 
 dequeue(Wq) ->
-    try gen_server:call(Wq, dequeue, infinity)
+    dequeue(Wq, all).
+    
+dequeue(Wq, MaxItems) ->
+    try gen_server:call(Wq, {dequeue, MaxItems}, infinity)
     catch
         _:_ -> closed
     end.
@@ -48,13 +51,13 @@ init({MaxSize,MaxItems}) ->
 
 terminate(_Reason, #q{work_waiter=nil}) ->
     ok;
-terminate(_Reason, #q{work_waiter=WW}) ->
-    gen_server:reply(WW, closed).
+terminate(_Reason, #q{work_waiter={WWFrom, _}}) ->
+    gen_server:reply(WWFrom, closed).
     
 handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) ->
     Q = Q0#q{size=Q0#q.size + byte_size(term_to_binary(Item)),
                 items=Q0#q.items + 1,
-                buffer=[Item | Q0#q.buffer]},
+                queue=queue:in(Item, Q0#q.queue)},
     case (Q#q.size >= Q#q.max_size) orelse
             (Q#q.items >= Q#q.max_items) of
     true ->
@@ -62,24 +65,44 @@ handle_call({queue, Item}, From, #q{work
     false ->
         {reply, ok, Q}
     end;
-handle_call({queue, Item}, _From, #q{work_waiter=WW}=Q) ->
-    gen_server:reply(WW, {ok, [Item]}),
+handle_call({queue, Item}, _From, #q{work_waiter={WWFrom, _Max}}=Q) ->
+    gen_server:reply(WWFrom, {ok, [Item]}),
     {reply, ok, Q#q{work_waiter=nil}};
-handle_call(dequeue, _From, #q{work_waiter=WW}) when WW /= nil ->
+handle_call({dequeue, _Max}, _From, #q{work_waiter=WW}) when WW /= nil ->
     exit("Only one caller allowed to wait for work at a time");
-handle_call(dequeue, From, #q{items=0}=Q) ->
-    {noreply, Q#q{work_waiter=From}};
-handle_call(dequeue, _From, #q{buffer=Buff, max_size=MaxSize,
-        max_items=MaxItems, close_on_dequeue=Close}=Q) ->
-    [gen_server:reply(From, ok) || From <- Q#q.blocked],
-    Q2 = #q{max_size=MaxSize, max_items=MaxItems},
-    if Close ->
-        {stop, normal, {ok, Buff}, Q2};
+handle_call({dequeue, Max}, From, #q{items=0}=Q) ->
+    {noreply, Q#q{work_waiter={From, Max}}};
+handle_call({dequeue, Max}, _From, #q{queue=Queue, max_size=MaxSize,
+        max_items=MaxItems, items=Items,close_on_dequeue=Close}=Q) ->
+    if Max >= Items orelse Max == all ->
+        [gen_server:reply(From, ok) || From <- Q#q.blocked],
+        Q2 = #q{max_size=MaxSize, max_items=MaxItems},
+        if Close ->
+            {stop, normal, {ok, queue:to_list(Queue)}, Q2};
+        true ->
+            {reply, {ok, queue:to_list(Queue)}, Q2}
+        end;
     true ->
-        {reply, {ok, Buff}, #q{max_size=MaxSize, max_items=MaxItems}}
+        {DequeuedItems, Queue2, Blocked2} = 
+                dequeue_items(Max, Queue, Q#q.blocked, []),
+        {reply, {ok, DequeuedItems},
+                Q#q{items=Items-Max,blocked=Blocked2,queue=Queue2}}
     end.
 
-handle_cast(close, #q{buffer=[]}=Q) ->
+dequeue_items(0, Queue, Blocked, DequeuedAcc) ->
+    {lists:reverse(DequeuedAcc), Queue, Blocked};
+dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) ->
+    {{value, Item}, Queue2} = queue:out(Queue),
+    case Blocked of
+    [] ->
+        Blocked2 = Blocked;
+    [From|Blocked2] ->
+        gen_server:reply(From, ok)
+    end,
+    dequeue_items(NumItems-1, Queue2, Blocked2, [Item | DequeuedAcc]).
+    
+
+handle_cast(close, #q{items=0}=Q) ->
     {stop, normal, Q};
 handle_cast(close, Q) ->
     {noreply, Q#q{close_on_dequeue=true}}.



Mime
View raw message