Return-Path: Delivered-To: apmail-couchdb-commits-archive@www.apache.org Received: (qmail 46862 invoked from network); 14 Jun 2010 14:47:22 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 14 Jun 2010 14:47:22 -0000 Received: (qmail 72820 invoked by uid 500); 14 Jun 2010 13:47:21 -0000 Delivered-To: apmail-couchdb-commits-archive@couchdb.apache.org Received: (qmail 72789 invoked by uid 500); 14 Jun 2010 13:47:21 -0000 Mailing-List: contact commits-help@couchdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@couchdb.apache.org Delivered-To: mailing list commits@couchdb.apache.org Received: (qmail 72782 invoked by uid 99); 14 Jun 2010 13:47:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Jun 2010 13:47:21 +0000 X-ASF-Spam-Status: No, hits=-1753.9 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Jun 2010 13:47:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 025EB238899C; Mon, 14 Jun 2010 13:46:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r954475 - in /couchdb/branches/0.11.x/src/couchdb: couch_db.erl couch_db.hrl couch_doc.erl couch_httpd.erl couch_httpd_db.erl couch_rep_writer.erl Date: Mon, 14 Jun 2010 13:46:34 -0000 To: commits@couchdb.apache.org From: jan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100614134635.025EB238899C@eris.apache.org> Author: jan Date: Mon Jun 14 13:46:34 2010 New Revision: 954475 URL: http://svn.apache.org/viewvc?rev=954475&view=rev Log: More work to allow for streaming attachment replication. Modified: couchdb/branches/0.11.x/src/couchdb/couch_db.erl couchdb/branches/0.11.x/src/couchdb/couch_db.hrl couchdb/branches/0.11.x/src/couchdb/couch_doc.erl couchdb/branches/0.11.x/src/couchdb/couch_httpd.erl couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl Modified: couchdb/branches/0.11.x/src/couchdb/couch_db.erl URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_db.erl?rev=954475&r1=954474&r2=954475&view=diff ============================================================================== --- couchdb/branches/0.11.x/src/couchdb/couch_db.erl (original) +++ couchdb/branches/0.11.x/src/couchdb/couch_db.erl Mon Jun 14 13:46:34 2010 @@ -523,7 +523,7 @@ prep_and_validate_replicated_updates(Db, fun(Doc, {AccPrepped2, AccErrors2}) -> case couch_doc:has_stubs(Doc) of true -> - couch_doc:merge_doc(Doc, #doc{}); % will throw exception + couch_doc:merge_stubs(Doc, #doc{}); % will throw exception false -> ok end, case validate_doc_update(Db, Doc, fun() -> nil end) of Modified: couchdb/branches/0.11.x/src/couchdb/couch_db.hrl URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_db.hrl?rev=954475&r1=954474&r2=954475&view=diff ============================================================================== --- couchdb/branches/0.11.x/src/couchdb/couch_db.hrl (original) +++ couchdb/branches/0.11.x/src/couchdb/couch_db.hrl Mon Jun 14 13:46:34 2010 @@ -280,3 +280,4 @@ filter = "", include_docs = false }). + Modified: couchdb/branches/0.11.x/src/couchdb/couch_doc.erl URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_doc.erl?rev=954475&r1=954474&r2=954475&view=diff ============================================================================== --- couchdb/branches/0.11.x/src/couchdb/couch_doc.erl (original) +++ couchdb/branches/0.11.x/src/couchdb/couch_doc.erl Mon Jun 14 13:46:34 2010 @@ -375,8 +375,7 @@ fold_streamed_data(RcvFun, LenLeft, Fun, ResultAcc = Fun(Bin, Acc), fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). -len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, - SendEncodedAtts) -> +len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) -> AttsSize = lists:foldl(fun(#att{data=Data} = Att, AccAttsSize) -> case Data of stub -> @@ -395,16 +394,18 @@ len_doc_to_multi_part_stream(Boundary, J end end, 0, Atts), if AttsSize == 0 -> - iolist_size(JsonBytes); + {<<"application/json">>, iolist_size(JsonBytes)}; true -> - 2 + % "--" - size(Boundary) + - 36 + % "\r\ncontent-type: application/json\r\n\r\n" - iolist_size(JsonBytes) + - 4 + % "\r\n--" - size(Boundary) + - + AttsSize + - 2 % "--" + {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>, + 2 + % "--" + size(Boundary) + + 36 + % "\r\ncontent-type: application/json\r\n\r\n" + iolist_size(JsonBytes) + + 4 + % "\r\n--" + size(Boundary) + + + AttsSize + + 2 % "--" + } end. doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun, @@ -446,26 +447,30 @@ doc_from_multi_part_stream(ContentType, unlink(Self) end), Parser ! {get_doc_bytes, self()}, - receive {doc_bytes, DocBytes} -> ok end, - Doc = from_json_obj(?JSON_DECODE(DocBytes)), - % go through the attachments looking for 'follows' in the data, - % replace with function that reads the data from MIME stream. - ReadAttachmentDataFun = fun() -> - Parser ! {get_bytes, self()}, - receive {bytes, Bytes} -> Bytes end - end, - Atts2 = lists:map( - fun(#att{data=follows}=A) -> - A#att{data=ReadAttachmentDataFun}; - (A) -> - A - end, Doc#doc.atts), - Doc#doc{atts=Atts2}. + receive + {doc_bytes, DocBytes} -> + Doc = from_json_obj(?JSON_DECODE(DocBytes)), + % go through the attachments looking for 'follows' in the data, + % replace with function that reads the data from MIME stream. + ReadAttachmentDataFun = fun() -> + Parser ! {get_bytes, self()}, + receive {bytes, Bytes} -> Bytes end + end, + Atts2 = lists:map( + fun(#att{data=follows}=A) -> + A#att{data=ReadAttachmentDataFun}; + (A) -> + A + end, Doc#doc.atts), + {ok, Doc#doc{atts=Atts2}} + end. mp_parse_doc({headers, H}, []) -> - {"application/json", _} = proplists:get_value("content-type", H), - fun (Next) -> - mp_parse_doc(Next, []) + case proplists:get_value("content-type", H) of + {"application/json", _} -> + fun (Next) -> + mp_parse_doc(Next, []) + end end; mp_parse_doc({body, Bytes}, AccBytes) -> fun (Next) -> Modified: couchdb/branches/0.11.x/src/couchdb/couch_httpd.erl URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_httpd.erl?rev=954475&r1=954474&r2=954475&view=diff ============================================================================== --- couchdb/branches/0.11.x/src/couchdb/couch_httpd.erl (original) +++ couchdb/branches/0.11.x/src/couchdb/couch_httpd.erl Mon Jun 14 13:46:34 2010 @@ -746,13 +746,15 @@ parse_multipart_request(ContentType, Dat nil_callback(_Data)-> fun(Next) -> nil_callback(Next) end. -get_boundary(ContentType) -> - {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType), +get_boundary({"multipart/" ++ _, Opts}) -> case proplists:get_value("boundary", Opts) of S when is_list(S) -> S - end. - + end; +get_boundary(ContentType) -> + {"multipart/" ++ _ , Opts} = mochiweb_util:parse_header(ContentType), + get_boundary({"multipart/", Opts}). + split_header(<<>>) -> Modified: couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl?rev=954475&r1=954474&r2=954475&view=diff ============================================================================== --- couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl (original) +++ couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl Mon Jun 14 13:46:34 2010 @@ -403,7 +403,7 @@ db_req(#httpd{method='POST',path_parts=[ couch_doc:revs_to_strs(PossibleAncestors)}] end}} end, Results), - send_json(Req, {[{missing, {Results2}}]}); + send_json(Req, {Results2}); db_req(#httpd{path_parts=[_,<<"_revs_diff">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); @@ -573,27 +573,36 @@ db_doc_req(#httpd{method='GET'}=Req, Db, send_doc(Req, Doc, Options2); _ -> {ok, Results} = couch_db:open_doc_revs(Db, DocId, Revs, Options), - {ok, Resp} = start_json_response(Req, 200), - send_chunk(Resp, "["), - % We loop through the docs. The first time through the separator - % is whitespace, then a comma on subsequent iterations. - lists:foldl( - fun(Result, AccSeparator) -> - case Result of - {ok, Doc} -> - JsonDoc = couch_doc:to_json_obj(Doc, Options), - Json = ?JSON_ENCODE({[{ok, JsonDoc}]}), - send_chunk(Resp, AccSeparator ++ Json); - {{not_found, missing}, RevId} -> - RevStr = couch_doc:rev_to_str(RevId), - Json = ?JSON_ENCODE({[{"missing", RevStr}]}), - send_chunk(Resp, AccSeparator ++ Json) + AcceptedTypes = case couch_httpd:header_value(Req, "Accept") of + undefined -> []; + AcceptHeader -> string:tokens(AcceptHeader, "; ") + end, + case lists:member("multipart/mixed", AcceptedTypes) of + false -> + {ok, Resp} = start_json_response(Req, 200), + send_chunk(Resp, "["), + % We loop through the docs. The first time through the separator + % is whitespace, then a comma on subsequent iterations. + lists:foldl( + fun(Result, AccSeparator) -> + case Result of + {ok, Doc} -> + JsonDoc = couch_doc:to_json_obj(Doc, Options), + Json = ?JSON_ENCODE({[{ok, JsonDoc}]}), + send_chunk(Resp, AccSeparator ++ Json); + {{not_found, missing}, RevId} -> + RevStr = couch_doc:rev_to_str(RevId), + Json = ?JSON_ENCODE({[{"missing", RevStr}]}), + send_chunk(Resp, AccSeparator ++ Json) + end, + "," % AccSeparator now has a comma end, - "," % AccSeparator now has a comma - end, - "", Results), - send_chunk(Resp, "]"), - end_json_response(Resp) + "", Results), + send_chunk(Resp, "]"), + end_json_response(Resp); + true -> + send_docs_multipart(Req, Results, Options) + end end; db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> @@ -647,13 +656,13 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, Loc = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)), RespHeaders = [{"Location", Loc}], - case couch_httpd:header_value(Req, "Content-Type") of - ("multipart/related" ++ _Rest) = ContentType-> - Doc0 = couch_doc:doc_from_multi_part_stream(ContentType, + case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of + ("multipart/related;" ++ _) = ContentType -> + {ok, Doc0} = couch_doc:doc_from_multi_part_stream(ContentType, fun() -> receive_request_data(Req) end), Doc = couch_doc_from_req(Req, DocId, Doc0), update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType); - _ -> + _Else -> case couch_httpd:qs_value(Req, "batch") of "ok" -> % batch @@ -672,7 +681,8 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, ]}); _Normal -> % normal - Doc = couch_doc_from_req(Req, DocId, couch_httpd:json_body(Req)), + Body = couch_httpd:json_body(Req), + Doc = couch_doc_from_req(Req, DocId, Body), update_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType) end end; @@ -725,11 +735,11 @@ send_doc_efficiently(Req, #doc{atts=Atts send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)); true -> Boundary = couch_uuids:random(), - JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])), - Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes, - Atts,false), - CType = {<<"Content-Type">>, - <<"multipart/related; boundary=\"", Boundary/binary, "\"">>}, + JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, + [attachments, follows|Options])), + {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( + Boundary,JsonBytes, Atts,false), + CType = {<<"Content-Type">>, ContentType}, {ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len), couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts, fun(Data) -> couch_httpd:send(Resp, Data) end, false) @@ -738,7 +748,35 @@ send_doc_efficiently(Req, #doc{atts=Atts send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)) end. - +send_docs_multipart(Req, Results, Options) -> + OuterBoundary = couch_uuids:random(), + InnerBoundary = couch_uuids:random(), + CType = {"Content-Type", + "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""}, + {ok, Resp} = start_chunked_response(Req, 200, [CType]), + couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>), + lists:foreach( + fun({ok, #doc{atts=Atts}=Doc}) -> + JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, + [attachments,follows|Options])), + {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream( + InnerBoundary, JsonBytes, Atts, false), + couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ", + ContentType/binary, "\r\n\r\n">>), + couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts, + fun(Data) -> couch_httpd:send_chunk(Resp, Data) + end, false), + couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>); + ({{not_found, missing}, RevId}) -> + RevStr = couch_doc:rev_to_str(RevId), + Json = ?JSON_ENCODE({[{"missing", RevStr}]}), + couch_httpd:send_chunk(Resp, + [<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>, + Json, + <<"\r\n--", OuterBoundary/binary>>]) + end, Results), + couch_httpd:send_chunk(Resp, <<"--">>), + couch_httpd:last_chunk(Resp). receive_request_data(Req) -> {couch_httpd:recv(Req, 0), fun() -> receive_request_data(Req) end}. @@ -791,6 +829,8 @@ couch_doc_from_req(Req, DocId, #doc{revs case extract_header_rev(Req, ExplicitDocRev) of missing_rev -> Revs2 = {0, []}; + ExplicitDocRev -> + Revs2 = Revs; {Pos, Rev} -> Revs2 = {Pos, [Rev]} end, Modified: couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl?rev=954475&r1=954474&r2=954475&view=diff ============================================================================== --- couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl (original) +++ couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl Mon Jun 14 13:46:34 2010 @@ -94,7 +94,7 @@ write_multi_part_doc(#http_db{headers=He ) ), Boundary = couch_uuids:random(), - Len = couch_doc:len_doc_to_multi_part_stream( + {_ContentType, Len} = couch_doc:len_doc_to_multi_part_stream( Boundary, JsonBytes, Atts, true ), {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),