couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1000880 - in /couchdb/trunk/src: couchdb/couch_rep_changes_feed.erl couchdb/couch_rep_httpc.erl ibrowse/Makefile.am ibrowse/ibrowse.app.in ibrowse/ibrowse.erl ibrowse/ibrowse_http_client.erl ibrowse/ibrowse_lb.erl ibrowse/ibrowse_test.erl
Date Fri, 24 Sep 2010 14:18:56 GMT
Author: fdmanana
Date: Fri Sep 24 14:18:56 2010
New Revision: 1000880

URL: http://svn.apache.org/viewvc?rev=1000880&view=rev
Log:
Upgrading ibrowse from version 1.6.2 to 2.0.1.
This version fixes a serious issue regarding streaming of chunked HTTP(S) responses.
The issue is that the client occasionally gets blocked or receives a timeout (if inactivity_timeout
parameter is given to ibrowse).

This fixes part of ticket COUCHDB-491.



Modified:
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
    couchdb/trunk/src/couchdb/couch_rep_httpc.erl
    couchdb/trunk/src/ibrowse/Makefile.am
    couchdb/trunk/src/ibrowse/ibrowse.app.in
    couchdb/trunk/src/ibrowse/ibrowse.erl
    couchdb/trunk/src/ibrowse/ibrowse_http_client.erl
    couchdb/trunk/src/ibrowse/ibrowse_lb.erl
    couchdb/trunk/src/ibrowse/ibrowse_test.erl

Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Fri Sep 24 14:18:56 2010
@@ -187,7 +187,7 @@ handle_cast(_Msg, State) ->
 handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
     handle_headers(list_to_integer(Code), Hdrs, State);
 
