couchdb-commits mailing list archives

From kocol...@apache.org
Subject svn commit: r803220 - /couchdb/trunk/src/couchdb/couch_rep_writer.erl
Date Tue, 11 Aug 2009 17:25:13 GMT
Author: kocolosk
Date: Tue Aug 11 17:25:13 2009
New Revision: 803220

URL: http://svn.apache.org/viewvc?rev=803220&view=rev
Log:
more work on _bulk_docs streaming during replication.

I think this effort is kind of a dead end, at least if we're serious about
that 4GB maximum document/attachment size.  Presently, push replication will
have trouble with attachments larger than a few MB because of the
inactivity_timeout we've set in ibrowse.  We'll push the data, but ibrowse
then closes the connection after 30 seconds of inactivity while the target
is still decoding.

We should focus our efforts instead on a way to push attachments without Base64.
I don't know what to say about 4GB _documents_; I don't see how we could ever
really support those now, even without replication.
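
For reference, the framing used throughout the patch below is plain HTTP/1.1
chunked transfer encoding: a hexadecimal byte count, CRLF, the payload, CRLF,
with a zero-length chunk terminating the stream.  A minimal standalone sketch
(integer_to_list/2 stands in here for the ibrowse_lib:dec2hex/2 call the
patch actually uses):

    %% Frame one payload as an HTTP chunk: hex size, CRLF, data, CRLF.
    frame(Data) when is_binary(Data) ->
        [integer_to_list(byte_size(Data), 16), "\r\n", Data, "\r\n"].

    %% A single-space chunk ("1\r\n \r\n") is harmless whitespace between
    %% JSON tokens, so it can be sent periodically to defeat an inactivity
    %% timeout; this is what the timeout clause in upload_docs below does.
    heartbeat() -> "1\r\n \r\n".

    %% A zero-length chunk ends the body; the "2\r\n]}\r\n0\r\n\r\n" literal
    %% in the patch is the closing "]}" chunk followed by this terminator.
    last_chunk() -> "0\r\n\r\n".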

Modified:
    couchdb/trunk/src/couchdb/couch_rep_writer.erl

Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=803220&r1=803219&r2=803220&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Tue Aug 11 17:25:13 2009
@@ -17,35 +17,89 @@
 -include("couch_db.hrl").
 
 -define (MAX_BYTES, 10000000).
+-define (MAX_CHUNK_SIZE, 65535).
 
 start_link(Parent, Target, Reader, _PostProps) ->
     {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
 
 make_chunk(Data) when is_list(Data) ->
     make_chunk(list_to_binary(Data));
+make_chunk(Data) when size(Data) > ?MAX_CHUNK_SIZE ->
+    <<ChunkData:?MAX_CHUNK_SIZE/binary, Rest/binary>> = Data,
+    Chunk = {?MAX_CHUNK_SIZE, [ibrowse_lib:dec2hex(4, ?MAX_CHUNK_SIZE), "\r\n",
+        ChunkData, "\r\n"]},
+    [Chunk, Rest];
 make_chunk(Data) ->
     Size = size(Data),
-    {Size, [ibrowse_lib:dec2hex(8, Size), "\r\n", Data, "\r\n"]}.
+    [{Size, [ibrowse_lib:dec2hex(4, Size), "\r\n", Data, "\r\n"]}].
+
+upload_docs({start, Buffer}) ->
+    [{Size, Chunk}] = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
+    % Buffer ! {self(), next_doc},
+    {ok, Chunk, {continue, Buffer, "", Size}};
+
+upload_docs({continue, _Buf, _Pre, ByteCount}) when ByteCount > ?MAX_BYTES ->
+    {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
+upload_docs({continue, Buffer, Prepend, ByteCount}) ->
+    Buffer ! {self(), next_doc},
+    receive 
+    {ok, JsonDoc} ->
+        [{Size, Chunk} | MoreData] = make_chunk([Prepend, JsonDoc]),
+        {ok, Chunk, {multi_chunk, MoreData, Buffer, ",", ByteCount+Size}};
+    eof ->
+        {ok, "2\r\n]}\r\n0\r\n\r\n", finish};
+    timeout ->
+        ?LOG_DEBUG("sending a heartbeat to keep the connection open", []),
+        {ok, "1\r\n \r\n", {continue, Buffer, Prepend, ByteCount}}
+    end;
+
+upload_docs({multi_chunk, [], Buffer, Prepend, ByteCount}) ->
+    % Buffer ! {self(), next_doc},
+    upload_docs({continue, Buffer, Prepend, ByteCount});
+upload_docs({multi_chunk, [Data], Buffer, Prepend, ByteCount}) ->
+    [{Size, Chunk} | MoreData] = make_chunk(Data),
+    {ok, Chunk, {multi_chunk, MoreData, Buffer, Prepend, ByteCount+Size}};
 
-upload_docs({start, W, Docs}) ->
-    {Size, Chunk} = make_chunk(<<"{\"new_edits\":false, \"docs\":[">>),
-    {ok, Chunk, {continue, W, Docs, "", Size}};
-upload_docs({continue, W, Docs, _, ByteCount}) when ByteCount > ?MAX_BYTES ->
-    W ! {docs_remaining, length(Docs)},
-    {ok, "2\r\n]}\r\n", last_chunk};
-upload_docs({continue, W, [Doc|Rest], Prepend, ByteCount}) ->
-    JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments])),
-    {Size, Chunk} = make_chunk([Prepend, JsonDoc]),
-    {ok, Chunk, {continue, W, Rest, ",", ByteCount+Size}};
-upload_docs({continue, W, [], _, _}) ->
-    W ! {docs_remaining, 0},
-    {ok, "2\r\n]}\r\n", last_chunk};
-upload_docs(last_chunk) ->
-    {ok, "0\r\n\r\n", finish};
 upload_docs(finish) ->
     couch_util:should_flush(),
     eof.
-    
+
+encoding_worker([]) ->
+    receive {MiddleMan, next_doc} -> MiddleMan ! eof end,
+    ok;
+encoding_worker([Doc|Rest]) ->
+    JsonDoc = ?l2b(?JSON_ENCODE(couch_doc:to_json_obj(Doc,[revs,attachments]))),
+    receive {MiddleMan, next_doc} -> MiddleMan ! {ok, JsonDoc} end,
+    encoding_worker(Rest).
+
+% needed because upload_docs is inside a gen_server so it can't have a mailbox
+middle_man(EncodingWorker) ->
+    receive {Uploader, next_doc} ->
+        receive
+        {ok, JsonDoc} ->
+            EncodingWorker ! {self(), next_doc},
+            Uploader ! {ok, JsonDoc},
+            middle_man(EncodingWorker);
+        eof ->
+            Uploader ! eof,
+            ok
+        after 5000 ->
+            Uploader ! timeout,
+            middle_man(EncodingWorker)
+        end
+    end.
+
+request_loop(Ref, Request, Acc) ->
+    receive
+    {'DOWN', Ref, _, _, normal} ->
+        lists:flatten(lists:reverse(Acc));
+    {'DOWN', Ref, _, _, Reason} ->
+        exit(Reason)
+    after 0 ->
+        ErrorsJson = couch_rep_httpc:request(Request),
+        request_loop(Ref, Request, [ErrorsJson|Acc])
+    end.
+
 writer_loop(Parent, Reader, Target) ->
     case couch_rep_reader:next(Reader) of
     {complete, FinalSeq} ->
@@ -53,7 +107,7 @@
         ok;
     {HighSeq, Docs} ->
         DocCount = length(Docs),
-        try write_docs(Target, Docs, []) of
+        try write_docs(Target, Docs) of
         {ok, []} ->
             Parent ! {update_stats, docs_written, DocCount};
         {ok, Errors} ->
@@ -71,13 +125,16 @@
         writer_loop(Parent, Reader, Target)
     end.
 
-write_docs(#http_db{} = Db, Docs, ErrorsAcc) ->
-    ErrorsJson = couch_rep_httpc:request(Db#http_db{
+write_docs(#http_db{} = Db, Docs) ->
+    {Worker, Ref} = spawn_monitor(fun() -> encoding_worker(Docs) end),
+    Pid = spawn_link(fun() -> Worker ! {self(),next_doc}, middle_man(Worker) end),
+    Request = Db#http_db{
         resource = "_bulk_docs",
         method = post,
-        body = {fun upload_docs/1, {start, self(), Docs}},
+        body = {fun upload_docs/1, {start, Pid}},
         headers = [{"transfer-encoding", "chunked"} | Db#http_db.headers]
-    }),
+    },
+    ErrorsJson = request_loop(Ref, Request, []),
     ErrorsList =
     lists:map(
         fun({Props}) ->
@@ -88,12 +145,6 @@
             Reason = proplists:get_value(<<"reason">>, Props),
             {{Id, Rev}, {ErrId, Reason}}
         end, ErrorsJson),
-    receive
-    {docs_remaining, 0} ->
-        {ok, lists:flatten([ErrorsList|ErrorsAcc])};
-    {docs_remaining, N} ->
-        MoreDocs = lists:nthtail(length(Docs)-N, Docs),
-        write_docs(Db, MoreDocs, [ErrorsList|ErrorsAcc])
-    end;
-write_docs(Db, Docs, _) ->
+    {ok, ErrorsList};
+write_docs(Db, Docs) ->
     couch_db:update_docs(Db, Docs, [], replicated_changes).


