couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kocol...@apache.org
Subject svn commit: r810350 - in /couchdb/trunk/src: couchdb/couch_app.erl couchdb/couch_rep_changes_feed.erl couchdb/couch_rep_httpc.erl ibrowse/ibrowse_http_client.erl
Date Wed, 02 Sep 2009 03:40:45 GMT
Author: kocolosk
Date: Wed Sep  2 03:40:44 2009
New Revision: 810350

URL: http://svn.apache.org/viewvc?rev=810350&view=rev
Log:
Support for replication over SSL.  Resolves COUCHDB-491

This turned out to be a decent amount of work, since:

1) ibrowse did not use SSL on dedicated connections.  Wrote a simplistic patch,
   will contact Chandru for further discussion.
2) When nginx is used for the SSL wrapper, it wants to buffer the changes feed.
   Setting "proxy_buffering off" in nginx.conf helps, but some buffering still
   occurred. Fixed by making couch_rep_changes_feed smart enough to split
   merged chunks.
3) The Erlang ssl application showed instabilities when used with {active,once}.
   Switched to the "new implementation" using {ssl_imp, new} and instabilities
   disappeared.


Modified:
    couchdb/trunk/src/couchdb/couch_app.erl
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
    couchdb/trunk/src/couchdb/couch_rep_httpc.erl
    couchdb/trunk/src/ibrowse/ibrowse_http_client.erl

Modified: couchdb/trunk/src/couchdb/couch_app.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_app.erl?rev=810350&r1=810349&r2=810350&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_app.erl (original)
+++ couchdb/trunk/src/couchdb/couch_app.erl Wed Sep  2 03:40:44 2009
@@ -20,7 +20,7 @@
 
 start(_Type, DefaultIniFiles) ->
     IniFiles = get_ini_files(DefaultIniFiles),
-    case start_apps([crypto, sasl, inets, oauth, ibrowse, mochiweb]) of
+    case start_apps([crypto, sasl, inets, oauth, ssl, ibrowse, mochiweb]) of
     ok ->
         couch_server_sup:start_link(IniFiles);
     {error, Reason} ->

Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=810350&r1=810349&r2=810350&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Wed Sep  2 03:40:44 2009
@@ -30,7 +30,7 @@
     reqid = nil,
     complete = false,
     count = 0,
-    partial_chunk = nil,
+    partial_chunk = <<>>,
     reply_to = nil,
     rows = queue:new()
 }).
@@ -60,7 +60,7 @@
         conn = Pid,
         options = [{stream_to, {self(), once}}, {response_format, binary}],
         headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
-    },    
+    },
     {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
 
     receive
@@ -127,8 +127,12 @@
 handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
     handle_headers(list_to_integer(Code), Hdrs, State);
 
-handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) ->
-    handle_response(Msg, State);
+handle_info({ibrowse_async_response, Id, {error,E}}, #state{reqid=Id}=State) ->
+    {stop, {error, E}, State};
+
+handle_info({ibrowse_async_response, Id, Chunk}, #state{reqid=Id}=State) ->
+    Messages = [M || M <- re:split(Chunk, ",?\n", [trim]), M =/= <<>>],
+    handle_messages(Messages, State);
 
 handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
     handle_feed_completion(State);
@@ -200,60 +204,41 @@
         [Code,Hdrs]),
     {stop, {error, Code}, State}.
 
-handle_response({error, Reason}, State) ->
-    {stop, {error, Reason}, State};
-handle_response(<<"\n">>, State) ->
-    ?LOG_DEBUG("got a heartbeat from the remote server", []),
-    ok = maybe_stream_next(State),
-    {noreply, State};
-handle_response(<<"{\"results\":[\n">>, State) ->
+handle_messages([], State) ->
     ok = maybe_stream_next(State),
     {noreply, State};
-handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) ->
+handle_messages([<<"{\"results\":[">>|Rest], State) ->
+    handle_messages(Rest, State);
+handle_messages([<<"]">>, <<"\"last_seq\":", LastSeqStr/binary>>],
State) ->
     LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