-handle_info({ibrowse_async_response, Id, {error,connection_closed}},
+handle_info({ibrowse_async_response, Id, {error, sel_conn_closed}},
         #state{reqid=Id}=State) ->
     handle_retry(State);
 

Modified: couchdb/trunk/src/couchdb/couch_rep_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_httpc.erl?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_httpc.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_httpc.erl Fri Sep 24 14:18:56 2010
@@ -168,7 +168,7 @@ process_response({error, Reason}, Req) -
         pause = Pause
     } = Req,
     ShortReason = case Reason of
-    connection_closed ->
+    sel_conn_closed ->
         connection_closed;
     {'EXIT', {noproc, _}} ->
         noproc;

Modified: couchdb/trunk/src/ibrowse/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/Makefile.am?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/Makefile.am (original)
+++ couchdb/trunk/src/ibrowse/Makefile.am Fri Sep 24 14:18:56 2010
@@ -10,7 +10,7 @@
 ## License for the specific language governing permissions and limitations under
 ## the License.
 
-ibrowseebindir = $(localerlanglibdir)/ibrowse-1.6.2/ebin
+ibrowseebindir = $(localerlanglibdir)/ibrowse-2.0.1/ebin
 
 ibrowse_file_collection = \
 	ibrowse.app.in \

Modified: couchdb/trunk/src/ibrowse/ibrowse.app.in
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse.app.in?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse.app.in (original)
+++ couchdb/trunk/src/ibrowse/ibrowse.app.in Fri Sep 24 14:18:56 2010
@@ -1,6 +1,6 @@
 {application, ibrowse,
         [{description, "HTTP client application"},
-         {vsn, "1.6.2"},
+         {vsn, "2.0.1"},
          {modules, [ ibrowse, 
 		     ibrowse_http_client, 
 		     ibrowse_app, 

Modified: couchdb/trunk/src/ibrowse/ibrowse.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse.erl?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse.erl Fri Sep 24 14:18:56 2010
@@ -7,7 +7,7 @@
 %%%-------------------------------------------------------------------
 %% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
 %% @copyright 2005-2010 Chandrashekhar Mullaparthi
-%% @version 1.6.0
+%% @version 2.0.1
 %% @doc The ibrowse application implements an HTTP 1.1 client. This
 %% module implements the API of the HTTP client. There is one named
 %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@@ -236,6 +236,11 @@ send_req(Url, Headers, Method, Body) ->
 %% caller to get access to the raw status line and raw unparsed
 %% headers. Not quite sure why someone would want this, but one of my
 %% users asked for it, so here it is. </li>
+%%
+%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
+%% to receive the raw data stream when the Transfer-Encoding of the server
+%% response is Chunked.
+%% </li>
 %% </ul>
 %%
 %% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
@@ -266,7 +271,8 @@ send_req(Url, Headers, Method, Body) ->
 %%          {socket_options, Sock_opts}        |
 %%          {transfer_encoding, {chunked, ChunkSize}} | 
 %%          {headers_as_is, boolean()}         |
-%%          {give_raw_headers, boolean()}
+%%          {give_raw_headers, boolean()}      |
+%%          {preserve_chunked_encoding,boolean()}
 %%
 %% stream_to() = process() | {process(), once}
 %% process() = pid() | atom()
@@ -302,23 +308,45 @@ send_req(Url, Headers, Method, Body, Opt
             Options_1 = merge_options(Host, Port, Options),
             {SSLOptions, IsSSL} =
                 case (Protocol == https) orelse
-                     get_value(is_ssl, Options_1, false) of
+                    get_value(is_ssl, Options_1, false) of
                     false -> {[], false};
                     true -> {get_value(ssl_options, Options_1, []), true}
                 end,
-            case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
+            try_routing_request(Lb_pid, Parsed_url,
+                                Max_sessions, 
+                                Max_pipeline_size,
+                                {SSLOptions, IsSSL}, 
+                                Headers, Method, Body, Options_1, Timeout, 0);
+        Err ->
+            {error, {url_parsing_failed, Err}}
+    end.
+
+try_routing_request(Lb_pid, Parsed_url,
+                    Max_sessions, 
+                    Max_pipeline_size,
+                    {SSLOptions, IsSSL}, 
+                    Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
+    case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
                                              Max_sessions, 
                                              Max_pipeline_size,
                                              {SSLOptions, IsSSL}) of
-                {ok, Conn_Pid} ->
-                    do_send_req(Conn_Pid, Parsed_url, Headers,
-                                Method, Body, Options_1, Timeout);
-                Err ->
-                    Err
+        {ok, Conn_Pid} ->
+            case do_send_req(Conn_Pid, Parsed_url, Headers,
+                             Method, Body, Options_1, Timeout) of
+                {error, sel_conn_closed} ->
+                    try_routing_request(Lb_pid, Parsed_url,
+                                        Max_sessions, 
+                                        Max_pipeline_size,
+                                        {SSLOptions, IsSSL}, 
+                                        Headers, Method, Body, Options_1, Timeout, Try_count + 1);
+                Res ->
+                    Res
             end;
         Err ->
-            {error, {url_parsing_failed, Err}}
-    end.
+            Err
+    end;
+try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
+    {error, retry_later}.
 
 merge_options(Host, Port, Options) ->
     Config_options = get_config_value({options, Host, Port}, []),
@@ -337,11 +365,27 @@ get_lb_pid(Url) ->
 
 get_max_sessions(Host, Port, Options) ->
     get_value(max_sessions, Options,
-              get_config_value({max_sessions, Host, Port}, ?DEF_MAX_SESSIONS)).
+              get_config_value({max_sessions, Host, Port},
+                               default_max_sessions())).
 
 get_max_pipeline_size(Host, Port, Options) ->
     get_value(max_pipeline_size, Options,
-              get_config_value({max_pipeline_size, Host, Port}, ?DEF_MAX_PIPELINE_SIZE)).
+              get_config_value({max_pipeline_size, Host, Port},
+                               default_max_pipeline_size())).
+
+default_max_sessions() ->
+    safe_get_env(ibrowse, default_max_sessions, ?DEF_MAX_SESSIONS).
+
+default_max_pipeline_size() ->
+    safe_get_env(ibrowse, default_max_pipeline_size, ?DEF_MAX_PIPELINE_SIZE).
+
+safe_get_env(App, Key, Def_val) ->
+    case application:get_env(App, Key) of
+        undefined ->
+            Def_val;
+        {ok, Val} ->
+            Val
+    end.
 
 %% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
 %% for achieving the same effect.
@@ -375,6 +419,10 @@ do_send_req(Conn_Pid, Parsed_url, Header
                                             Options, Timeout) of
         {'EXIT', {timeout, _}} ->
             {error, req_timedout};
+        {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
+            {error, sel_conn_closed};
+        {error, connection_closed} ->
+            {error, sel_conn_closed};
         {'EXIT', Reason} ->
             {error, {'EXIT', Reason}};
         {ok, St_code, Headers, Body} = Ret when is_binary(Body) ->
@@ -684,6 +732,10 @@ handle_call({get_lb_pid, #url{host = Hos
 
 handle_call(stop, _From, State) ->
     do_trace("IBROWSE shutting down~n", []),
+    ets:foldl(fun(#lb_pid{pid = Pid}, Acc) ->
+                      ibrowse_lb:stop(Pid),
+                      Acc
+              end, [], ibrowse_lb),
     {stop, normal, ok, State};
 
 handle_call({set_config_value, Key, Val}, _From, State) ->

Modified: couchdb/trunk/src/ibrowse/ibrowse_http_client.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_http_client.erl?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_http_client.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_http_client.erl Fri Sep 24 14:18:56 2010
@@ -47,7 +47,8 @@
                 status_line, raw_headers, 
                 is_closing, send_timer, content_length,
                 deleted_crlf = false, transfer_encoding,
-                chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
+                chunk_size, chunk_size_buffer = <<>>,
+                recvd_chunk_size, interim_reply_sent = false,
                 lb_ets_tid, cur_pipeline_size = 0, prev_req_id
                }).
 
@@ -57,7 +58,7 @@
                   req_id,
                   stream_chunk_size,
                   save_response_to_file = false, 
-                  tmp_file_name, tmp_file_fd,
+                  tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
                   response_format}).
 
 -import(ibrowse_lib, [
@@ -82,8 +83,13 @@ start_link(Args) ->
     gen_server:start_link(?MODULE, Args, []).
 
 stop(Conn_pid) ->
-    catch gen_server:call(Conn_pid, stop),
-    ok.
+    case catch gen_server:call(Conn_pid, stop) of
+        {'EXIT', {timeout, _}} ->
+            exit(Conn_pid, kill),
+            ok;
+        _ ->
+            ok
+    end.
 
 send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
     gen_server:call(
@@ -171,6 +177,7 @@ handle_cast(_Msg, State) ->
 %%          {stop, Reason, State}            (terminate/2 is called)
 %%--------------------------------------------------------------------
 handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
+%%    io:format("Recvd data: ~p~n", [Data]),
     do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
     handle_sock_data(Data, State);
 handle_info({ssl, _Sock, Data}, State) ->
@@ -178,13 +185,14 @@ handle_info({ssl, _Sock, Data}, State) -
 
 handle_info({stream_next, Req_id}, #state{socket = Socket,
                                           cur_req = #request{req_id = Req_id}} = State) ->
+    %% io:format("Client process set {active, once}~n", []),
     do_setopts(Socket, [{active, once}], State),
     {noreply, State};
 
 handle_info({stream_next, _Req_id}, State) ->
     {noreply, State};
 
-handle_info({tcp_closed, _Sock}, State) ->
+handle_info({tcp_closed, _Sock}, State) ->    
     do_trace("TCP connection closed by peer!~n", []),
     handle_sock_closed(State),
     {stop, normal, State};
@@ -194,11 +202,11 @@ handle_info({ssl_closed, _Sock}, State) 
     {stop, normal, State};
 
 handle_info({tcp_error, _Sock}, State) ->
-    io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+    do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
     handle_sock_closed(State),
     {stop, normal, State};
 handle_info({ssl_error, _Sock}, State) ->
-    io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+    do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
     handle_sock_closed(State),
     {stop, normal, State};
 
@@ -233,7 +241,8 @@ handle_info(Info, State) ->
 %% Returns: any (ignored by gen_server)
 %%--------------------------------------------------------------------
 terminate(_Reason, State) ->
-    do_close(State).
+    do_close(State),
+    ok.
 
 %%--------------------------------------------------------------------
 %% Func: code_change/3
@@ -269,6 +278,7 @@ handle_sock_data(Data, #state{status = g
     end;
 
 handle_sock_data(Data, #state{status           = get_body,
+                              socket           = Socket,
                               content_length   = CL,
                               http_status_code = StatCode,
                               recvd_headers    = Headers,
@@ -293,6 +303,19 @@ handle_sock_data(Data, #state{status    
                     fail_pipelined_requests(State,
                                             {error, {Reason, {stat_code, StatCode}, Headers}}),
                     {stop, normal, State};
+                #state{cur_req = #request{caller_controls_socket = Ccs},
+                       interim_reply_sent = Irs} = State_1 ->
+                    case Irs of
+                        true ->
+                            active_once(State_1);
+                        false when Ccs == true ->
+                            do_setopts(Socket, [{active, once}], State);
+                        false ->
+                            active_once(State_1)
+                    end,
+                    State_2 = State_1#state{interim_reply_sent = false},
+                    set_inac_timer(State_2),
+                    {noreply, State_2};
                 State_1 ->
                     active_once(State_1),
                     set_inac_timer(State_1),
@@ -338,17 +361,25 @@ accumulate_response(Data, #state{cur_req
         {error, Reason} ->
             {error, {file_write_error, Reason}}
     end;
-accumulate_response(<<>>, State) ->
-    State;
-accumulate_response(Data, #state{reply_buffer = RepBuf,
-                                 rep_buf_size = RepBufSize,
-                                 streamed_size = Streamed_size,
-                                 cur_req = CurReq}=State) ->
-    #request{stream_to=StreamTo, req_id=ReqId,
-             stream_chunk_size = Stream_chunk_size,
-             response_format = Response_format,
-             caller_controls_socket = Caller_controls_socket} = CurReq,
-    RepBuf_1 = list_to_binary([RepBuf, Data]),
+%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs},
+%%                                  socket = Socket} = State) ->
+%%     case Ccs of
+%%         true ->
+%%             do_setopts(Socket, [{active, once}], State);
+%%         false ->
+%%             ok
+%%     end,
+%%     State;
+accumulate_response(Data, #state{reply_buffer      = RepBuf,
+                                 rep_buf_size      = RepBufSize,
+                                 streamed_size     = Streamed_size,
+                                 cur_req           = CurReq}=State) ->
+    #request{stream_to                 = StreamTo,
+             req_id                    = ReqId,
+             stream_chunk_size         = Stream_chunk_size,
+             response_format           = Response_format,
+             caller_controls_socket    = Caller_controls_socket} = CurReq,
+    RepBuf_1 = <<RepBuf/binary, Data/binary>>,
     New_data_size = RepBufSize - Streamed_size,
     case StreamTo of
         undefined ->
@@ -356,15 +387,21 @@ accumulate_response(Data, #state{reply_b
         _ when Caller_controls_socket == true ->
             do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
             State#state{reply_buffer = <<>>, 
+                        interim_reply_sent = true,
                         streamed_size = Streamed_size + size(RepBuf_1)};
         _ when New_data_size >= Stream_chunk_size ->
             {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
             do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
-            accumulate_response(
-              Rem_data,
-              State#state{
-                reply_buffer = <<>>,
-                streamed_size = Streamed_size + Stream_chunk_size});
+            State_1 = State#state{
+                        reply_buffer = <<>>,
+                        interim_reply_sent = true,
+                        streamed_size = Streamed_size + Stream_chunk_size},
+            case Rem_data of
+                <<>> ->
+                    State_1;
+                _ ->
+                    accumulate_response(Rem_data, State_1)
+            end;
         _ ->
             State#state{reply_buffer = RepBuf_1}
     end.
@@ -498,9 +535,9 @@ do_close(#state{socket = Sock,
                 is_ssl = true,
                 use_proxy = true,
                 proxy_tunnel_setup = Pts
-               }) when Pts /= done ->  gen_tcp:close(Sock);
-do_close(#state{socket = Sock, is_ssl = true})  ->  ssl:close(Sock);
-do_close(#state{socket = Sock, is_ssl = false}) ->  gen_tcp:close(Sock).
+               }) when Pts /= done ->  catch gen_tcp:close(Sock);
+do_close(#state{socket = Sock, is_ssl = true})  ->  catch ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) ->  catch gen_tcp:close(Sock).
 
 active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
     ok;
@@ -542,25 +579,17 @@ send_req_1(From,
         end,
     State_2 = check_ssl_options(Options, State_1),
     do_trace("Connecting...~n", []),
-    Start_ts = now(),
     Conn_timeout = get_value(connect_timeout, Options, Timeout),
     case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
         {ok, Sock} ->
-            do_trace("Connected!~n", []),
-            End_ts = now(),
-            Timeout_1 = case Timeout of
-                            infinity ->
-                                infinity;
-                            _ ->
-                                Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
-                        end,
+            do_trace("Connected! Socket: ~1000.p~n", [Sock]),
             State_3 = State_2#state{socket = Sock,
                                     connect_timeout = Conn_timeout},
-            send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
+            send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
         Err ->
             shutting_down(State_2),
             do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
-            gen_server:reply(From, {error, conn_failed}),
+            gen_server:reply(From, {error, {conn_failed, Err}}),
             {stop, normal, State_2}
     end;
 
@@ -580,8 +609,9 @@ send_req_1(From,
                   use_proxy = true,
                   is_ssl    = true} = State) ->
     NewReq = #request{
-      method                 = connect,
-      options                = Options
+      method                    = connect,
+      preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
+      options                   = Options
      },
     State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
     Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@@ -611,13 +641,13 @@ send_req_1(From,
                 Err ->
                     shutting_down(State_1),
                     do_trace("Send failed... Reason: ~p~n", [Err]),
-                    gen_server:reply(From, {error, send_failed}),
+                    gen_server:reply(From, {error, {send_failed, Err}}),
                     {stop, normal, State_1}
             end;
         Err ->
             shutting_down(State_1),
             do_trace("Send failed... Reason: ~p~n", [Err]),
-            gen_server:reply(From, {error, send_failed}),
+            gen_server:reply(From, {error, {send_failed, Err}}),
             {stop, normal, State_1}
     end;
 
@@ -666,7 +696,9 @@ send_req_1(From,
                       save_response_to_file  = SaveResponseToFile,
                       stream_chunk_size      = get_stream_chunk_size(Options),
                       response_format        = Resp_format,
-                      from                   = From},
+                      from                   = From,
+                      preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
+                     },
     State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
     Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
     {Req, Body_1} = make_request(Method,
@@ -705,13 +737,13 @@ send_req_1(From,
                 Err ->
                     shutting_down(State_1),
                     do_trace("Send failed... Reason: ~p~n", [Err]),
-                    gen_server:reply(From, {error, send_failed}),
+                    gen_server:reply(From, {error, {send_failed, Err}}),
                     {stop, normal, State_1}
             end;
         Err ->
             shutting_down(State_1),
             do_trace("Send failed... Reason: ~p~n", [Err]),
-            gen_server:reply(From, {error, send_failed}),
+            gen_server:reply(From, {error, {send_failed, Err}}),
             {stop, normal, State_1}
     end.
 
@@ -768,14 +800,14 @@ http_auth_digest(Username, Password) ->
     ibrowse_lib:encode_base64(Username ++ [$: | Password]).
 
 make_request(Method, Headers, AbsPath, RelPath, Body, Options,
-             #state{use_proxy = UseProxy}) ->
+             #state{use_proxy = UseProxy, is_ssl = Is_ssl}) ->
     HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
     Headers_1 =
         case get_value(content_length, Headers, false) of
             false when (Body == []) or
-                       (Body == <<>>) or
-                       is_tuple(Body) or
-                       is_function(Body) ->
+            (Body == <<>>) or
+            is_tuple(Body) or
+            is_function(Body) ->
                 Headers;
             false when is_binary(Body) ->
                 [{"content-length", integer_to_list(size(Body))} | Headers];
@@ -799,7 +831,12 @@ make_request(Method, Headers, AbsPath, R
     Headers_3 = cons_headers(Headers_2),
     Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
               true ->
-                  AbsPath;
+                  case Is_ssl of
+                      true ->
+                          RelPath;
+                      false ->
+                          AbsPath
+                  end;
               false ->
                   RelPath
           end,
@@ -1017,7 +1054,7 @@ upgrade_to_ssl(#state{socket = Socket, 
             send_queued_requests(lists:reverse(Q), State_1);
         Err ->
             do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]),
-            do_error_reply(State, {error, send_failed}),
+            do_error_reply(State, {error, {send_failed, Err}}),
             {error, send_failed}
     end.
 
@@ -1029,12 +1066,12 @@ send_queued_requests([{From, Url, Header
     case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of
         {noreply, State_1} ->
             send_queued_requests(Q, State_1);
-        _ ->
+        Err ->
             do_trace("Error sending queued SSL request: ~n"
                      "URL     : ~s~n"
                      "Method  : ~p~n"
                      "Headers : ~p~n", [Url, Method, Headers]),
-            do_error_reply(State, {error, send_failed}),
+            do_error_reply(State, {error, {send_failed, Err}}),
             {error, send_failed}
     end.
 
@@ -1046,11 +1083,12 @@ is_connection_closing(_, _)             
 %% This clause determines the chunk size when given data from the beginning of the chunk
 parse_11_response(DataRecvd,
                   #state{transfer_encoding = chunked, 
-                         chunk_size = chunk_start,
+                         chunk_size        = chunk_start,
                          chunk_size_buffer = Chunk_sz_buf
                         } = State) ->
     case scan_crlf(Chunk_sz_buf, DataRecvd) of
         {yes, ChunkHeader, Data_1} ->
+            State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
             ChunkSize = parse_chunk_header(ChunkHeader),
             %%
             %% Do we have to preserve the chunk encoding when
@@ -1061,10 +1099,10 @@ parse_11_response(DataRecvd,
             RemLen = size(Data_1),
             do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
                      [ChunkSize, RemLen]),
-            parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>,
-                                                  deleted_crlf = true,
-                                                  recvd_chunk_size = 0,
-                                                  chunk_size = ChunkSize});
+            parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
+                                                    deleted_crlf = true,
+                                                    recvd_chunk_size = 0,
+                                                    chunk_size = ChunkSize});
         {no, Data_1} ->
             State#state{chunk_size_buffer = Data_1}
     end;
@@ -1074,13 +1112,15 @@ parse_11_response(DataRecvd,
 parse_11_response(DataRecvd,
                   #state{transfer_encoding = chunked, 
                          chunk_size = tbd,
-                         chunk_size_buffer = Buf}=State) ->
+                         chunk_size_buffer = Buf
+                        } = State) ->
     case scan_crlf(Buf, DataRecvd) of
         {yes, _, NextChunk} ->
