couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kocol...@apache.org
Subject svn commit: r802145 - in /couchdb/trunk: src/couchdb/Makefile.am src/couchdb/couch_db.hrl src/couchdb/couch_rep.erl src/couchdb/couch_rep_httpc.erl test/etap/110-replication-httpc.t
Date Fri, 07 Aug 2009 19:12:39 GMT
Author: kocolosk
Date: Fri Aug  7 19:12:39 2009
New Revision: 802145

URL: http://svn.apache.org/viewvc?rev=802145&view=rev
Log:
ibrowse wrapper for replicator, will replace do_http_request

Added:
    couchdb/trunk/src/couchdb/couch_rep_httpc.erl
    couchdb/trunk/test/etap/110-replication-httpc.t
Modified:
    couchdb/trunk/src/couchdb/Makefile.am
    couchdb/trunk/src/couchdb/couch_db.hrl
    couchdb/trunk/src/couchdb/couch_rep.erl

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=802145&r1=802144&r2=802145&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Fri Aug  7 19:12:39 2009
@@ -75,6 +75,7 @@
     couch_ref_counter.erl \
     couch_rep.erl \
     couch_rep_changes_feed.erl \
+    couch_rep_httpc.erl \
     couch_rep_sup.erl \
     couch_server.erl \
     couch_server_sup.erl \
@@ -122,6 +123,7 @@
     couch_ref_counter.beam \
     couch_rep.beam \
     couch_rep_changes_feed.beam \
+    couch_rep_httpc.beam \
     couch_rep_sup.beam \
     couch_server.beam \
     couch_server_sup.beam \

Modified: couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.hrl?rev=802145&r1=802144&r2=802145&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_db.hrl Fri Aug  7 19:12:39 2009
@@ -228,5 +228,26 @@
     view_states=nil
     }).
 
+-record(http_db, {
+    url,
+    auth = [],
+    resource = "",
+    headers = [
+        {"User-Agent", "CouchDb/"++couch_server:get_version()},
+        {"Accept", "application/json"},
+        {"Accept-Encoding", "gzip"}
+    ],
+    qs = [],
+    method = get,
+    body = nil,
+    options = [
+        {response_format,binary},
+        {inactivity_timeout, 30000}
+    ],
+    retries = 10,
+    pause = 1,
+    conn = nil
+}).
+
 % small value used in revision trees to indicate the revision isn't stored
 -define(REV_MISSING, []).

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=802145&r1=802144&r2=802145&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Fri Aug  7 19:12:39 2009
@@ -82,7 +82,7 @@
 %% gen_server callbacks
 %%=============================================================================
 