-    {noreply, State#state{last_seq = LastSeq}};
-handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) ->
+    handle_feed_completion(State#state{last_seq = LastSeq});
+handle_messages([<<"{\"last_seq\":", LastSeqStr/binary>>], State) ->
     LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
-    {noreply, State#state{last_seq = LastSeq}};
-handle_response(Chunk, #state{partial_chunk=nil} = State) ->
-    #state{
-        count = Count,
-        rows = Rows
-    } = State,
-    ok = maybe_stream_next(State),
-    try
-        Row = decode_row(Chunk),
-        case State of
-        #state{reply_to=nil} ->
-            {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}};
-        #state{count=0, reply_to=From}->
-            gen_server:reply(From, [Row]),
-            {noreply, State#state{reply_to=nil}}
-        end
-    catch
-    throw:{invalid_json, Bad} ->
-        {noreply, State#state{partial_chunk = Bad}}
-    end;
-handle_response(Chunk, State) ->
+    handle_feed_completion(State#state{last_seq = LastSeq});
+handle_messages([Chunk|Rest], State) ->
     #state{
         count = Count,
         partial_chunk = Partial,
         rows = Rows
     } = State,
-    ok = maybe_stream_next(State),
-    try
+    NewState = try
         Row = decode_row(<<Partial/binary, Chunk/binary>>),
-        {noreply, case State of
+        case State of
         #state{reply_to=nil} ->
-            State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+            State#state{
+                count = Count+1,
+                partial_chunk = <<>>,
+                rows=queue:in(Row,Rows)
+            };
         #state{count=0, reply_to=From}->
             gen_server:reply(From, [Row]),
-            State#state{reply_to=nil, partial_chunk=nil}
-        end}
+            State#state{reply_to = nil, partial_chunk = <<>>}
+        end
     catch
     throw:{invalid_json, Bad} ->
-        {noreply, State#state{partial_chunk = Bad}}
-    end.
+        State#state{partial_chunk = Bad}
+    end,
+    handle_messages(Rest, NewState).
 
 handle_feed_completion(#state{reply_to=nil} = State)->
     {noreply, State#state{complete=true}};

Modified: couchdb/trunk/src/couchdb/couch_rep_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_httpc.erl?rev=810350&r1=810349&r2=810350&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_httpc.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_httpc.erl Wed Sep  2 03:40:44 2009
@@ -149,12 +149,12 @@
 
 spawn_worker_process(Req) ->
     Url = ibrowse_lib:parse_url(Req#http_db.url),
-    {ok, Pid} = ibrowse:spawn_worker_process(Url#url.host, Url#url.port),
+    {ok, Pid} = ibrowse_http_client:start(Url),
     Pid.
 
 spawn_link_worker_process(Req) ->
     Url = ibrowse_lib:parse_url(Req#http_db.url),
-    {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
+    {ok, Pid} = ibrowse_http_client:start_link(Url),
     Pid.
 
 maybe_decompress(Headers, Body) ->

Modified: couchdb/trunk/src/ibrowse/ibrowse_http_client.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_http_client.erl?rev=810350&r1=810349&r2=810350&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_http_client.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_http_client.erl Wed Sep  2 03:40:44 2009
@@ -113,6 +113,16 @@
 		   port = Port},
     put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
     put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
+    {ok, State};
+init(#url{host=Host, port=Port, protocol=Protocol}) ->
+    State = #state{
+        host = Host,
+        port = Port,
+        is_ssl = (Protocol == https),
+        ssl_options = [{ssl_imp, new}]
+    },
+    put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
+    put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
     {ok, State}.
 
 %%--------------------------------------------------------------------
@@ -137,7 +147,7 @@
 handle_call(stop, _From, State) ->
     do_close(State),
     do_error_reply(State, closing_on_request),
-    {stop, normal, ok, State};
+    {stop, normal, ok, State#state{socket=undefined}};
 
 handle_call(Request, _From, State) ->
     Reply = {unknown_request, Request},



Mime
View raw message