couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kocol...@apache.org
Subject svn commit: r803303 - /couchdb/trunk/src/couchdb/couch_rep_writer.erl
Date Tue, 11 Aug 2009 21:14:53 GMT
Author: kocolosk
Date: Tue Aug 11 21:14:53 2009
New Revision: 803303

URL: http://svn.apache.org/viewvc?rev=803303&view=rev
Log:
roll back streaming _bulk_docs because of a race condition

Modified:
    couchdb/trunk/src/couchdb/couch_rep_writer.erl

Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=803303&r1=803302&r2=803303&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Tue Aug 11 21:14:53 2009
@@ -16,90 +16,9 @@
 
 -include("couch_db.hrl").
 
--define (MAX_BYTES, 10000000).
--define (MAX_CHUNK_SIZE, 65535).
-
 start_link(Parent, Target, Reader, _PostProps) ->
     {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
 
-make_chunk(Data) when is_list(Data) ->
-    make_chunk(list_to_binary(Data));
-make_chunk(Data) when size(Data) > ?MAX_CHUNK_SIZE ->
-    <<ChunkData:?MAX_CHUNK_SIZE/binary, Rest/binary>> = Data,
-    Chunk = {?MAX_CHUNK_SIZE, [ibrowse_lib:dec2hex(4, ?MAX_CHUNK_SIZE), "\r\n",
-        ChunkData, "\r\n"]},
-    [Chunk, Rest];
-make_chunk(Data) ->
-    Size = size(Data),
-    [{Size, [ibrowse_lib:dec2hex(4, Size), "\r\n", Data, "\r\n"]}].
-
-upload_docs({start, Buffer}) ->
-    [{Size, Chunk}] = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
-    % Buffer ! {self(), next_doc},
-    {ok, Chunk, {continue, Buffer, "", Size}};
-
-upload_docs({continue, _Buf, _Pre, ByteCount}) when ByteCount > ?MAX_BYTES ->
-    {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
-upload_docs({continue, Buffer, Prepend, ByteCount}) ->
-    Buffer ! {self(), next_doc},
-    receive 
-    {ok, JsonDoc} ->
-        [{Size, Chunk} | MoreData] = make_chunk([Prepend, JsonDoc]),
-        {ok, Chunk, {multi_chunk, MoreData, Buffer, ",", ByteCount+Size}};
-    eof ->
-        {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
-    timeout ->
-        ?LOG_DEBUG("sending a heartbeat to keep the connection open", []),
-        {ok, "1\r\n \r\n", {continue, Buffer, Prepend, ByteCount}}
-    end;
-
-upload_docs({multi_chunk, [], Buffer, Prepend, ByteCount}) ->
-    % Buffer ! {self(), next_doc},
-    upload_docs({continue, Buffer, Prepend, ByteCount});
-upload_docs({multi_chunk, [Data], Buffer, Prepend, ByteCount}) ->
-    [{Size, Chunk} | MoreData] = make_chunk(Data),
-    {ok, Chunk, {multi_chunk, MoreData, Buffer, Prepend, ByteCount+Size}};
-
-upload_docs(finish) ->
-    couch_util:should_flush(),
-    eof.
-
-encoding_worker([]) ->
-    receive {MiddleMan, next_doc} -> MiddleMan ! eof end,
-    ok;
-encoding_worker([Doc|Rest]) ->
-    JsonDoc = ?l2b(?JSON_ENCODE(couch_doc:to_json_obj(Doc,[revs,attachments]))),
-    receive {MiddleMan, next_doc} -> MiddleMan ! {ok, JsonDoc} end,
-    encoding_worker(Rest).
-
-% needed because upload_docs is inside a gen_server so it can't have a mailbox
-middle_man(EncodingWorker) ->
-    receive {Uploader, next_doc} ->
-        receive
-        {ok, JsonDoc} ->
-            EncodingWorker ! {self(), next_doc},
-            Uploader ! {ok, JsonDoc},
-            middle_man(EncodingWorker);
-        eof ->
-            Uploader ! eof,
-            ok
-        after 5000 ->
-            Uploader ! timeout,
-            middle_man(EncodingWorker)
-        end
-    end.
-
-request_loop(Ref, Request, Acc) ->
-    receive
-    {'DOWN', Ref, _, _, normal} ->
-        lists:flatten(lists:reverse(Acc));
-    {'DOWN', Ref, _, _, Reason} ->
-        exit(Reason)
-    after 0 ->
-        ErrorsJson = couch_rep_httpc:request(Request),
-        request_loop(Ref, Request, [ErrorsJson|Acc])
-    end.
-
 writer_loop(Parent, Reader, Target) ->
     case couch_rep_reader:next(Reader) of
     {complete, FinalSeq} ->
@@ -121,20 +40,16 @@
         end,
         Parent ! {writer_checkpoint, HighSeq},
         couch_rep_att:cleanup(),
-        couch_util:should_flush(),
         writer_loop(Parent, Reader, Target)
     end.
 
 write_docs(#http_db{} = Db, Docs) ->
-    {Worker, Ref} = spawn_monitor(fun() -> encoding_worker(Docs) end),
-    Pid = spawn_link(fun() -> Worker ! {self(),next_doc}, middle_man(Worker) end),
-    Request = Db#http_db{
+    JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+    ErrorsJson = couch_rep_httpc:request(Db#http_db{
         resource = "_bulk_docs",
         method = post,
-        body = {fun upload_docs/1, {start, Pid}},
-        headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers]
-    },
-    ErrorsJson = request_loop(Ref, Request, []),
+        body = {[{new_edits, false}, {docs, JsonDocs}]}
+    }),
     ErrorsList =
     lists:map(
         fun({Props}) ->



Mime
View raw message