--record(http_db, {
+-record(old_http_db, {
     uri,
     headers,
     oauth
@@ -391,7 +391,7 @@
 
 att_stub_converter(DbS, Id, Rev,
         #att{name=Name,data=stub,type=Type,len=Length}=Att) ->
-    #http_db{uri=DbUrl, headers=Headers} = DbS,
+    #old_http_db{uri=DbUrl, headers=Headers} = DbS,
     {Pos, [RevId|_]} = Rev,
     Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)),
         "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
@@ -498,7 +498,7 @@
 
 
 open_db({remote, Url, Headers, Auth})->
-    {ok, #http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
+    {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
 open_db({local, DbName, UserCtx})->
     case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
     {ok, Db} -> {ok, Db, DbName};
@@ -506,7 +506,7 @@
     end.
 
 
-close_db(#http_db{})->
+close_db(#old_http_db{})->
     ok;
 close_db(Db)->
     couch_db:close(Db).
@@ -675,7 +675,7 @@
         do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
     end.
 
-ensure_full_commit(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
+ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
     {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
         Headers, OAuth, true),
     true = proplists:get_value(<<"ok">>, ResultProps),
@@ -707,13 +707,13 @@
 
 
 
-get_db_info(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
+get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
     {DbProps} = do_http_request(DbUrl, get, Headers, OAuth),
     {ok, [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]};
 get_db_info(Db) ->
     couch_db:get_db_info(Db).
 
-get_doc_info_list(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
+get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
     Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
         ++ integer_to_list(StartSeq),
     {Results} = do_http_request(Url, get, Headers, OAuth),
@@ -739,7 +739,7 @@
     end, {0, []}),
     lists:reverse(DocInfoList).
 
-get_missing_revs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
+get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
     DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList],
     {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth,
             {DocIdRevsList2}),
@@ -750,7 +750,7 @@
     couch_db:get_missing_revs(Db, DocId).
 
 
-open_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
+open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
     [] = Options,
     case do_http_request(DbUrl ++ couch_util:url_encode(DocId), get, Headers, OAuth) of
     {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
@@ -761,7 +761,7 @@
 open_doc(Db, DocId, Options) ->
     couch_db:open_doc(Db, DocId, Options).
 
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
+open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
         [latest]) ->
     Revs = couch_doc:rev_to_strs(Revs0),
     BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true",
@@ -845,7 +845,7 @@
     lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
         0, element(2,process_info(Pid, binary))).
 
-update_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options)
->
+update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options)
->
     [] = Options,
     Url = DbUrl ++ couch_util:url_encode(DocId),
     {ResponseMembers} = do_http_request(Url, put, Headers, OAuth,
@@ -857,7 +857,7 @@
 
 update_docs(_, [], _, _) ->
     {ok, []};
-update_docs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes)
->
+update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes)
->
     JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
     ErrorsJson =
         do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth,
@@ -877,7 +877,7 @@
 update_docs(Db, Docs, Options, UpdateType) ->
     couch_db:update_docs(Db, Docs, Options, UpdateType).
 
-up_to_date(#http_db{}, _Seq) ->
+up_to_date(#old_http_db{}, _Seq) ->
     true;
 up_to_date(Source, Seq) ->
     {ok, NewDb} = couch_db:open(Source#db.name, []),

Added: couchdb/trunk/src/couchdb/couch_rep_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_httpc.erl?rev=802145&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_httpc.erl (added)
+++ couchdb/trunk/src/couchdb/couch_rep_httpc.erl Fri Aug  7 19:12:39 2009
@@ -0,0 +1,163 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_httpc).
+-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-export([db_exists/1, full_url/1, request/1, spawn_worker_process/1,
+    spawn_link_worker_process/1]).
+
+request(Req) when is_record(Req, http_db) ->
+    do_request(Req).
+
+do_request(#http_db{url=Url} = Req) when is_binary(Url) ->
+    do_request(Req#http_db{url = ?b2l(Url)});
+
+do_request(Req) ->
+    #http_db{
+        auth = Auth,
+        headers = Headers0,
+        method = Method,
+        body = B,
+        options = Opts,
+        conn = Conn
+    } = Req,
+    Url = full_url(Req),
+    Headers = case proplists:get_value(<<"oauth">>, Auth) of
+    undefined ->
+        Headers0;
+    {OAuthProps} ->
+        [oauth_header(Url, Method, OAuthProps) | Headers0]
+    end,
+    Body = if B =:= nil -> []; true -> iolist_to_binary(?JSON_ENCODE(B)) end,
+    Resp = case Conn of
+    nil ->
+        ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity);
+    _ ->
+        ibrowse:send_req_direct(Conn, Url, Headers, Method, Body, Opts, infinity)
+    end,
+    process_response(Resp, Req).
+
+db_exists(Req) ->
+    #http_db{
+        url = Url,
+        headers = Headers
+    } = Req,
+    case catch ibrowse:send_req(Url, Headers, head) of
+    {ok, "200", _, _} ->
+        true;
+    {ok, "301", Headers, _} ->
+        MochiHeaders = mochiweb_headers:make(Headers),
+        RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+        db_exists(Req#http_db{url = RedirectUrl});
+    Error ->
+        ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]),
+        false
+    end.
+
+full_url(#http_db{url=Url} = Req) when is_binary(Url) ->
+    full_url(Req#http_db{url = ?b2l(Url)});
+
+full_url(#http_db{qs=[]} = Req) ->
+    Req#http_db.url ++ Req#http_db.resource;
+
+full_url(Req) ->
+    #http_db{
+        url = Url,
+        resource = Resource,
+        qs = QS
+    } = Req,
+    QStr = lists:map(fun({K,V}) -> io_lib:format("~s=~s", 
+        [couch_util:to_list(K), couch_util:to_list(V)]) end, QS),
+    lists:flatten([Url, Resource, "?", string:join(QStr, "&")]).
+
+process_response({ok, Status, Headers, Body}, Req) ->
+    Code = list_to_integer(Status),
+    if Code =:= 200; Code =:= 201 ->
+        ?JSON_DECODE(maybe_decompress(Headers, Body));
+    Code =:= 301 ->
+        MochiHeaders = mochiweb_headers:make(Headers),
+        RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+        do_request(Req#http_db{url = RedirectUrl});
+    Code >= 400, Code < 500 ->
+        ?JSON_DECODE(maybe_decompress(Headers, Body));
+    Code =:= 500; Code =:= 502 ->
+        #http_db{pause = Pause, retries = Retries} = Req,
+        ?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++
+            % "due to remote server error: ~s~s", [Pause/1000, Req#http_db.url,
+            "due to remote server error: ~p Body ~s", [Pause, Code,
+            Body]),
+        timer:sleep(1000*Pause),
+        do_request(Req#http_db{retries = Retries-1, pause = 2*Pause})
+    end;
+
+process_response({ibrowse_req_id, Id}, _Req) ->
+    {ibrowse_req_id, Id};
+
+process_response({error, _Reason}, #http_db{url=Url, retries=0}) ->
+    ?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]),
+    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
+process_response({error, Reason}, Req) ->
+    #http_db{
+        method = Method,
+        retries = Retries,
+        pause = Pause
+    } = Req,
+    ShortReason = case Reason of
+    connection_closed ->
+        connection_closed;
+    {'EXIT', {noproc, _}} ->
+        noproc;
+    {'EXIT', {normal, _}} ->
+        normal;
+    Else ->
+        Else
+    end,
+    ?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++
+        "{error, ~p}", [Method, Pause, ShortReason]),
+        % "{error}", [Method, Pause]),
+    timer:sleep(1000*Pause),
+    do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}).
+
+spawn_worker_process(Req) ->
+    Url = ibrowse_lib:parse_url(Req#http_db.url),
+    {ok, Pid} = ibrowse:spawn_worker_process(Url#url.host, Url#url.port),
+    Pid.
+
+spawn_link_worker_process(Req) ->
+    Url = ibrowse_lib:parse_url(Req#http_db.url),
+    {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
+    Pid.
+
+maybe_decompress(Headers, Body) ->
+    MochiHeaders = mochiweb_headers:make(Headers),
+    case mochiweb_headers:get_value("Content-Encoding", MochiHeaders) of
+    "gzip" ->
+        zlib:gunzip(Body);
+    _ ->
+        Body
+    end.
+
+oauth_header(Url, Action, Props) ->
+    ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, Props)),
+    Token = ?b2l(proplists:get_value(<<"token">>, Props)),
+    TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, Props)),
+    ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, Props)),
+    Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
+    Method = case Action of
+        get -> "GET";
+        post -> "POST";
+        put -> "PUT"
+    end,
+    Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret),
+    {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.

Added: couchdb/trunk/test/etap/110-replication-httpc.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/110-replication-httpc.t?rev=802145&view=auto
==============================================================================
--- couchdb/trunk/test/etap/110-replication-httpc.t (added)
+++ couchdb/trunk/test/etap/110-replication-httpc.t Fri Aug  7 19:12:39 2009
@@ -0,0 +1,134 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% XXX: Figure out how to -include("couch_rep.hrl")
+-record(http_db, {
+    url,
+    auth = [],
+    resource = "",
+    headers = [
+        {"User-Agent", "CouchDb/"++couch_server:get_version()},
+        {"Accept", "application/json"},
+        {"Accept-Encoding", "gzip"}
+    ],
+    qs = [],
+    method = get,
+    body = nil,
+    options = [
+        {response_format,binary},
+        {inactivity_timeout, 30000}
+    ],
+    retries = 10,
+    pause = 1,
+    conn = nil
+}).
+
+-define(SERVER, "http://127.0.0.1:5984/").
+-define(DBNAME, "etap-test-db").
+
+main(_) ->
+    code:add_pathz("src/couchdb"),
+    code:add_pathz("src/ibrowse"),
+    code:add_pathz("src/mochiweb"),
+    code:add_pathz("src/erlang-oauth"),
+    
+    etap:plan(7),
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    ok.
+
+test() ->
+    couch_server:start(
+        ["etc/couchdb/default_dev.ini", "etc/couchdb/local_dev.ini"]
+    ),
+    ibrowse:start(),
+    crypto:start(),
+
+    couch_server:delete(list_to_binary(?DBNAME), []),
+    {ok, Db} = couch_db:create(list_to_binary(?DBNAME), []),
+
+    test_welcome(),
+    test_binary_url(),
+    test_put(),
+    test_qs(),
+    test_db_exists(),
+
+    couch_db:close(Db),
+    couch_server:delete(list_to_binary(?DBNAME), []),
+    ok.
+
+test_welcome() ->
+    WelcomeReq = #http_db{url=?SERVER},
+    Expect = {[
+        {<<"couchdb">>, <<"Welcome">>},
+        {<<"version">>, list_to_binary(couch_server:get_version())}
+    ]},
+    etap:is(
+        couch_rep_httpc:request(WelcomeReq),
+        Expect,
+        "welcome request with url-as-list"
+    ).
+
+test_binary_url() ->
+    Req = #http_db{url=list_to_binary(?SERVER)},
+    Expect = {[
+        {<<"couchdb">>, <<"Welcome">>},
+        {<<"version">>, list_to_binary(couch_server:get_version())}
+    ]},
+    Fun = fun(Expect) -> true; (_) -> false end,
+    etap:is(
+        couch_rep_httpc:request(Req),
+        Expect,
+        "welcome request with url-as-binary"
+    ).
+
+test_put() ->
+    Req = #http_db{
+        url = ?SERVER ++ ?DBNAME ++ "/",
+        resource = "test_put",
+        body = {[{<<"foo">>, <<"bar">>}]},
+        method = put
+    },
+    {Resp} = couch_rep_httpc:request(Req),
+    etap:ok(proplists:get_value(<<"ok">>, Resp), "ok:true on upload"),
+    etap:is(<<"test_put">>, proplists:get_value(<<"id">>, Resp),
"id is correct").
+
+test_qs() ->
+    Req = #http_db{
+        url = ?SERVER ++ ?DBNAME ++ "/",
+        resource = "foo",
+        qs = [
+            {bar, true},
+            {baz, 1.03},
+            {bif, mochijson2:encode(<<"1-23456">>)}
+        ]
+    },
+    Expect = ?SERVER ++ ?DBNAME ++ "/foo?bar=true&baz=1.03&bif=\"1-23456\"",
+    etap:is(
+        couch_rep_httpc:full_url(Req),
+        Expect,
+        "query-string proplist encoding ok"
+    ).
+
+test_db_exists() ->
+    Req1 = #http_db{url=?SERVER ++ ?DBNAME ++ "/"},
+    Req2 = #http_db{url=?SERVER ++ ?DBNAME ++ "_foo/"},
+    etap:ok(couch_rep_httpc:db_exists(Req1), "db_exists true check"),
+    etap:is(couch_rep_httpc:db_exists(Req2), false, "db_exists false check").



Mime
View raw message