couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jch...@apache.org
Subject svn commit: r753448 [2/2] - in /couchdb/trunk: share/ share/www/script/ share/www/script/test/ src/couchdb/
Date Fri, 13 Mar 2009 22:15:35 GMT
Modified: couchdb/trunk/src/couchdb/couch_httpd.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd.erl Fri Mar 13 22:15:34 2009
@@ -16,7 +16,7 @@
 -export([start_link/0, stop/0, handle_request/4]).
 
 -export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]).
--export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]).
+-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4,error_info/1]).
 -export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]).
 -export([primary_header_value/2,partition/1,serve_file/3]).
 -export([start_chunked_response/3,send_chunk/2]).
@@ -166,7 +166,7 @@
     catch
         throw:Error ->
             send_error(HttpReq, Error);
-        Tag:Error ->
+        Tag:Error when Error ==foo ->
             ?LOG_ERROR("Uncaught error in HTTP request: ~p",[{Tag, Error}]),
             ?LOG_DEBUG("Stacktrace: ~p",[erlang:get_stacktrace()]),
             send_error(HttpReq, Error)
@@ -295,8 +295,8 @@
 json_body(Httpd) ->
     ?JSON_DECODE(body(Httpd)).
 
-doc_etag(#doc{revs=[DiskRev|_]}) ->
-    "\"" ++ binary_to_list(DiskRev) ++ "\"".
+doc_etag(#doc{revs={Start, [DiskRev|_]}}) ->
+    "\"" ++ ?b2l(couch_doc:rev_to_str({Start, DiskRev})) ++ "\"".
 
 make_etag(Term) ->
     <<SigInt:128/integer>> = erlang:md5(term_to_binary(Term)),
@@ -392,75 +392,55 @@
     send_chunk(Resp, []).
 
 
-send_error(Req, bad_request) ->
-    send_error(Req, 400, <<"bad_request">>, <<>>);
-send_error(Req, {query_parse_error, Reason}) ->
-    send_error(Req, 400, <<"query_parse_error">>, Reason);
-send_error(Req, {bad_request, Reason}) ->
-    send_error(Req, 400, <<"bad_request">>, Reason);
-send_error(Req, not_found) ->
-    send_error(Req, 404, <<"not_found">>, <<"Missing">>);
-send_error(Req, {not_found, Reason}) ->
-    send_error(Req, 404, <<"not_found">>, Reason);
-send_error(Req, conflict) ->
-    send_error(Req, 409, <<"conflict">>, <<"Document update conflict.">>);
-send_error(Req, {invalid_doc, Reason}) ->
-    send_error(Req, 400, <<"invalid_doc">>, Reason);
-send_error(Req, {forbidden, Msg}) ->
-    send_json(Req, 403,
-        {[{<<"error">>,  <<"forbidden">>},
-         {<<"reason">>, Msg}]});
-send_error(Req, {unauthorized, Msg}) ->
-    case couch_config:get("httpd", "WWW-Authenticate", nil) of
-    nil ->
-        Headers = [];
-    Type ->
-        Headers = [{"WWW-Authenticate", Type}]
-    end,
-    send_json(Req, 401, Headers,
-        {[{<<"error">>,  <<"unauthorized">>},
-         {<<"reason">>, Msg}]});
-send_error(Req, {http_error, Code, Headers, Error, Reason}) ->
-    send_json(Req, Code, Headers,
-        {[{<<"error">>, Error}, {<<"reason">>, Reason}]});
-send_error(Req, {user_error, {Props}}) ->
-    {Headers} = proplists:get_value(<<"headers">>, Props, {[]}),
-    send_json(Req,
-        proplists:get_value(<<"http_status">>, Props, 500),
-        Headers,
-        {[{<<"error">>, proplists:get_value(<<"error">>, Props)},
-            {<<"reason">>, proplists:get_value(<<"reason">>, Props)}]});
-send_error(Req, file_exists) ->
-    send_error(Req, 412, <<"file_exists">>, <<"The database could not be "
-        "created, the file already exists.">>);
-send_error(Req, {Error, Reason}) ->
-    send_error(Req, 500, Error, Reason);
-send_error(Req, Error) ->
-    send_error(Req, 500, <<"error">>, Error).
+error_info(bad_request) ->
+    {400, <<"bad_request">>, <<>>};
+error_info({bad_request, Reason}) ->
+    {400, <<"bad_request">>, Reason};
+error_info({query_parse_error, Reason}) ->
+    {400, <<"query_parse_error">>, Reason};
+error_info(not_found) ->
+    {404, <<"not_found">>, <<"Missing">>};
+error_info({not_found, Reason}) ->
+    {404, <<"not_found">>, Reason};
+error_info(conflict) ->
+    {409, <<"conflict">>, <<"Document update conflict.">>};
+error_info({forbidden, Msg}) ->
+    {403, <<"forbidden">>, Msg};
+error_info({unauthorized, Msg}) ->
+    {401, <<"unauthorized">>, Msg};
+error_info(file_exists) ->
+    {412, <<"file_exists">>, <<"The database could not be "
+        "created, the file already exists.">>};
+error_info({Error, Reason}) ->
+    {500, couch_util:to_binary(Error), couch_util:to_binary(Reason)};
+error_info(Error) ->
+    {500, <<"unknown_error">>, couch_util:to_binary(Error)}.
 
+send_error(Req, Error) ->
+    {Code, ErrorStr, ReasonStr} = error_info(Error),
+    if Code == 401 ->     
+        case couch_config:get("httpd", "WWW-Authenticate", nil) of
+        nil ->
+            Headers = [];
+        Type ->
+            Headers = [{"WWW-Authenticate", Type}]
+        end;
+    true ->
+        Headers = []
+    end,
+    send_error(Req, Code, Headers, ErrorStr, ReasonStr).
 
+send_error(Req, Code, ErrorStr, ReasonStr) ->
+    send_error(Req, Code, [], ErrorStr, ReasonStr).
+    
+send_error(Req, Code, Headers, ErrorStr, ReasonStr) ->
+    send_json(Req, Code, Headers,
+        {[{<<"error">>,  ErrorStr},
+         {<<"reason">>, ReasonStr}]}).
 
-send_error(Req, Code, Error, Msg) when is_atom(Error) ->
-    send_error(Req, Code, list_to_binary(atom_to_list(Error)), Msg);
-send_error(Req, Code, Error, Msg) when is_list(Msg) ->
-    case (catch list_to_binary(Msg)) of
-    Bin when is_binary(Bin) ->
-        send_error(Req, Code, Error, Bin);
-    _ ->
-        send_error(Req, Code, Error, io_lib:format("~p", [Msg]))
-    end;
-send_error(Req, Code, Error, Msg) when not is_binary(Error) ->
-    send_error(Req, Code, list_to_binary(io_lib:format("~p", [Error])), Msg);
-send_error(Req, Code, Error, Msg) when not is_binary(Msg) ->
-    send_error(Req, Code, Error, list_to_binary(io_lib:format("~p", [Msg])));
-send_error(Req, Code, Error, <<>>) ->
-    send_json(Req, Code, {[{<<"error">>, Error}]});
-send_error(Req, Code, Error, Msg) ->
-    send_json(Req, Code, {[{<<"error">>, Error}, {<<"reason">>, Msg}]}).
-    
-send_redirect(Req, Path) ->
-    Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}],
-    send_response(Req, 301, Headers, <<>>).
+ send_redirect(Req, Path) ->
+     Headers = [{"Location", couch_httpd:absolute_uri(Req, Path)}],
+     send_response(Req, 301, Headers, <<>>).
 
 negotiate_content_type(#httpd{mochi_req=MochiReq}) ->
     %% Determine the appropriate Content-Type header for a JSON response

Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Fri Mar 13 22:15:34 2009
@@ -22,8 +22,8 @@
 
 -record(doc_query_args, {
     options = [],
-    rev = "",
-    open_revs = ""
+    rev = nil,
+    open_revs = []
 }).
     
 % Database request handlers
@@ -89,13 +89,13 @@
 db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) ->
     Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)),
     DocId = couch_util:new_uuid(),
-    {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=[]}, []),
+    {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []),
     DocUrl = absolute_uri(Req, 
         binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)),
     send_json(Req, 201, [{"Location", DocUrl}], {[
         {ok, true},
         {id, DocId},
-        {rev, NewRev}
+        {rev, couch_doc:rev_to_str(NewRev)}
     ]});
 
 db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
@@ -131,32 +131,59 @@
                     <<>> -> couch_util:new_uuid();
                     Id0 -> Id0
                 end,
-                Revs = case proplists:get_value(<<"_rev">>, ObjProps) of
-                    undefined -> [];
-                    Rev  -> [Rev]
+                case proplists:get_value(<<"_rev">>, ObjProps) of
+                undefined ->
+                    Revs = {0, []};
+                Rev  ->
+                    {Pos, RevId} = couch_doc:parse_rev(Rev),
+                    Revs = {Pos, [RevId]}
                 end,
                 Doc#doc{id=Id,revs=Revs}
             end,
             DocsArray),