-            State_1 = State#state{chunk_size = chunk_start,
-                                  chunk_size_buffer = <<>>,
-                                  deleted_crlf = true},
-            parse_11_response(NextChunk, State_1);
+            State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
+            State_2 = State_1#state{chunk_size = chunk_start,
+                                    chunk_size_buffer = <<>>,
+                                    deleted_crlf = true},
+            parse_11_response(NextChunk, State_2);
         {no, Data_1} ->
             State#state{chunk_size_buffer = Data_1}
     end;
@@ -1090,9 +1130,10 @@ parse_11_response(DataRecvd,
 %% received is silently discarded.
 parse_11_response(DataRecvd,
                   #state{transfer_encoding = chunked, chunk_size = 0, 
-                         cur_req = CurReq,
-                         deleted_crlf = DelCrlf,
-                         chunk_size_buffer = Trailer, reqs = Reqs}=State) ->
+                         cur_req           = CurReq,
+                         deleted_crlf      = DelCrlf,
+                         chunk_size_buffer = Trailer,
+                         reqs              = Reqs} = State) ->
     do_trace("Detected end of chunked transfer...~n", []),
     DataRecvd_1 = case DelCrlf of
                       false ->
@@ -1101,12 +1142,14 @@ parse_11_response(DataRecvd,
                           <<$\r, $\n, DataRecvd/binary>>
                   end,
     case scan_header(Trailer, DataRecvd_1) of
-        {yes, _TEHeaders, Rem} ->
+        {yes, TEHeaders, Rem} ->
             {_, Reqs_1} = queue:out(Reqs),
-            State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}),
-            parse_response(Rem, reset_state(State_1));
+            State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
+            State_2 = handle_response(CurReq,
+                                      State_1#state{reqs = Reqs_1}),
+            parse_response(Rem, reset_state(State_2));
         {no, Rem} ->
-            State#state{chunk_size_buffer = Rem, deleted_crlf = false}
+            accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
     end;
 
 %% This clause extracts a chunk, given the size.
@@ -1121,7 +1164,7 @@ parse_11_response(DataRecvd,
     case DataLen >= NeedBytes of
         true ->
             {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
-            do_trace("Recvd another chunk...~n", []),
+            do_trace("Recvd another chunk...~p~n", [RemChunk]),
             do_trace("RemData -> ~p~n", [RemData]),
             case accumulate_response(RemChunk, State) of
                 {error, Reason} ->
@@ -1155,6 +1198,11 @@ parse_11_response(DataRecvd,
             accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
     end.
 
+maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) ->
+    State;
+maybe_accumulate_ce_data(State, Data) ->
+    accumulate_response(Data, State).
+
 handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
                          response_format = Resp_format,
                          save_response_to_file = SaveResponseToFile,
@@ -1177,11 +1225,12 @@ handle_response(#request{from=From, stre
                        _ ->
                            {file, TmpFilename}
                    end,
+    {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
     Reply = case get_value(give_raw_headers, Options, false) of
                 true ->
-                    {ok, Status_line, Raw_headers, ResponseBody};
+                    {ok, Status_line, Raw_headers_1, ResponseBody};
                 false ->
-                    {ok, SCode, RespHeaders, ResponseBody}
+                    {ok, SCode, Resp_headers_1, ResponseBody}
             end,
     State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply),
     cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
@@ -1192,16 +1241,17 @@ handle_response(#request{from=From, stre
                 #state{http_status_code = SCode,
                        status_line      = Status_line,
                        raw_headers      = Raw_headers,
-                       recvd_headers    = RespHeaders,
+                       recvd_headers    = Resp_headers,
                        reply_buffer     = RepBuf,
                        send_timer       = ReqTimer} = State) ->
     Body = RepBuf,
 %%    State_1 = set_cur_request(State),
+    {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
     Reply = case get_value(give_raw_headers, Options, false) of
                 true ->
-                    {ok, Status_line, Raw_headers, Body};
+                    {ok, Status_line, Raw_headers_1, Body};
                 false ->
-                    {ok, SCode, RespHeaders, Body}
+                    {ok, SCode, Resp_headers_1, Body}
             end,
     State_1 = case get(conn_close) of
         "close" ->
@@ -1227,7 +1277,8 @@ reset_state(State) ->
                 deleted_crlf      = false,
                 http_status_code  = undefined,
                 chunk_size        = undefined,
-                transfer_encoding = undefined}.
+                transfer_encoding = undefined
+               }.
 
 set_cur_request(#state{reqs = Reqs} = State) ->
     case queue:to_list(Reqs) of
@@ -1459,15 +1510,29 @@ send_async_headers(_ReqId, undefined, _,
     ok;
 send_async_headers(ReqId, StreamTo, Give_raw_headers, 
                    #state{status_line = Status_line, raw_headers = Raw_headers, 
-                          recvd_headers = Headers, http_status_code = StatCode
-                          }) ->
+                          recvd_headers = Headers, http_status_code = StatCode,
+                          cur_req = #request{options = Opts}
+                         }) ->
+    {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
     case Give_raw_headers of
         false ->
-            catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers};
+            catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
         true ->
-            catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers}
+            catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
     end.
 
+maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
+    Custom_headers = get_value(add_custom_headers, Opts, []),
+    Headers_1 = Headers ++ Custom_headers,
+    Raw_headers_1 = case Custom_headers of
+                        [_ | _] when is_binary(Raw_headers) ->
+                            Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")),
+                            <<Raw_headers/binary, "\r\n", Custom_headers_bin/binary>>;
+                        _ ->
+                            Raw_headers
+                    end,
+    {Headers_1, Raw_headers_1}.
+
 format_response_data(Resp_format, Body) ->
     case Resp_format of
         list when is_list(Body) ->

Modified: couchdb/trunk/src/ibrowse/ibrowse_lb.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_lb.erl?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_lb.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_lb.erl Fri Sep 24 14:18:56 2010
@@ -16,7 +16,8 @@
 %% External exports
 -export([
 	 start_link/1,
-	 spawn_connection/5
+	 spawn_connection/5,
+         stop/1
 	]).
 
 %% gen_server callbacks
@@ -85,6 +86,14 @@ spawn_connection(Lb_pid, Url,
        is_integer(Max_sessions) ->
     gen_server:call(Lb_pid,
 		    {spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}).
+
+stop(Lb_pid) ->
+    case catch gen_server:call(Lb_pid, stop) of
+        {'EXIT', {timeout, _}} ->
+            exit(Lb_pid, kill);
+        ok ->
+            ok
+    end.
 %%--------------------------------------------------------------------
 %% Function: handle_call/3
 %% Description: Handling call messages
@@ -120,6 +129,18 @@ handle_call({spawn_connection, Url, _Max
     ets:insert(Tid, {{1, Pid}, []}),
     {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}};
 
+handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
+    gen_server:reply(_From, ok),
+    {stop, normal, State};
+
+handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
+    ets:foldl(fun({{_, Pid}, _}, Acc) ->
+                      ibrowse_http_client:stop(Pid),
+                      Acc
+              end, [], Tid),
+    gen_server:reply(_From, ok),
+    {stop, normal, State};
+
 handle_call(Request, _From, State) ->
     Reply = {unknown_request, Request},
     {reply, Reply, State}.

Modified: couchdb/trunk/src/ibrowse/ibrowse_test.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_test.erl?rev=1000880&r1=1000879&r2=1000880&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_test.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_test.erl Fri Sep 24 14:18:56 2010
@@ -17,6 +17,7 @@
 	 ue_test/1,
 	 verify_chunked_streaming/0,
 	 verify_chunked_streaming/1,
+         test_chunked_streaming_once/0,
 	 i_do_async_req_list/4,
 	 test_stream_once/3,
 	 test_stream_once/4
@@ -260,7 +261,20 @@ verify_chunked_streaming(Options) ->
     io:format("Fetching data with streaming as binary...~n", []),
     Async_response_bin = do_async_req_list(
 			   Url, get, [{response_format, binary} | Options]),
-    compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
+    io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+    Async_response_bin_once = do_async_req_list(
+                                Url, get, [once, {response_format, binary} | Options]),
+    compare_responses(Result_without_streaming, Async_response_list, Async_response_bin),
+    compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once).
+
+test_chunked_streaming_once() ->
+    test_chunked_streaming_once([]).
+
+test_chunked_streaming_once(Options) ->
+    Url = "http://www.httpwatch.com/httpgallery/chunked/",
+    io:format("URL: ~s~n", [Url]),
+    io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+    do_async_req_list(Url, get, [once, {response_format, binary} | Options]).
 
 compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
     success;
@@ -313,31 +327,54 @@ wait_for_resp(Pid) ->
 	Msg ->
 	    io:format("Recvd unknown message: ~p~n", [Msg]),
 	    wait_for_resp(Pid)
-    after 10000 ->
+    after 100000 ->
 	  {error, timeout}
     end.
 
 i_do_async_req_list(Parent, Url, Method, Options) ->
-    Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
+    Options_1 = case lists:member(once, Options) of
+                    true ->
+                        [{stream_to, {self(), once}} | (Options -- [once])];
+                    false ->
+                        [{stream_to, self()} | Options]
+                end,
+    Res = ibrowse:send_req(Url, [], Method, [], Options_1),
     case Res of
 	{ibrowse_req_id, Req_id} ->
-	    Result = wait_for_async_resp(Req_id, undefined, undefined, []),
+	    Result = wait_for_async_resp(Req_id, Options, undefined, undefined, []),
 	    Parent ! {async_result, self(), Result};
 	Err ->
 	    Parent ! {async_result, self(), Err}
     end.
 
-wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
+wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, Body) ->    
     receive
 	{ibrowse_async_headers, Req_id, StatCode, Headers} ->