-        {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options),
-
-        % output the results
-        DocResults = lists:zipwith(
-            fun(Doc, NewRev) ->
-                {[{<<"id">>, Doc#doc.id}, {<<"rev">>, NewRev}]}
-            end,
-            Docs, ResultRevs),
-        send_json(Req, 201, {[
-            {ok, true},
-            {new_revs, DocResults}
-        ]});
-
+        Options2 =
+        case proplists:get_value(<<"all_or_nothing">>, JsonProps) of
+        true  -> [all_or_nothing|Options];
+        _ -> Options
+        end,
+        case couch_db:update_docs(Db, Docs, Options2) of
+        {ok, Results} ->
+            % output the results
+            DocResults = lists:zipwith(
+                fun(Doc, {ok, NewRev}) ->
+                    {[{<<"id">>, Doc#doc.id}, {<<"rev">>, couch_doc:rev_to_str(NewRev)}]};
+                (Doc, Error) ->
+                    {_Code, Err, Msg} = couch_httpd:error_info(Error),
+                    % maybe we should add the http error code to the json?
+                    {[{<<"id">>, Doc#doc.id}, {<<"error">>, Err}, {"reason", Msg}]}
+                end,
+                Docs, Results),
+            send_json(Req, 201, DocResults);
+        {aborted, Errors} ->
+            ErrorsJson = 
+                lists:map(
+                    fun({{Id, Rev}, Error}) ->
+                        {_Code, Err, Msg} = couch_httpd:error_info(Error),
+                        {[{<<"id">>, Id},
+                            {<<"rev">>, couch_doc:rev_to_str(Rev)},
+                            {<<"error">>, Err},
+                            {"reason", Msg}]}
+                    end, Errors),
+            send_json(Req, 417, ErrorsJson)
+        end;
     false ->
         Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- DocsArray],
-        ok = couch_db:update_docs(Db, Docs, Options, false),
-        send_json(Req, 201, {[
-            {ok, true}
-        ]})
+        {ok, Errors} = couch_db:update_docs(Db, Docs, Options, replicated_changes),
+        ErrorsJson = 
+            lists:map(
+                fun({{Id, Rev}, Error}) ->
+                    {_Code, Err, Msg} = couch_httpd:error_info(Error),
+                    {[{<<"id">>, Id},
+                        {<<"rev">>, couch_doc:rev_to_str(Rev)},
+                        {<<"error">>, Err},
+                        {"reason", Msg}]}
+                end, Errors),
+        send_json(Req, 201, ErrorsJson)
     end;
 db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
@@ -170,12 +197,12 @@
 
 db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
     {IdsRevs} = couch_httpd:json_body(Req),
-    % validate the json input
-    [{_Id, [_|_]=_Revs} = IdRevs || IdRevs <- IdsRevs],
+    IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
     
-    case couch_db:purge_docs(Db, IdsRevs) of
+    case couch_db:purge_docs(Db, IdsRevs2) of
     {ok, PurgeSeq, PurgedIdsRevs} ->
-        send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs}}]});
+        PurgedIdsRevs2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
+        send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
     Error ->
         throw(Error)
     end;
@@ -204,7 +231,7 @@
 
     {ok, Info} = couch_db:get_db_info(Db),
     CurrentEtag = couch_httpd:make_etag(proplists:get_value(update_seq, Info)),
-    couch_httpd:etag_respond(Req, CurrentEtag, fun() -> 
+    couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
         TotalRowCount = proplists:get_value(doc_count, Info),
         FoldlFun = couch_httpd_view:make_view_fold_fun(Req, QueryArgs, CurrentEtag, Db,
             TotalRowCount, #view_fold_helper_funs{
@@ -227,14 +254,14 @@
                     deleted_conflict_revs=DelConflictRevs
                 } = DocInfo,
                 Json = {
-                    [{<<"rev">>, Rev}] ++
+                    [{<<"rev">>, couch_doc:rev_to_str(Rev)}] ++
                     case ConflictRevs of
                         []  ->  [];
-                        _   ->  [{<<"conflicts">>, ConflictRevs}]
+                        _   ->  [{<<"conflicts">>, couch_doc:rev_to_strs(ConflictRevs)}]
                     end ++
                     case DelConflictRevs of
                         []  ->  [];
-                        _   ->  [{<<"deleted_conflicts">>, DelConflictRevs}]
+                        _   ->  [{<<"deleted_conflicts">>, couch_doc:rev_to_strs(DelConflictRevs)}]
                     end ++
                     case Deleted of
                         true -> [{<<"deleted">>, true}];
@@ -251,9 +278,11 @@
 
 db_req(#httpd{method='POST',path_parts=[_,<<"_missing_revs">>]}=Req, Db) ->
     {JsonDocIdRevs} = couch_httpd:json_body(Req),
-    {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs),
+    JsonDocIdRevs2 = [{Id, [couch_doc:parse_rev(RevStr) || RevStr <- RevStrs]} || {Id, RevStrs} <- JsonDocIdRevs],
+    {ok, Results} = couch_db:get_missing_revs(Db, JsonDocIdRevs2),
+    Results2 = [{Id, [couch_doc:rev_to_str(Rev) || Rev <- Revs]} || {Id, Revs} <- Results],
     send_json(Req, {[
-        {missing_revs, {Results}}
+        {missing_revs, {Results2}}
     ]});
 
 db_req(#httpd{path_parts=[_,<<"_missing_revs">>]}=Req, _Db) ->
@@ -271,6 +300,18 @@
 db_req(#httpd{path_parts=[_,<<"_admins">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "PUT,GET");
 
+db_req(#httpd{method='PUT',path_parts=[_,<<"_revs_limit">>]}=Req,
+        Db) ->
+    Limit = couch_httpd:json_body(Req),
+    ok = couch_db:set_revs_limit(Db, Limit),
+    send_json(Req, {[{<<"ok">>, true}]});
+
+db_req(#httpd{method='GET',path_parts=[_,<<"_revs_limit">>]}=Req, Db) ->
+    send_json(Req, couch_db:get_revs_limit(Db));
+
+db_req(#httpd{path_parts=[_,<<"_revs_limit">>]}=Req, _Db) ->
+    send_method_not_allowed(Req, "PUT,GET");
+
 % Special case to enable using an unencoded slash in the URL of design docs, 
 % as slashes in document IDs must otherwise be URL encoded.
 db_req(#httpd{method='GET',mochi_req=MochiReq, path_parts=[DbName,<<"_design/",_/binary>>|_]}=Req, _Db) ->
@@ -334,7 +375,7 @@
             AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) ->
                 case couch_doc:to_doc_info(FullDocInfo) of
                 #doc_info{deleted=false, rev=Rev} ->
-                    FoldlFun({{Id, Id}, {[{rev, Rev}]}}, Offset, Acc);
+                    FoldlFun({{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}}, Offset, Acc);
                 #doc_info{deleted=true} ->
                     {ok, Acc}
                 end
@@ -358,9 +399,9 @@
                     DocInfo = (catch couch_db:get_doc_info(Db, Key)),
                     Doc = case DocInfo of
                     {ok, #doc_info{id=Id, rev=Rev, deleted=false}} = DocInfo ->
-                        {{Id, Id}, {[{rev, Rev}]}};
+                        {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}]}};
                     {ok, #doc_info{id=Id, rev=Rev, deleted=true}} = DocInfo ->
-                        {{Id, Id}, {[{rev, Rev}, {deleted, true}]}};
+                        {{Id, Id}, {[{rev, couch_doc:rev_to_str(Rev)}, {deleted, true}]}};
                     not_found ->
                         {{Key, error}, not_found};
                     _ ->
@@ -381,20 +422,12 @@
 
 
 
-
-
 db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
-    case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
-    missing_rev ->
-        couch_httpd:send_error(Req, 409, <<"missing_rev">>,
-            <<"Document rev/etag must be specified to delete">>);
-    RevToDelete ->
-        {ok, NewRev} = couch_db:delete_doc(Db, DocId, [RevToDelete]),
-        send_json(Req, 200, {[
-            {ok, true},
-            {id, DocId},
-            {rev, NewRev}
-            ]})
+    case couch_httpd:qs_value(Req, "rev") of
+    undefined ->
+        update_doc(Req, Db, DocId, {[{<<"_deleted">>,true}]});
+    Rev ->
+        update_doc(Req, Db, DocId, {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]})
     end;
 
 db_doc_req(#httpd{method='GET'}=Req, Db, DocId) ->
@@ -438,82 +471,31 @@
         end_json_response(Resp)
     end;
 
-db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
-    Form = couch_httpd:parse_form(Req),
-    Rev = list_to_binary(proplists:get_value("_rev", Form)),
-    Doc = case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
-        {ok, [{ok, Doc0}]}  -> Doc0#doc{revs=[Rev]};
-        {ok, [Error]}       -> throw(Error)
-    end,
-
-    NewAttachments = [
-        {validate_attachment_name(Name), {list_to_binary(ContentType), Content}} ||
-        {Name, {ContentType, _}, Content} <-
-        proplists:get_all_values("_attachments", Form)
-    ],
-    #doc{attachments=Attachments} = Doc,
-    NewDoc = Doc#doc{
-        attachments = Attachments ++ NewAttachments
-    },
-    {ok, NewRev} = couch_db:update_doc(Db, NewDoc, []),
-
-    send_json(Req, 201, [{"Etag", "\"" ++ NewRev ++ "\""}], {obj, [
-        {ok, true},
-        {id, DocId},
-        {rev, NewRev}
-    ]});
-
 db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
-    Json = couch_httpd:json_body(Req),
-    Doc = couch_doc:from_json_obj(Json),
-    ExplicitRev =
-    case Doc#doc.revs of
-        [Rev0|_] -> Rev0;
-        [] -> undefined
-    end,
-    validate_attachment_names(Doc),
-    case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
-    "true" ->
-        Options = [full_commit];
-    _ ->
-        Options = []
-    end,
-    case extract_header_rev(Req, ExplicitRev) of
-    missing_rev ->
-        Revs = [];
-    Rev ->
-        Revs = [Rev]
-    end,
-    {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
-    send_json(Req, 201, [{"Etag", <<"\"", NewRev/binary, "\"">>}], {[
-        {ok, true},
-        {id, DocId},
-        {rev, NewRev}
-    ]});
+    update_doc(Req, Db, DocId, couch_httpd:json_body(Req));
 
 db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
     SourceRev =
     case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
-        missing_rev -> [];
+        missing_rev -> nil;
         Rev -> Rev
     end,
 
-    {TargetDocId, TargetRev} = parse_copy_destination_header(Req),
+    {TargetDocId, TargetRevs} = parse_copy_destination_header(Req),
 
     % open revision Rev or Current  
     Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
-
     % save new doc
-    {ok, NewTargetRev} = couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRev}, []),
-
-    send_json(Req, 201, [{"Etag", "\"" ++ binary_to_list(NewTargetRev) ++ "\""}], {[
-        {ok, true},
-        {id, TargetDocId},
-        {rev, NewTargetRev}
-    ]});
+    case couch_db:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRevs}, []) of
+    {ok, NewTargetRev} ->
+        send_json(Req, 201, [{"Etag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}],
+            update_result_to_json({ok, NewTargetRev}));
+    Error ->
+        throw(Error)
+    end;
 
 db_doc_req(#httpd{method='MOVE'}=Req, Db, SourceDocId) ->
-    SourceRev =
+    SourceRev = {SourceRevPos, SourceRevId} =
     case extract_header_rev(Req, couch_httpd:qs_value(Req, "rev")) of
     missing_rev -> 
         throw({bad_request, "MOVE requires a specified rev parameter"
@@ -521,37 +503,68 @@
     Rev -> Rev
     end,
 
-    {TargetDocId, TargetRev} = parse_copy_destination_header(Req),
+    {TargetDocId, TargetRevs} = parse_copy_destination_header(Req),
     % open revision Rev or Current
     Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
 
     % save new doc & delete old doc in one operation
     Docs = [
-        Doc#doc{id=TargetDocId, revs=TargetRev},
-        #doc{id=SourceDocId, revs=[SourceRev], deleted=true}
+        #doc{id=SourceDocId, revs={SourceRevPos, [SourceRevId]}, deleted=true},
+        Doc#doc{id=TargetDocId, revs=TargetRevs}
         ],
-    {ok, ResultRevs} = couch_db:update_docs(Db, Docs, []),
+    {ok, [SourceResult, TargetResult]} = couch_db:update_docs(Db, Docs, []),
     
-    DocResults = lists:zipwith(
-        fun(FDoc, NewRev) ->
-            {[{id, FDoc#doc.id}, {rev, NewRev}]}
-        end,
-        Docs, ResultRevs),
     send_json(Req, 201, {[
-        {ok, true},
-        {new_revs, DocResults}
+        {SourceDocId, update_result_to_json(SourceResult)},
+        {TargetDocId, update_result_to_json(TargetResult)}
     ]});
 
 db_doc_req(Req, _Db, _DocId) ->
     send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY,MOVE").
 
+update_result_to_json({ok, NewRev}) ->
+    {[{rev, couch_doc:rev_to_str(NewRev)}]};
+update_result_to_json(Error) ->
+    {_Code, ErrorStr, Reason} = couch_httpd:error_info(Error),
+    {[{error, ErrorStr}, {reason, Reason}]}.
+
+
+update_doc(Req, Db, DocId, Json) ->
+    #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json),
+    validate_attachment_names(Doc),
+    ExplicitDocRev =
+    case Doc#doc.revs of
+        {Start,[RevId|_]} -> {Start, RevId};
+        _ -> undefined
+    end,
+    case extract_header_rev(Req, ExplicitDocRev) of
+    missing_rev ->
+        Revs = {0, []};
+    {Pos, Rev} ->
+        Revs = {Pos, [Rev]}
+    end,
+    
+    case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
+    "true" ->
+        Options = [full_commit];
+    _ ->
+        Options = []
+    end,
+    {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
+    NewRevStr = couch_doc:rev_to_str(NewRev),
+    send_json(Req, if Deleted -> 200; true -> 201 end,
+        [{"Etag", <<"\"", NewRevStr/binary, "\"">>}], {[
+            {ok, true},
+            {id, DocId},
+            {rev, NewRevStr}]}).
+
 % Useful for debugging
 % couch_doc_open(Db, DocId) ->
 %   couch_doc_open(Db, DocId, [], []).
 
 couch_doc_open(Db, DocId, Rev, Options) ->
     case Rev of
-    "" -> % open most recent rev
+    nil -> % open most recent rev
         case couch_db:open_doc(Db, DocId, Options) of
         {ok, Doc} ->
             Doc;
@@ -572,13 +585,13 @@
 db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
     FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, FileNameParts),"/")),
     case couch_db:open_doc(Db, DocId, []) of
-    {ok, #doc{attachments=Attachments, revs=[LastRev|_OldRevs]}} ->
+    {ok, #doc{attachments=Attachments}=Doc} ->
         case proplists:get_value(FileName, Attachments) of
         undefined ->
             throw({not_found, "Document is missing attachment"});
         {Type, Bin} ->
             {ok, Resp} = start_chunked_response(Req, 200, [
-                {"ETag", binary_to_list(LastRev)},
+                {"ETag", couch_httpd:doc_etag(Doc)},
                 {"Cache-Control", "must-revalidate"},
                 {"Content-Type", binary_to_list(Type)}%,
                 % My understanding of http://www.faqs.org/rfcs/rfc2616.html
@@ -640,7 +653,7 @@
             #doc{id=DocId};
         Rev ->
             case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
-            {ok, [{ok, Doc0}]}  -> Doc0#doc{revs=[Rev]};
+            {ok, [{ok, Doc0}]}  -> Doc0;
             {ok, [Error]}       -> throw(Error)
             end
     end,
@@ -653,7 +666,7 @@
     send_json(Req, case Method of 'DELETE' -> 200; _ -> 201 end, {[
         {ok, true},
         {id, DocId},
-        {rev, UpdatedRev}
+        {rev, couch_doc:rev_to_str(UpdatedRev)}
     ]});
 
 db_attachment_req(Req, _Db, _DocId, _FileNameParts) ->
@@ -682,25 +695,24 @@
             Options = [deleted_conflicts | Args#doc_query_args.options],
             Args#doc_query_args{options=Options};
         {"rev", Rev} ->
-            Args#doc_query_args{rev=list_to_binary(Rev)};
+            Args#doc_query_args{rev=couch_doc:parse_rev(Rev)};
         {"open_revs", "all"} ->
             Args#doc_query_args{open_revs=all};
         {"open_revs", RevsJsonStr} ->
             JsonArray = ?JSON_DECODE(RevsJsonStr),
-            Args#doc_query_args{open_revs=JsonArray};
+            Args#doc_query_args{open_revs=[couch_doc:parse_rev(Rev) || Rev <- JsonArray]};
         _Else -> % unknown key value pair, ignore.
             Args
         end
     end, #doc_query_args{}, couch_httpd:qs(Req)).
 
 
-
-extract_header_rev(Req, ExplicitRev) when is_list(ExplicitRev)->
-    extract_header_rev(Req, list_to_binary(ExplicitRev));
+extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
+    extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
 extract_header_rev(Req, ExplicitRev) ->
     Etag = case couch_httpd:header_value(Req, "If-Match") of
         undefined -> undefined;
-        Value -> list_to_binary(string:strip(Value, both, $"))
+        Value -> couch_doc:parse_rev(string:strip(Value, both, $"))
     end,
     case {ExplicitRev, Etag} of
     {undefined, undefined} -> missing_rev;
@@ -716,11 +728,12 @@
     Destination = couch_httpd:header_value(Req, "Destination"),
     case regexp:match(Destination, "\\?") of
     nomatch -> 
-        {list_to_binary(Destination), []};
+        {list_to_binary(Destination), {0, []}};
     {match, _, _} ->
         {ok, [DocId, RevQueryOptions]} = regexp:split(Destination, "\\?"),
         {ok, [_RevQueryKey, Rev]} = regexp:split(RevQueryOptions, "="),
-        {list_to_binary(DocId), [list_to_binary(Rev)]}
+        {Pos, RevId} = couch_doc:parse_rev(Rev),
+        {list_to_binary(DocId), {Pos, [RevId]}}
     end.
 
 validate_attachment_names(Doc) ->

Modified: couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_misc_handlers.erl Fri Mar 13 22:15:34 2009
@@ -70,35 +70,36 @@
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
+% add trailing slash if missing
+fix_db_url(UrlBin) ->
+    ?l2b(case lists:last(Url = ?b2l(UrlBin)) of
+    $/ -> Url;
+    _  -> Url ++ "/"
+    end).
+    
+
+get_rep_endpoint(_Req, {Props}) ->
+    Url = proplists:get_value(<<"url">>, Props),
+    {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
+    {remote, fix_db_url(Url), [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+get_rep_endpoint(_Req, <<"http://",_/binary>>=Url) ->
+    {remote, fix_db_url(Url), []};
+get_rep_endpoint(_Req, <<"https://",_/binary>>=Url) ->
+    {remote, fix_db_url(Url), []};
+get_rep_endpoint(#httpd{user_ctx=UserCtx}, <<DbName/binary>>) ->
+    {local, DbName, UserCtx}.
 
-handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) ->
+handle_replicate_req(#httpd{method='POST'}=Req) ->
     {Props} = couch_httpd:json_body(Req),
-    Source = proplists:get_value(<<"source">>, Props),
-    Target = proplists:get_value(<<"target">>, Props),
-    
-    {SrcOpts} = proplists:get_value(<<"source_options">>, Props, {[]}),
-    {SrcHeadersBinary} = proplists:get_value(<<"headers">>, SrcOpts, {[]}),
-    SrcHeaders = [{?b2l(K),(V)} || {K,V} <- SrcHeadersBinary],
-    
-    {TgtOpts} = proplists:get_value(<<"target_options">>, Props, {[]}),
-    {TgtHeadersBinary} = proplists:get_value(<<"headers">>, TgtOpts, {[]}),
-    TgtHeaders = [{?b2l(K),(V)} || {K,V} <- TgtHeadersBinary],
-    
-    {Options} = proplists:get_value(<<"options">>, Props, {[]}),
-    Options2 = [{source_options,
-                    [{headers, SrcHeaders},
-                    {user_ctx, UserCtx}]},
-                {target_options,
-                    [{headers, TgtHeaders},
-                    {user_ctx, UserCtx}]}
-                | Options],
-    case couch_rep:replicate(Source, Target, Options2) of
-        {ok, {JsonResults}} ->
-            send_json(Req, {[{ok, true} | JsonResults]});
-        {error, {Type, Details}} ->
-            send_json(Req, 500, {[{error, Type}, {reason, Details}]});
-        {error, Reason} ->
-            send_json(Req, 500, {[{error, Reason}]})
+    Source = get_rep_endpoint(Req, proplists:get_value(<<"source">>, Props)),
+    Target = get_rep_endpoint(Req, proplists:get_value(<<"target">>, Props)),
+    case couch_rep:replicate(Source, Target) of
+    {ok, {JsonResults}} ->
+        send_json(Req, {[{ok, true} | JsonResults]});
+    {error, {Type, Details}} ->
+        send_json(Req, 500, {[{error, Type}, {reason, Details}]});
+    {error, Reason} ->
+        send_json(Req, 500, {[{error, Reason}]})
     end;
 handle_replicate_req(Req) ->
     send_method_not_allowed(Req, "POST").

Modified: couchdb/trunk/src/couchdb/couch_httpd_show.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_show.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_show.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_show.erl Fri Mar 13 22:15:34 2009
@@ -27,10 +27,10 @@
         path_parts=[_DbName, _Design, DesignName, _Show, ShowName, DocId]
     }=Req, Db) ->
     DesignId = <<"_design/", DesignName/binary>>,
-    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
     Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
     ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]),
-    Doc = try couch_httpd_db:couch_doc_open(Db, DocId, [], []) of
+    Doc = try couch_httpd_db:couch_doc_open(Db, DocId, nil, []) of
         FoundDoc -> FoundDoc
     catch
         _ -> nil
@@ -42,7 +42,7 @@
         path_parts=[_DbName, _Design, DesignName, _Show, ShowName]
     }=Req, Db) ->
     DesignId = <<"_design/", DesignName/binary>>,
-    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
     Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
     ShowSrc = get_nested_json_value({Props}, [<<"shows">>, ShowName]),
     send_doc_show_response(Lang, ShowSrc, nil, nil, Req, Db);
@@ -56,7 +56,7 @@
 handle_view_list_req(#httpd{method='GET',
         path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) ->
     DesignId = <<"_design/", DesignName/binary>>,
-    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
     Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
     ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]),
     send_view_list_response(Lang, ListSrc, ViewName, DesignId, Req, Db, nil);
@@ -67,7 +67,7 @@
 handle_view_list_req(#httpd{method='POST',
         path_parts=[_DbName, _Design, DesignName, _List, ListName, ViewName]}=Req, Db) ->
     DesignId = <<"_design/", DesignName/binary>>,
-    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, [], []),
+    #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
     Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
     ListSrc = get_nested_json_value({Props}, [<<"lists">>, ListName]),
     ReqBody = couch_httpd:body(Req),
@@ -370,13 +370,12 @@
         couch_httpd_external:send_external_response(Req, JsonResp)
     end);
 
-send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=[DocRev|_]}=Doc, 
-    #httpd{mochi_req=MReq}=Req, Db) ->
+send_doc_show_response(Lang, ShowSrc, DocId, #doc{revs=Revs}=Doc, #httpd{mochi_req=MReq}=Req, Db) ->
     % calculate the etag
     Headers = MReq:get(headers),
     Hlist = mochiweb_headers:to_list(Headers),
     Accept = proplists:get_value('Accept', Hlist),
-    CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, DocRev, Accept}),
+    CurrentEtag = couch_httpd:make_etag({Lang, ShowSrc, Revs, Accept}),
     % We know our etag now    
     couch_httpd:etag_respond(Req, CurrentEtag, fun() -> 
         ExternalResp = couch_query_servers:render_doc_show(Lang, ShowSrc, 

Modified: couchdb/trunk/src/couchdb/couch_httpd_view.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_view.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_view.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_view.erl Fri Mar 13 22:15:34 2009
@@ -565,14 +565,14 @@
         true ->
             Rev = case Value of
             {Props} ->
-                case is_list(Props) of
-                true ->
-                    proplists:get_value(<<"_rev">>, Props, []);
-                _ ->
-                    []
+                case proplists:get_value(<<"_rev">>, Props) of
+                undefined ->
+                    nil;
+                Rev0 ->
+                    couch_doc:parse_rev(Rev0)
                 end;
             _ ->
-                []
+                nil
             end,
             ?LOG_DEBUG("Include Doc: ~p ~p", [DocId, Rev]),
             case (catch couch_httpd_db:couch_doc_open(Db, DocId, Rev, [])) of

Modified: couchdb/trunk/src/couchdb/couch_key_tree.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_key_tree.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_key_tree.erl (original)
+++ couchdb/trunk/src/couchdb/couch_key_tree.erl Fri Mar 13 22:15:34 2009
@@ -13,7 +13,8 @@
 -module(couch_key_tree).
 
 -export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
--export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1, remove_leafs/2,get_all_leafs_full/1]).
+-export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2,
+    get_all_leafs_full/1,stem/2,test/0]).
 
 % a key tree looks like this:
 % Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
@@ -22,70 +23,150 @@
 % And each Key < SiblingKey
 
 
+% partial trees arranged by how much they are cut off.
 
-% key tree functions
+merge(A, B) ->
+    {Merged, HasConflicts} = 
+    lists:foldl(
+        fun(InsertTree, {AccTrees, AccConflicts}) ->
+            case merge_one(AccTrees, InsertTree, [], false) of
+            {ok, Merged, Conflicts} ->
+                {Merged, Conflicts or AccConflicts};
+            no ->
+                {[InsertTree | AccTrees], true} 
+            end
+        end,
+        {A, false}, B),
+    if HasConflicts or 
+            ((length(Merged) /= length(A)) and (length(Merged) /= length(B))) ->
+        Conflicts = conflicts;
+    true ->
+        Conflicts = no_conflicts
+    end,
+    {lists:sort(Merged), Conflicts}.
 
-% When the same key is found in the trees, the value in tree B is discarded.
-merge([], B) ->
-    B;
-merge(A, []) ->
-    A;
-merge([ATree | ANextTree], [BTree | BNextTree]) ->
+merge_one([], Insert, OutAcc, ConflictsAcc) ->
+    {ok, [Insert | OutAcc], ConflictsAcc};
+merge_one([{Start, Tree}|Rest], {StartInsert, TreeInsert}, OutAcc, ConflictsAcc) ->
+    if Start =< StartInsert ->
+        StartA = Start,
+        StartB = StartInsert,
+        TreeA = Tree,
+        TreeB = TreeInsert;
+    true ->
+        StartB = Start,
+        StartA = StartInsert,
+        TreeB = Tree,
+        TreeA = TreeInsert
+    end,
+    case merge_at([TreeA], StartB - StartA, TreeB) of
+    {ok, [CombinedTrees], Conflicts} ->
+        merge_one(Rest, {StartA, CombinedTrees}, OutAcc, Conflicts or ConflictsAcc);
+    no ->
+        merge_one(Rest, {StartB, TreeB}, [{StartA, TreeA} | OutAcc], ConflictsAcc)
+    end.
+    
+merge_at([], _Place, _Insert) ->
+    no;
+merge_at([{Key, Value, SubTree}|Sibs], 0, {InsertKey, InsertValue, InsertSubTree}) ->
+    if Key == InsertKey ->
+        {Merge, Conflicts} = merge_simple(SubTree, InsertSubTree),
+        {ok, [{Key, Value, Merge} | Sibs], Conflicts};
+    true ->
+        case merge_at(Sibs, 0, {InsertKey, InsertValue, InsertSubTree}) of
+        {ok, Merged, Conflicts} ->
+            {ok, [{Key, Value, SubTree} | Merged], Conflicts};
+        no ->
+            no
+        end
+    end;
+merge_at([{Key, Value, SubTree}|Sibs], Place, Insert) ->
+    case merge_at(SubTree, Place - 1,Insert) of
+    {ok, Merged, Conflicts} ->
+        {ok, [{Key, Value, Merged} | Sibs], Conflicts};
+    no ->
+        case merge_at(Sibs, Place, Insert) of
+        {ok, Merged} ->
+            [{Key, Value, SubTree} | Merged];
+        no ->
+            no
+        end
+    end.
+
+% key tree functions
+merge_simple([], B) ->
+    {B, false};
+merge_simple(A, []) ->
+    {A, false};
+merge_simple([ATree | ANextTree], [BTree | BNextTree]) ->
     {AKey, AValue, ASubTree} = ATree,
     {BKey, _BValue, BSubTree} = BTree,
     if
     AKey == BKey ->
         %same key
-        MergedSubTree = merge(ASubTree, BSubTree),
-        MergedNextTree = merge(ANextTree, BNextTree),
-        [{AKey, AValue, MergedSubTree} | MergedNextTree];
+        {MergedSubTree, Conflict1} = merge_simple(ASubTree, BSubTree),
+        {MergedNextTree, Conflict2} = merge_simple(ANextTree, BNextTree),
+        {[{AKey, AValue, MergedSubTree} | MergedNextTree], Conflict1 or Conflict2};
     AKey < BKey ->
-        [ATree | merge(ANextTree, [BTree | BNextTree])];
+        {MTree, _} = merge_simple(ANextTree, [BTree | BNextTree]),
+        {[ATree | MTree], true};
     true ->
-        [BTree | merge([ATree | ANextTree], BNextTree)]
+        {MTree, _} = merge_simple([ATree | ANextTree], BNextTree),
+        {[BTree | MTree], true}
     end.
 
 find_missing(_Tree, []) ->
     [];
-find_missing([], Keys) ->
-    Keys;
-find_missing([{Key, _, SubTree} | RestTree], Keys) ->
-    SrcKeys2 = Keys -- [Key],
-    SrcKeys3 = find_missing(SubTree, SrcKeys2),
-    find_missing(RestTree, SrcKeys3).
+find_missing([], SeachKeys) ->
+    SeachKeys;
+find_missing([{Start, {Key, Value, SubTree}} | RestTree], SeachKeys) ->
+    PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos >= Start],
+    ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos < Start],
+    Missing = find_missing_simple(Start, [{Key, Value, SubTree}], PossibleKeys),
+    find_missing(RestTree, ImpossibleKeys ++ Missing).
+    
+find_missing_simple(_Pos, _Tree, []) ->
+    [];
+find_missing_simple(_Pos, [], SeachKeys) ->
+    SeachKeys;
+find_missing_simple(Pos, [{Key, _, SubTree} | RestTree], SeachKeys) ->
+    PossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos >= Pos],
+    ImpossibleKeys = [{KeyPos, KeyValue} || {KeyPos, KeyValue} <- SeachKeys, KeyPos < Pos],
+    
+    SrcKeys2 = PossibleKeys -- [{Pos, Key}],
+    SrcKeys3 = find_missing_simple(Pos + 1, SubTree, SrcKeys2),
+    ImpossibleKeys ++ find_missing_simple(Pos, RestTree, SrcKeys3).
 
 
-get_all_key_paths_rev([], KeyPathAcc) ->
-    KeyPathAcc;
-get_all_key_paths_rev([{Key, Value, SubTree} | RestTree], KeyPathAcc) ->
-    get_all_key_paths_rev(SubTree, [{Key, Value} | KeyPathAcc]) ++
-        get_all_key_paths_rev(RestTree, KeyPathAcc).
-        
+filter_leafs([], _Keys, FilteredAcc, RemovedKeysAcc) ->
+    {FilteredAcc, RemovedKeysAcc};
+filter_leafs([{Pos, [{LeafKey, _}|_]} = Path |Rest], Keys, FilteredAcc, RemovedKeysAcc) ->
+    FilteredKeys = lists:delete({Pos, LeafKey}, Keys),
+    if FilteredKeys == Keys ->
+        % this leaf is not a key we are looking to remove
+        filter_leafs(Rest, Keys, [Path | FilteredAcc], RemovedKeysAcc);
+    true ->
+        % this did match a key, remove both the node and the input key
+        filter_leafs(Rest, FilteredKeys, FilteredAcc, [{Pos, LeafKey} | RemovedKeysAcc])
+    end.
     
 % Removes any branches from the tree whose leaf node(s) are in the Keys
-remove_leafs(Tree, Keys) ->
+remove_leafs(Trees, Keys) ->
     % flatten each branch in a tree into a tree path
-    Paths = get_all_key_paths_rev(Tree, []),
+    Paths = get_all_leafs_full(Trees),
     
     % filter out any that are in the keys list.
-    {FoundKeys, FilteredPaths} = lists:mapfoldl(
-        fun(Key, PathsAcc) ->
-            case [Path || [{LeafKey,_}|_]=Path <- PathsAcc, LeafKey /= Key] of
-            PathsAcc ->
-                {nil, PathsAcc};
-            PathsAcc2 ->
-                {Key, PathsAcc2}
-            end
-        end, Paths, Keys),
-        
+    {FilteredPaths, RemovedKeys} = filter_leafs(Paths, Keys, [], []),
+            
     % convert paths back to trees
     NewTree = lists:foldl(
-        fun(Path,TreeAcc) ->
-            SingleTree = lists:foldl(
+        fun({PathPos, Path},TreeAcc) ->
+            [SingleTree] = lists:foldl(
                 fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
-            merge(TreeAcc, SingleTree)
+            {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+            NewTrees
         end, [], FilteredPaths),
-    {NewTree, FoundKeys}.
+    {NewTree, RemovedKeys}.
 
 
 % get the leafs in the tree matching the keys. The matching key nodes can be
@@ -94,87 +175,211 @@
 get_key_leafs(Tree, Keys) ->
     get_key_leafs(Tree, Keys, []).
     
-get_key_leafs(_Tree, [], _KeyPathAcc) ->
+get_key_leafs(_, [], Acc) ->
+    {Acc, []};
+get_key_leafs([], Keys, Acc) ->
+    {Acc, Keys};
+get_key_leafs([{Pos, Tree}|Rest], Keys, Acc) ->
+    {Gotten, RemainingKeys} = get_key_leafs_simple(Pos, [Tree], Keys, []),
+    get_key_leafs(Rest, RemainingKeys, Gotten ++ Acc).
+        
+get_key_leafs_simple(_Pos, _Tree, [], _KeyPathAcc) ->
     {[], []};
-get_key_leafs([], KeysToGet, _KeyPathAcc) ->
+get_key_leafs_simple(_Pos, [], KeysToGet, _KeyPathAcc) ->
     {[], KeysToGet};
-get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
-    case KeysToGet -- [Key] of
+get_key_leafs_simple(Pos, [{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
+    case lists:delete({Pos, Key}, KeysToGet) of
     KeysToGet -> % same list, key not found    
-        {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]),
-        {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+        {LeafsFound, KeysToGet2} = get_key_leafs_simple(Pos + 1, SubTree, KeysToGet, [Key | KeyPathAcc]),
+        {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet2, KeyPathAcc),
         {LeafsFound ++ RestLeafsFound, KeysRemaining};
     KeysToGet2 ->
-        LeafsFound = get_all_leafs([Tree], KeyPathAcc),
+        LeafsFound = get_all_leafs_simple(Pos, [Tree], KeyPathAcc),
         LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound],
         KeysToGet2 = KeysToGet2 -- LeafKeysFound,
-        {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+        {RestLeafsFound, KeysRemaining} = get_key_leafs_simple(Pos, RestTree, KeysToGet2, KeyPathAcc),
         {LeafsFound ++ RestLeafsFound, KeysRemaining}
     end.
 
 get(Tree, KeysToGet) ->
     {KeyPaths, KeysNotFound} = get_full_key_paths(Tree, KeysToGet),
-    FixedResults = [ {Key, Value, [Key0 || {Key0, _} <- Path]} || [{Key, Value}|_] = Path <- KeyPaths],
+    FixedResults = [ {Value, {Pos, [Key0 || {Key0, _} <- Path]}} || {Pos, [{_Key, Value}|_]=Path} <- KeyPaths],
     {FixedResults, KeysNotFound}.
     
 get_full_key_paths(Tree, Keys) ->
     get_full_key_paths(Tree, Keys, []).
     
-get_full_key_paths(_Tree, [], _KeyPathAcc) ->
+get_full_key_paths(_, [], Acc) ->
+    {Acc, []};
+get_full_key_paths([], Keys, Acc) ->
+    {Acc, Keys};
+get_full_key_paths([{Pos, Tree}|Rest], Keys, Acc) ->
+    {Gotten, RemainingKeys} = get_full_key_paths(Pos, [Tree], Keys, []),
+    get_full_key_paths(Rest, RemainingKeys, Gotten ++ Acc).
+    
+    
+get_full_key_paths(_Pos, _Tree, [], _KeyPathAcc) ->
     {[], []};
-get_full_key_paths([], KeysToGet, _KeyPathAcc) ->
+get_full_key_paths(_Pos, [], KeysToGet, _KeyPathAcc) ->
     {[], KeysToGet};
-get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
-    KeysToGet2 = KeysToGet -- [KeyId],
+get_full_key_paths(Pos, [{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
+    KeysToGet2 = KeysToGet -- [{Pos, KeyId}],
     CurrentNodeResult =
     case length(KeysToGet2) == length(KeysToGet) of
     true -> % not in the key list.
         [];
     false -> % this node is the key list. return it
-        [[{KeyId, Value} | KeyPathAcc]]
+        [{Pos, [{KeyId, Value} | KeyPathAcc]}]
     end,
-    {KeysGotten, KeysRemaining} = get_full_key_paths(SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
-    {KeysGotten2, KeysRemaining2} = get_full_key_paths(RestTree, KeysRemaining, KeyPathAcc),
+    {KeysGotten, KeysRemaining} = get_full_key_paths(Pos + 1, SubTree, KeysToGet2, [{KeyId, Value} | KeyPathAcc]),
+    {KeysGotten2, KeysRemaining2} = get_full_key_paths(Pos, RestTree, KeysRemaining, KeyPathAcc),
     {CurrentNodeResult ++ KeysGotten ++ KeysGotten2, KeysRemaining2}.
 
 get_all_leafs_full(Tree) ->
     get_all_leafs_full(Tree, []).
     
-get_all_leafs_full([], _KeyPathAcc) ->
+get_all_leafs_full([], Acc) ->
+    Acc;
+get_all_leafs_full([{Pos, Tree} | Rest], Acc) ->
+    get_all_leafs_full(Rest, get_all_leafs_full_simple(Pos, [Tree], []) ++ Acc).
+    
+get_all_leafs_full_simple(_Pos, [], _KeyPathAcc) ->
     [];
-get_all_leafs_full([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
-    [[{KeyId, Value} | KeyPathAcc] | get_all_leafs_full(RestTree, KeyPathAcc)];
-get_all_leafs_full([{KeyId, Value, SubTree} | RestTree], KeyPathAcc) ->
-    get_all_leafs_full(SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full(RestTree, KeyPathAcc).
-
-get_all_leafs(Tree) ->
-    get_all_leafs(Tree, []).
+get_all_leafs_full_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+    [{Pos, [{KeyId, Value} | KeyPathAcc]} | get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc)];
+get_all_leafs_full_simple(Pos, [{KeyId, Value, SubTree} | RestTree], KeyPathAcc) ->
+    get_all_leafs_full_simple(Pos + 1, SubTree, [{KeyId, Value} | KeyPathAcc]) ++ get_all_leafs_full_simple(Pos, RestTree, KeyPathAcc).
+
+get_all_leafs(Trees) ->
+    get_all_leafs(Trees, []).
+
+get_all_leafs([], Acc) ->
+    Acc;
+get_all_leafs([{Pos, Tree}|Rest], Acc) ->
+    get_all_leafs(Rest, get_all_leafs_simple(Pos, [Tree], []) ++ Acc).
     
-get_all_leafs([], _KeyPathAcc) ->
+get_all_leafs_simple(_Pos, [], _KeyPathAcc) ->
     [];
-get_all_leafs([{KeyId, Value, []} | RestTree], KeyPathAcc) ->
-    [{KeyId, Value, [KeyId | KeyPathAcc]} | get_all_leafs(RestTree, KeyPathAcc)];
-get_all_leafs([{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
-    get_all_leafs(SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs(RestTree, KeyPathAcc).
+get_all_leafs_simple(Pos, [{KeyId, Value, []} | RestTree], KeyPathAcc) ->
+    [{Value, {Pos, [KeyId | KeyPathAcc]}} | get_all_leafs_simple(Pos, RestTree, KeyPathAcc)];
+get_all_leafs_simple(Pos, [{KeyId, _Value, SubTree} | RestTree], KeyPathAcc) ->
+    get_all_leafs_simple(Pos + 1, SubTree, [KeyId | KeyPathAcc]) ++ get_all_leafs_simple(Pos, RestTree, KeyPathAcc).
+
 
-get_leaf_keys([]) ->
-    [];
-get_leaf_keys([{Key, _Value, []} | RestTree]) ->
-    [Key | get_leaf_keys(RestTree)];
-get_leaf_keys([{_Key, _Value, SubTree} | RestTree]) ->
-    get_leaf_keys(SubTree) ++ get_leaf_keys(RestTree).
-    
 count_leafs([]) ->
     0;
-count_leafs([{_Key, _Value, []} | RestTree]) ->
-    1 + count_leafs(RestTree);
-count_leafs([{_Key, _Value, SubTree} | RestTree]) ->
-    count_leafs(SubTree) + count_leafs(RestTree).
+count_leafs([{_Pos,Tree}|Rest]) ->
+    count_leafs_simple([Tree]) + count_leafs(Rest).
     
+count_leafs_simple([]) ->
+    0;
+count_leafs_simple([{_Key, _Value, []} | RestTree]) ->
+    1 + count_leafs_simple(RestTree);
+count_leafs_simple([{_Key, _Value, SubTree} | RestTree]) ->
+    count_leafs_simple(SubTree) + count_leafs_simple(RestTree).
 
+    
 map(_Fun, []) ->
     [];
-map(Fun, [{Key, Value, SubTree} | RestTree]) ->
-    Value2 = Fun(Key, Value),
-    [{Key, Value2, map(Fun, SubTree)} | map(Fun, RestTree)].
+map(Fun, [{Pos, Tree}|Rest]) ->
+    [NewTree] = map_simple(Fun, Pos, [Tree]),
+    [{Pos, NewTree} | map(Fun, Rest)].
 
+map_simple(_Fun, _Pos, []) ->
+    [];
+map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) ->
+    Value2 = Fun({Pos, Key}, Value),
+    [{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)].
+
+
+stem(Trees, Limit) ->
+    % flatten each branch in a tree into a tree path
+    Paths = get_all_leafs_full(Trees),
+    
+    Paths2 = [{Pos, lists:sublist(Path, Limit)} || {Pos, Path} <- Paths],
+    
+    % convert paths back to trees
+    lists:foldl(
+        fun({PathPos, Path},TreeAcc) ->
+            [SingleTree] = lists:foldl(
+                fun({K,V},NewTreeAcc) -> [{K,V,NewTreeAcc}] end, [], Path),
+            {NewTrees, _} = merge(TreeAcc, [{PathPos + 1 - length(Path), SingleTree}]),
+            NewTrees
+        end, [], Paths2).
+
+test() ->
+    EmptyTree = [],
+    One = [{0, {"1","foo",[]}}],
+    TwoSibs = [{0, {"1","foo",[]}},
+               {0, {"2","foo",[]}}],
+    OneChild = [{0, {"1","foo",[{"1a", "bar", []}]}}],
+    TwoChild = [{0, {"1","foo", [{"1a", "bar", [{"1aa", "bar", []}]}]}}],
+    TwoChildSibs = [{0, {"1","foo", [{"1a", "bar", []},
+                                     {"1b", "bar", []}]}}],
+    Stemmed1a = [{1, {"1a", "bar", [{"1aa", "bar", []}]}}],
+    Stemmed1aa = [{2, {"1aa", "bar", []}}],
+    
+    {EmptyTree, no_conflicts} = merge(EmptyTree, EmptyTree),
+    {One, no_conflicts} = merge(EmptyTree, One),
+    {One, no_conflicts} = merge(One, EmptyTree),
+    {TwoSibs, no_conflicts} = merge(One, TwoSibs),
+    {One, no_conflicts} = merge(One, One),
+    {TwoChild, no_conflicts} = merge(TwoChild, TwoChild),
+    {TwoChildSibs, no_conflicts} = merge(TwoChildSibs, TwoChildSibs),
+    {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1aa),
+    {TwoChild, no_conflicts} = merge(TwoChild, Stemmed1a),
+    {Stemmed1a, no_conflicts} = merge(Stemmed1a, Stemmed1aa),
+    Expect1 = OneChild ++ Stemmed1aa,
+    {Expect1, conflicts} = merge(OneChild, Stemmed1aa),
+    {TwoChild, no_conflicts} = merge(Expect1, TwoChild),
+    
+    []=find_missing(TwoChildSibs, [{0,"1"}, {1,"1a"}]),
+    [{0, "10"}, {100, "x"}]=find_missing(TwoChildSibs, [{0,"1"}, {0, "10"}, {1,"1a"}, {100, "x"}]),
+    [{0, "1"}, {100, "x"}]=find_missing(Stemmed1a, [{0,"1"}, {1,"1a"}, {100, "x"}]),
+    [{0, "1"}, {1,"1a"}, {100, "x"}]=find_missing(Stemmed1aa, [{0,"1"}, {1,"1a"}, {100, "x"}]),
+    
+    {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []),
+    {TwoChildSibs, []} = remove_leafs(TwoChildSibs, [{0, "1"}]),
+    {OneChild, [{1, "1b"}]} = remove_leafs(TwoChildSibs, [{1, "1b"}]),
+    {[], [{1, "1b"},{1, "1a"}]} = remove_leafs(TwoChildSibs, [{1, "1a"}, {1, "1b"}]),
+    {Stemmed1a, []} = remove_leafs(Stemmed1a, [{1, "1a"}]),
+    {[], [{2, "1aa"}]} = remove_leafs(Stemmed1a, [{2, "1aa"}]),
+    {TwoChildSibs, []} = remove_leafs(TwoChildSibs, []),
+    
+    {[],[{0,"x"}]} = get_key_leafs(TwoChildSibs, [{0, "x"}]),
+    
+    {[{"bar", {1, ["1a","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{1, "1a"}]),
+    {[{"bar", {1, ["1a","1"]}},{"bar",{1, ["1b","1"]}}],[]} = get_key_leafs(TwoChildSibs, [{0, "1"}]),
+    
+    {[{"foo", {0, ["1"]}}],[]} = get(TwoChildSibs, [{0, "1"}]),
+    {[{"bar", {1, ["1a", "1"]}}],[]} = get(TwoChildSibs, [{1, "1a"}]),
+
+    {[{0,[{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{0, "1"}]),
+    {[{1,[{"1a", "bar"},{"1", "foo"}]}],[]} = get_full_key_paths(TwoChildSibs, [{1, "1a"}]),
+    
+    [{2, [{"1aa", "bar"},{"1a", "bar"}]}] = get_all_leafs_full(Stemmed1a),
+    [{1, [{"1a", "bar"},{"1", "foo"}]}, {1, [{"1b", "bar"},{"1", "foo"}]}] = get_all_leafs_full(TwoChildSibs),
+    
+    [{"bar", {2, ["1aa","1a"]}}] = get_all_leafs(Stemmed1a),
+    [{"bar", {1, ["1a", "1"]}}, {"bar", {1, ["1b","1"]}}] = get_all_leafs(TwoChildSibs),
+    
+    0 = count_leafs(EmptyTree),
+    1 = count_leafs(One),
+    2 = count_leafs(TwoChildSibs),
+    1 = count_leafs(Stemmed1a),
+    
+    TwoChild = stem(TwoChild, 3),
+    Stemmed1a = stem(TwoChild, 2),
+    Stemmed1aa = stem(TwoChild, 1),
+    ok.
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
\ No newline at end of file

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Fri Mar 13 22:15:34 2009
@@ -15,11 +15,11 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
--export([replicate/3]).
+-export([replicate/2]).
 
 -include_lib("couch_db.hrl").
 
-%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) -> 
+%% @spec replicate(Source::binary(), Target::binary()) -> 
 %%      {ok, Stats} | {error, Reason}
 %% @doc Triggers a replication.  Stats is a JSON Object with the following 
 %%      keys: session_id (UUID), source_last_seq (integer), and history (array).
@@ -30,26 +30,29 @@
 %%      The supervisor will try to restart the replication in case of any error
 %%      other than shutdown.  Just call this function again to listen for the 
 %%      result of the retry.
-replicate(Source, Target, Options) ->
-    Id = <<Source/binary, ":", Target/binary>>,
-    Args = [?MODULE, [Source,Target,Options], []],
+replicate(Source, Target) ->
     
-    Replicator = {Id,
+    {ok, HostName} = inet:gethostname(),
+    RepId = couch_util:to_hex(
+            erlang:md5(term_to_binary([HostName, Source, Target]))),
+    Args = [?MODULE, [RepId, Source,Target], []],
+    
+    Replicator = {RepId,
         {gen_server, start_link, Args},
         transient,
-        10000,
+        1,
         worker,
         [?MODULE]
     },
     
     Server = case supervisor:start_child(couch_rep_sup, Replicator) of
         {ok, Pid} -> 
-            ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]),
+            ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
             Pid;
         {error, already_present} ->
-            case supervisor:restart_child(couch_rep_sup, Id) of
+            case supervisor:restart_child(couch_rep_sup, RepId) of
                 {ok, Pid} -> 
-                    ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]),
+                    ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
                     Pid;
                 {error, running} -> 
                     %% this error occurs if multiple replicators are racing
@@ -57,16 +60,16 @@
                     %% the Pid by calling start_child again.
                     {error, {already_started, Pid}} = 
                         supervisor:start_child(couch_rep_sup, Replicator),
-                    ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+                    ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
                     Pid
             end;
         {error, {already_started, Pid}} -> 
-            ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+            ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
             Pid
     end,
     
     case gen_server:call(Server, get_result, infinity) of 
-        retry -> replicate(Source, Target, Options);
+        retry -> replicate(Source, Target);
         Else -> Else
     end.
 
@@ -79,6 +82,7 @@
     headers
 }).
 
+    
 -record(state, {
     context,
     current_seq,
@@ -90,18 +94,14 @@
     listeners = []
 }).
 
-init([Source, Target, Options]) ->
+
+init([RepId, Source, Target]) ->
     process_flag(trap_exit, true),
     
-    {ok, DbSrc} = 
-        open_db(Source, proplists:get_value(source_options, Options, [])),
-    {ok, DbTgt} = 
-        open_db(Target, proplists:get_value(target_options, Options, [])),
-    
-    {ok, Host} = inet:gethostname(),
-    HostBin = list_to_binary(Host),
-    DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":", 
-        Target/binary>>,
+    {ok, DbSrc, SrcName} = open_db(Source),
+    {ok, DbTgt, TgtName} =  open_db(Target),
+    
+    DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
     
     {ok, InfoSrc} = get_db_info(DbSrc),
     {ok, InfoTgt} = get_db_info(DbTgt),
@@ -110,49 +110,49 @@
     SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
     TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
     
-    case proplists:get_value(full, Options, false)
-        orelse proplists:get_value("full", Options, false) of
+    RepRecDocSrc =
+    case open_doc(DbSrc, DocKey, []) of
+    {ok, SrcDoc} ->
+        ?LOG_DEBUG("Found existing replication record on source", []),
+        SrcDoc;
+    _ -> #doc{id=DocKey}
+    end,
+    
+    RepRecDocTgt =
+    case open_doc(DbTgt, DocKey, []) of
+    {ok, TgtDoc} ->
+        ?LOG_DEBUG("Found existing replication record on target", []),
+        TgtDoc;
+    _ -> #doc{id=DocKey}
+    end,
+    
+    #doc{body={RepRecProps}} = RepRecDocSrc,
+    #doc{body={RepRecPropsTgt}} = RepRecDocTgt,
+    
+    case proplists:get_value(<<"session_id">>, RepRecProps) == 
+            proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
     true ->
-        RepRecSrc = RepRecTgt = #doc{id=DocKey};
+        % if the records have the same session id,
+        % then we have a valid replication history
+        OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
+        OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
     false ->
-        RepRecSrc = case open_doc(DbSrc, DocKey, []) of
-            {ok, SrcDoc} ->
-                ?LOG_DEBUG("Found existing replication record on source", []),
-                SrcDoc;
-            _ -> #doc{id=DocKey}
-        end,
-
-        RepRecTgt = case open_doc(DbTgt, DocKey, []) of
-            {ok, TgtDoc} ->
-                ?LOG_DEBUG("Found existing replication record on target", []),
-                TgtDoc;
-            _ -> #doc{id=DocKey}
-        end
-    end,
-
-    #doc{body={OldRepHistoryProps}} = RepRecSrc,
-    #doc{body={OldRepHistoryPropsTrg}} = RepRecTgt,
-
-    SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of
-        true ->
-            % if the records are identical, then we have a valid replication history
-            proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
-        false ->
-            ?LOG_INFO("Replication records differ. "
+        ?LOG_INFO("Replication records differ. "
                 "Performing full replication instead of incremental.", []),
-            ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", 
-                [OldRepHistoryProps, OldRepHistoryPropsTrg]),
-        0
+        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+                [RepRecProps, RepRecPropsTgt]),
+        OldSeqNum = 0,
+        OldHistory = []
     end,
     
     Context = [
-        {start_seq, SeqNum},
-        {history, OldRepHistoryProps},
+        {start_seq, OldSeqNum},
+        {history, OldHistory},
         {rep_starttime, ReplicationStartTime},
         {src_starttime, SrcInstanceStartTime},
         {tgt_starttime, TgtInstanceStartTime},
-        {src_record, RepRecSrc},
-        {tgt_record, RepRecTgt}
+        {src_record, RepRecDocSrc},
+        {tgt_record, RepRecDocTgt}
     ],
     
     Stats = ets:new(replication_stats, [set, private]),
@@ -160,16 +160,17 @@
     ets:insert(Stats, {missing_revs, 0}),
     ets:insert(Stats, {docs_read, 0}),
     ets:insert(Stats, {docs_written, 0}),
+    ets:insert(Stats, {doc_write_failures, 0}),
     
-    couch_task_status:add_task("Replication", <<Source/binary, " -> ",
-        Target/binary>>, "Starting"),
+    couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
+        TgtName/binary>>, "Starting"),
     
     Parent = self(),
-    Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end),
+    Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),
     
     State = #state{
         context = Context,
-        current_seq = SeqNum,
+        current_seq = OldSeqNum,
         enum_pid = Pid,
         source = DbSrc,
         target = DbTgt,
@@ -178,7 +179,6 @@
     
     {ok, State}.
 
-
 handle_call(get_result, From, #state{listeners=L} = State) ->
     {noreply, State#state{listeners=[From|L]}};
 
@@ -191,7 +191,7 @@
         target = Target,
         stats = Stats
     } = State,
-    
+
     ets:update_counter(Stats, missing_revs, length(Revs)),
     
     %% get document(s)
@@ -203,8 +203,11 @@
     {NewBuffer, NewContext} = case couch_util:should_flush() of
         true ->
             Docs2 = lists:flatten([Docs|Buffer]),
-            ok = update_docs(Target, Docs2, [], false),
-            ets:update_counter(Stats, docs_written, length(Docs2)),
+            {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
+            dump_update_errors(Errors),
+            ets:update_counter(Stats, doc_write_failures, length(Errors)),
+            ets:update_counter(Stats, docs_written, length(Docs2) -
+                    length(Errors)),
             {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
             {[], Ctxt};
         false ->
@@ -255,8 +258,11 @@
         stats = Stats
     } = State,
     
-    ok = update_docs(Target, lists:flatten(Buffer), [], false),
-    ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)),
+    {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes),
+    dump_update_errors(Errors),
+    ets:update_counter(Stats, doc_write_failures, length(Errors)),
+    ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
+            length(Errors)),
     
     couch_task_status:update("Finishing"),
     
@@ -264,9 +270,12 @@
     ets:delete(Stats),
     close_db(Target),
     
-    %% reply to original requester
-    [Original|Rest] = Listeners,
-    gen_server:reply(Original, {ok, NewRepHistory}),
+    case Listeners of
+    [Original|Rest] ->
+        %% reply to original requester
+        gen_server:reply(Original, {ok, NewRepHistory});
+    Rest -> ok
+    end,
     
     %% maybe trigger another replication. If this replicator uses a local 
     %% source Db, changes to that Db since we started will not be included in 
@@ -304,6 +313,16 @@
 %% internal functions
 %%=============================================================================
 
+
+% we should probably write these to a special replication log
+% or have a callback where the caller decides what to do with replication
+% errors.
+dump_update_errors([]) -> ok;
+dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
+    ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
+        [Id, couch_doc:rev_to_str(Rev), Error]),
+    dump_update_errors(Rest).
+
 attachment_loop(ReqId) ->
     couch_util:should_flush(),
     receive 
@@ -354,6 +373,16 @@
     end,
     {Name, {Type, {RcvFun, Length}}}.
 
+
+% Open a replication endpoint: {ok, Db, Name}, or couch_db:open/2's non-ok result.
+open_db({remote, Url, Hdrs})->
+    HttpDb = #http_db{uri=?b2l(Url), headers=Hdrs},
+    {ok, HttpDb, Url};
+open_db({local, DbName, UserCtx})->
+    Opened = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    case Opened of {ok, Db} -> {ok, Db, DbName}; _ -> Opened end.
+
+
 close_db(#http_db{})->
     ok;
 close_db(Db)->
@@ -362,27 +391,38 @@
 do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
     ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
     [
-        {start_seq, SeqNum},
-        {history, OldRepHistoryProps},
+        {start_seq, StartSeqNum},
+        {history, OldHistory},
         {rep_starttime, ReplicationStartTime},
         {src_starttime, SrcInstanceStartTime},
         {tgt_starttime, TgtInstanceStartTime},
-        {src_record, RepRecSrc},
-        {tgt_record, RepRecTgt}
+        {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
+        {tgt_record, RepRecDocTgt}
     ] = Context,
     
-    NewHistory = case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+    case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
     true ->
         % nothing changed, don't record results
-        {OldRepHistoryProps};
+        {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
     false ->
+        % something changed, record results for incremental replication,
+        
         % commit changes to both src and tgt. The src because if changes
-        % we replicated are lost, we'll record the a seq number of ahead 
-        % of what was committed and therefore lose future changes with the
-        % same seq nums.
-        {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+        % we replicated are lost, we'll record a seq number ahead
+        % of what was committed. If those changes are lost and the seq number
+        % reverts to a previous committed value, we will skip future changes
+        % when new doc updates are given our already replicated seq nums.
+        
+        % commit the src async
+        ParentPid = self(),
+        SrcCommitPid = spawn_link(fun() -> 
+                ParentPid ! {self(), ensure_full_commit(Source)} end),
+                
+        % commit tgt sync
         {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
         
+        receive {SrcCommitPid, {ok, SrcInstanceStartTime2}} -> ok end,
+        
         RecordSeqNum =
         if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
                 TgtInstanceStartTime2 == TgtInstanceStartTime ->
@@ -391,60 +431,57 @@
             ?LOG_INFO("A server has restarted sinced replication start. "
                 "Not recording the new sequence number to ensure the "
                 "replication is redone and documents reexamined.", []),
-            SeqNum
+            StartSeqNum
         end,
         
-        %% format replication history
-        JsonStats = [
+        NewHistoryEntry = {
+            [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+            {<<"start_last_seq">>, StartSeqNum},
+            {<<"end_last_seq">>, NewSeqNum},
             {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
             {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
             {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
-            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+            {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
+            ]},
+        % limit history to 50 entries
+        HistEntries =lists:sublist([NewHistoryEntry |  OldHistory], 50),
+
+        NewRepHistory =
+                {[{<<"session_id">>, couch_util:new_uuid()},
+                  {<<"source_last_seq">>, RecordSeqNum},
+                  {<<"history">>, HistEntries}]},
+
+        {ok, {SrcRevPos,SrcRevId}} = update_doc(Source, 
+                RepRecDocSrc#doc{body=NewRepHistory}, []),
+        {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
+                RepRecDocTgt#doc{body=NewRepHistory}, []),
+    
+        NewContext = [
+            {start_seq, StartSeqNum},
+            {history, OldHistory},
+            {rep_starttime, ReplicationStartTime},
+            {src_starttime, SrcInstanceStartTime},
+            {tgt_starttime, TgtInstanceStartTime},
+            {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
+            {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
         ],
-        
-        HistEntries =[
-            {
-                [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
-                {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
-                {<<"start_last_seq">>, SeqNum},
-                {<<"end_last_seq">>, NewSeqNum} | JsonStats]}
-            | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
-        % something changed, record results
-        {[
-            {<<"session_id">>, couch_util:new_uuid()},
-            {<<"source_last_seq">>, RecordSeqNum},
-            {<<"history">>, lists:sublist(HistEntries, 50)}
-        ]}
-    end,
-    
-    %% update local documents
-    RepRecSrc = proplists:get_value(src_record, Context),
-    RepRecTgt = proplists:get_value(tgt_record, Context),
-    {ok, TgtRev} = update_local_doc(Target, RepRecTgt#doc{body=NewHistory}, []),
-    {ok, SrcRev} = update_local_doc(Source, RepRecSrc#doc{body=NewHistory}, []),
-    
-    NewContext = [
-        {start_seq, SeqNum},
-        {history, OldRepHistoryProps},
-        {rep_starttime, ReplicationStartTime},
-        {src_starttime, SrcInstanceStartTime},
-        {tgt_starttime, TgtInstanceStartTime},
-        {src_record, RepRecSrc#doc{revs=[SrcRev]}},
-        {tgt_record, RepRecTgt#doc{revs=[TgtRev]}}
-    ],
     
-    {ok, NewHistory, NewContext}.
+        {ok, NewRepHistory, NewContext}
+    
+    end.
 
 do_http_request(Url, Action, Headers) ->
     do_http_request(Url, Action, Headers, []).
 
 do_http_request(Url, Action, Headers, JsonBody) ->
-    do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10).
+    do_http_request(Url, Action, Headers, JsonBody, 10). % 10 is the initial Retries value of the arity-5 function
 
 do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
     ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", 
         [Action, Url]),
-    exit({http_request_failed, ?l2b(Url)});
+    exit({http_request_failed, Url});
 do_http_request(Url, Action, Headers, JsonBody, Retries) ->
     ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
     Body =
@@ -498,7 +535,6 @@
     [] ->
         gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
     DocInfoList ->
-        % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
         SrcRevsList = lists:map(fun(SrcDocInfo) ->
             #doc_info{id=Id,
                 rev=Rev,
@@ -521,13 +557,8 @@
         enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
     end.
 
-fix_url(UrlBin) ->
-    Url = binary_to_list(UrlBin),
-    case lists:last(Url) of
-        $/ -> Url;
-        _ ->  Url ++ "/"
-    end.
 
+            
 get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
     {DbProps} = do_http_request(DbUrl, get, Headers),
     {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
@@ -542,12 +573,12 @@
         {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
         #doc_info{
             id=proplists:get_value(<<"id">>, RowInfoList),
-            rev=proplists:get_value(<<"rev">>, RowValueProps),
+            rev=couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)),
             update_seq = proplists:get_value(<<"key">>, RowInfoList),
             conflict_revs =
-                proplists:get_value(<<"conflicts">>, RowValueProps, []),
+                couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, RowValueProps, [])),
             deleted_conflict_revs =
-                proplists:get_value(<<"deleted_conflicts">>, RowValueProps, []),
+                couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, RowValueProps, [])),
             deleted = proplists:get_value(<<"deleted">>, RowValueProps, false)
         }
     end, proplists:get_value(<<"rows">>, Results));
@@ -561,25 +592,18 @@
     lists:reverse(DocInfoList).
 
 get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
+    DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
     {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,
-            {DocIdRevsList}),
-    {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
-    {ok, MissingRevs};
+            {DocIdRevsList2}), % request body: each rev list was passed through couch_doc:rev_to_strs/1 above
+    {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
+    DocMissingRevsList2 = [{Id, couch_doc:parse_revs(MissingRevStrs)} || {Id, MissingRevStrs} <- DocMissingRevsList],
+    {ok, DocMissingRevsList2}; % {Id, Revs} pairs; each returned rev list was passed through couch_doc:parse_revs/1
 get_missing_revs(Db, DocId) ->
     couch_db:get_missing_revs(Db, DocId).
 
-open_http_db(UrlBin, Options) ->
-    Headers = proplists:get_value(headers, Options, {[]}),
-    {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
-            
-open_db(<<"http://", _/binary>>=Url, Options)->
-    open_http_db(Url, Options);
-open_db(<<"https://", _/binary>>=Url, Options)->
-    open_http_db(Url, Options);
-open_db(DbName, Options)->
-    couch_db:open(DbName, Options).
 
-open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
+open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
+    [] = Options,
     case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of
     {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
         {couch_util:to_existing_atom(ErrId), Reason};
@@ -589,7 +613,9 @@
 open_doc(Db, DocId, Options) ->
     couch_db:open_doc(Db, DocId, Options).
 
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
+open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs0, 
+        [latest]) ->
+    Revs = couch_doc:rev_to_strs(Revs0),
     BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true",
     
     %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
@@ -612,39 +638,52 @@
             lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers)
     end,
     
-    Results = 
-    lists:map(fun({[{<<"missing">>, Rev}]}) ->
-        {{not_found, missing}, Rev};
-    ({[{<<"ok">>, JsonDoc}]}) ->
+    Results =
+    lists:map(
+        fun({[{<<"missing">>, Rev}]}) ->
+            {{not_found, missing}, couch_doc:parse_rev(Rev)};
+        ({[{<<"ok">>, JsonDoc}]}) ->
         #doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc),
         Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach],
         {ok, Doc#doc{attachments=Attach2}}
-    end, JsonResults),
+        end, JsonResults),
     {ok, Results};
 open_doc_revs(Db, DocId, Revs, Options) ->
     couch_db:open_doc_revs(Db, DocId, Revs, Options).
 
-update_docs(_, [], _, _) ->
-    ok;
-update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
-    JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
-    {Returned} =
-        do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
-                {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
-    true = proplists:get_value(<<"ok">>, Returned),
-    ok;
-update_docs(Db, Docs, Options, NewEdits) ->
-    couch_db:update_docs(Db, Docs, Options, NewEdits).
 
-update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) ->
+update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
+    [] = Options, % remote updates accept no options: a non-empty list fails this match (badmatch)
     Url = DbUrl ++ url_encode(DocId),
     {ResponseMembers} = do_http_request(Url, put, Headers,
-            couch_doc:to_json_obj(Doc, [revs,attachments])),
-    RevId = proplists:get_value(<<"rev">>, ResponseMembers),
-    {ok, RevId};
-update_local_doc(Db, Doc, Options) ->
+            couch_doc:to_json_obj(Doc, [attachments])),
+    Rev = proplists:get_value(<<"rev">>, ResponseMembers),
+    {ok, couch_doc:parse_rev(Rev)}; % the "rev" member of the PUT response, passed through couch_doc:parse_rev/1
+update_doc(Db, Doc, Options) ->
     couch_db:update_doc(Db, Doc, Options).
 
+% Write Docs to a target db; an empty doc list is a no-op. A remote (#http_db{})
+% target gets a _bulk_docs POST with new_edits=false, and each object in the
+% JSON reply becomes {{Id, Rev}, {ErrorAtom, Reason}}. Otherwise couch_db does it.
+update_docs(_, [], _, _) ->
+    {ok, []};
+update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], replicated_changes) ->
+    JsonDocs = lists:map(fun(Doc) ->
+        couch_doc:to_json_obj(Doc, [revs,attachments]) end, Docs),
+    Request = {[{new_edits, false}, {docs, JsonDocs}]},
+    ErrorsJson = do_http_request(DbUrl ++ "_bulk_docs", post, Headers, Request),
+    ToError = fun({Props}) ->
+        DocId = proplists:get_value(<<"id">>, Props),
+        DocRev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Props)),
+        ErrName = proplists:get_value(<<"error">>, Props),
+        ErrAtom = couch_util:to_existing_atom(ErrName),
+        Reason = proplists:get_value(<<"reason">>, Props),
+        {{DocId, DocRev}, {ErrAtom, Reason}}
+    end,
+    {ok, lists:map(ToError, ErrorsJson)};
+update_docs(Db, Docs, Options, UpdateType) ->
+    couch_db:update_docs(Db, Docs, Options, UpdateType).
+
 up_to_date(#http_db{}, _Seq) ->
     true;
 up_to_date(Source, Seq) ->

Modified: couchdb/trunk/src/couchdb/couch_util.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_util.erl?rev=753448&r1=753447&r2=753448&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_util.erl (original)
+++ couchdb/trunk/src/couchdb/couch_util.erl Fri Mar 13 22:15:34 2009
@@ -13,7 +13,7 @@
 -module(couch_util).
 
 -export([start_driver/1]).
--export([should_flush/0, should_flush/1, to_existing_atom/1]).
+-export([should_flush/0, should_flush/1, to_existing_atom/1, to_binary/1]).
 -export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
 -export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]).
 -export([encodeBase64/1, decodeBase64/1, to_hex/1,parse_term/1,dict_find/3]).
@@ -57,6 +57,19 @@
 to_digit(N)             -> $a + N-10.
 
 
+% Convert any term to a binary: binaries are returned as-is, iolists are
+% flattened, atoms give their name, anything else is printed with ~p.
+to_binary(V) when is_binary(V) -> V;
+to_binary(V) when is_list(V) ->
+    % list_to_binary/1 signals a non-iolist with error:badarg, which a bare
+    % `_` catch pattern (throw class only) would never match.
+    try list_to_binary(V)
+    catch error:badarg -> list_to_binary(io_lib:format("~p", [V]))
+    end;
+to_binary(V) when is_atom(V) -> list_to_binary(atom_to_list(V));
+to_binary(V) -> list_to_binary(io_lib:format("~p", [V])).
+
+
 parse_term(Bin) when is_binary(Bin)->
     parse_term(binary_to_list(Bin));
 parse_term(List) ->



Mime
View raw message