-	    wait_for_async_resp(Req_id, StatCode, Headers, Body);
+            %% io:format("Recvd headers...~n", []),
+            maybe_stream_next(Req_id, Options),
+	    wait_for_async_resp(Req_id, Options, StatCode, Headers, Body);
 	{ibrowse_async_response_end, Req_id} ->
+            io:format("Recvd end of response.~n", []),
 	    Body_1 = list_to_binary(lists:reverse(Body)),
 	    {ok, Acc_Stat_code, Acc_Headers, Body_1};
 	{ibrowse_async_response, Req_id, Data} ->
-	    wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
+            maybe_stream_next(Req_id, Options),
+            %% io:format("Recvd data...~n", []),
+	    wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, [Data | Body]);
+	{ibrowse_async_response, Req_id, {error, _} = Err} ->
+            {ok, Acc_Stat_code, Acc_Headers, Err};
 	Err ->
 	    {ok, Acc_Stat_code, Acc_Headers, Err}
+    after 10000 ->
+            {timeout, Acc_Stat_code, Acc_Headers, Body}
+    end.
+
+maybe_stream_next(Req_id, Options) ->
+    case lists:member(once, Options) of
+        true ->
+            ibrowse:stream_next(Req_id);
+        false ->
+            ok
     end.
 
 execute_req(Url, Method, Options) ->



Mime
View raw message