Return-Path: Delivered-To: apmail-incubator-couchdb-commits-archive@locus.apache.org Received: (qmail 35020 invoked from network); 15 May 2008 21:51:53 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 15 May 2008 21:51:53 -0000 Received: (qmail 55182 invoked by uid 500); 15 May 2008 21:51:55 -0000 Delivered-To: apmail-incubator-couchdb-commits-archive@incubator.apache.org Received: (qmail 55136 invoked by uid 500); 15 May 2008 21:51:55 -0000 Mailing-List: contact couchdb-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: couchdb-dev@incubator.apache.org Delivered-To: mailing list couchdb-commits@incubator.apache.org Received: (qmail 55098 invoked by uid 99); 15 May 2008 21:51:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 May 2008 14:51:55 -0700 X-ASF-Spam-Status: No, hits=-1998.4 required=10.0 tests=ALL_TRUSTED,FRT_OFFER2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 May 2008 21:51:08 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3F0A223889F3; Thu, 15 May 2008 14:51:23 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r656861 - in /incubator/couchdb/trunk: share/server/ share/www/script/ src/couchdb/ Date: Thu, 15 May 2008 21:51:22 -0000 To: couchdb-commits@incubator.apache.org From: damien@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080515215123.3F0A223889F3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: damien Date: Thu May 15 14:51:22 2008 New Revision: 656861 URL: http://svn.apache.org/viewvc?rev=656861&view=rev Log: Incremental reduce first checkin. Warning! Disk format change. Modified: incubator/couchdb/trunk/share/server/main.js incubator/couchdb/trunk/share/www/script/couch.js incubator/couchdb/trunk/share/www/script/couch_tests.js incubator/couchdb/trunk/src/couchdb/couch_btree.erl incubator/couchdb/trunk/src/couchdb/couch_db.erl incubator/couchdb/trunk/src/couchdb/couch_db.hrl incubator/couchdb/trunk/src/couchdb/couch_httpd.erl incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl incubator/couchdb/trunk/src/couchdb/couch_util.erl incubator/couchdb/trunk/src/couchdb/couch_view.erl Modified: incubator/couchdb/trunk/share/server/main.js URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/server/main.js?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/share/server/main.js [utf-8] (original) +++ incubator/couchdb/trunk/share/server/main.js [utf-8] Thu May 15 14:51:22 2008 @@ -11,21 +11,30 @@ // the License. var cmd; -var map_funs = []; // The map functions to compute against documents -var map_results = []; +var funs = []; // holds functions used for computation +var map_results = []; // holds temporary emitted values during doc map -try { - var sandbox = evalcx(''); - sandbox.map = function(key, value) { +var sandbox = null; + +map = function(key, value) { map_results.push([key, value]); } -} catch (e) { - // fallback for older versions of spidermonkey that don't have evalcx - var sandbox = null; - map = function(key, value) { - map_results.push([key, value]); + +sum = function(values) { + var values_sum=0; + for(var i in values) { + values_sum += values[i]; + } + return values_sum; } -} + + +try { + // if possible, use evalcx (not always available) + sandbox = evalcx(''); + sandbox.map = map; + sandbox.sum = sum; +} catch (e) {} // Commands are in the form of json arrays: // ["commandname",..optional args...]\n @@ -33,83 +42,132 @@ // Responses are json values followed by a new line ("\n") while (cmd = eval(readline())) { - switch (cmd[0]) { - case "reset": - // clear the map_functions and run gc - map_funs = []; - gc(); - print("true"); // indicates success - break; - case "add_fun": - // The second arg is a string that will compile to a function. - // and then we add it to map_functions array - try { - var functionObject = sandbox ? evalcx(cmd[1], sandbox) : eval(cmd[1]); - } catch (err) { - print(toJSON({error: {id: "map_compilation_error", - reason: err.toString() + " (" + toJSON(cmd[1]) + ")"}})); + try { + switch (cmd[0]) { + case "reset": + // clear the globals and run gc + funs = []; + gc(); + print("true"); // indicates success break; - } - if (typeof(functionObject) == "function") { - map_funs.push(functionObject); - print("true"); - } else { - print(toJSON({error: "map_compilation_error", - reason: "expression does not eval to a function. (" + cmd[1] + ")"})); - } - break; - case "map_doc": - // The second arg is a document. We compute all the map functions against - // it. - // - // Each function can output multiple keys value, pairs for each document - // - // Example output of map_doc after three functions set by add_fun cmds: - // [ - // [["Key","Value"]], <- fun 1 returned 1 key value - // [], <- fun 2 returned 0 key values - // [["Key1","Value1"],["Key2","Value2"]] <- fun 3 returned 2 key values - // ] - // - var doc = cmd[1]; - seal(doc); // seal to prevent map functions from changing doc - var buf = []; - for (var i = 0; i < map_funs.length; i++) { - map_results = []; - try { - map_funs[i](doc); - buf.push(map_results.filter(function(pair) { - return pair[0] !== undefined && pair[1] !== undefined; - })); - } catch (err) { - if (err == "fatal_error") { - // Only if it's a "fatal_error" do we exit. What's a fatal error? - // That's for the query to decide. - // - // This will make it possible for queries to completely error out, - // by catching their own local exception and rethrowing a - // fatal_error. But by default if they don't do error handling we - // just eat the exception and carry on. - print(toJSON({error: "map_runtime_error", - reason: "function raised fatal exception"})); - quit(); + case "add_fun": + // The second arg is a string that will compile to a function. + // and then we add it to funs array + funs.push(safe_compile_function(cmd[1])); + print("true"); + break; + case "map_doc": + // The second arg is a document. We compute all the map functions against + // it. + // + // Each function can output multiple keys value, pairs for each document + // + // Example output of map_doc after three functions set by add_fun cmds: + // [ + // [["Key","Value"]], <- fun 1 returned 1 key value + // [], <- fun 2 returned 0 key values + // [["Key1","Value1"],["Key2","Value2"]] <- fun 3 returned 2 key values + // ] + // + var doc = cmd[1]; + seal(doc); // seal to prevent map functions from changing doc + var buf = []; + for (var i = 0; i < funs.length; i++) { + map_results = []; + try { + funs[i](doc); + buf.push(map_results.filter(function(pair) { + return pair[0] !== undefined && pair[1] !== undefined; + })); + } catch (err) { + if (err == "fatal_error") { + // Only if it's a "fatal_error" do we exit. What's a fatal error? + // That's for the query to decide. + // + // This will make it possible for queries to completely error out, + // by catching their own local exception and rethrowing a + // fatal_error. But by default if they don't do error handling we + // just eat the exception and carry on. + throw {error: "map_runtime_error", + reason: "function raised fatal exception"}; + } + print(toJSON({log: "function raised exception (" + err + ")"})); + buf.push([]); } - print(toJSON({log: "function raised exception (" + err + ")"})); - buf.push([]); } - } - print(toJSON(buf)); - break; - default: - print(toJSON({error: "query_server_error", - reason: "unknown command '" + cmd[0] + "'"})); - quit(); + print(toJSON(buf)); + break; + + case "combine": + case "reduce": + { + var keys = null; + var values = null; + var reduceFuns = cmd[1]; + var is_combine = false; + if (cmd[0] == "reduce") { + var kvs = cmd[2]; + keys = new Array(kvs.length); + values = new Array(kvs.length); + for (var i = 0; i < kvs.length; i++) { + keys[i] = kvs[i][0]; + values[i] = kvs[i][1]; + } + } else { + values = cmd[2]; + is_combine = true; + } + + for(var i in reduceFuns) { + reduceFuns[i] = safe_compile_function(reduceFuns[i]); + } + + var reductions = new Array(funs.length); + for (var i = 0; i < reduceFuns.length; i++) { + try { + reductions[i] = reduceFuns[i](keys, values, is_combine); + } catch (err) { + if (err == "fatal_error") { + throw {error: "reduce_runtime_error", + reason: "function raised fatal exception"}; + } + print(toJSON({log: "function raised exception (" + err + ")"})); + reductions[i] = null; + } + } + print("[true," + toJSON(reductions) + "]"); + } + break; + + default: + print(toJSON({error: "query_server_error", + reason: "unknown command '" + cmd[0] + "'"})); + quit(); + } + } catch(exception) { + print(toJSON(exception)); + } +} + + +function safe_compile_function(Src) { + try { + var functionObject = sandbox ? evalcx(Src, sandbox) : eval(Src); + } catch (err) { + throw {error: "compilation_error", + reason: err.toString() + " (" + Src + ")"}; + } + if (typeof(functionObject) == "function") { + return functionObject; + } else { + throw {error: "compilation_error", + reason: "expression does not eval to a function. (" + Src + ")"}; } } function toJSON(val) { if (typeof(val) == "undefined") { - throw new TypeError("Cannot encode undefined value as JSON"); + throw {error:"bad_value", reason:"Cannot encode 'undefined' value as JSON"}; } var subs = {'\b': '\\b', '\t': '\\t', '\n': '\\n', '\f': '\\f', '\r': '\\r', '"' : '\\"', '\\': '\\\\'}; Modified: incubator/couchdb/trunk/share/www/script/couch.js URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch.js?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/share/www/script/couch.js [utf-8] (original) +++ incubator/couchdb/trunk/share/www/script/couch.js [utf-8] Thu May 15 14:51:22 2008 @@ -99,7 +99,23 @@ mapFun = mapFun.toSource ? mapFun.toSource() : "(" + mapFun.toString() + ")"; var req = request("POST", this.uri + "_temp_view" + encodeOptions(options), { headers: {"Content-Type": "text/javascript"}, - body: mapFun + body: JSON.stringify(mapFun) + }); + var result = JSON.parse(req.responseText); + if (req.status != 200) + throw result; + return result; + } + + // Applies the map function to the contents of database and returns the results. + this.reduce_query = function(mapFun, reduceFun, options) { + if (typeof(mapFun) != "string") + mapFun = mapFun.toSource ? mapFun.toSource() : "(" + mapFun.toString() + ")"; + if (typeof(reduceFun) != "string") + reduceFun = reduceFun.toSource ? reduceFun.toSource() : "(" + reduceFun.toString() + ")"; + var req = request("POST", this.uri + "_temp_view" + encodeOptions(options), { + headers: {"Content-Type": "text/javascript"}, + body: JSON.stringify({map:mapFun, reduce:reduceFun}) }); var result = JSON.parse(req.responseText); if (req.status != 200) Modified: incubator/couchdb/trunk/share/www/script/couch_tests.js URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch_tests.js?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8] (original) +++ incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8] Thu May 15 14:51:22 2008 @@ -91,7 +91,15 @@ // 1 more document should now be in the result. T(results.total_rows == 3); T(db.info().doc_count == 6); + + var reduceFunction = function(keys, values){ + return sum(values); + }; + + result = db.reduce_query(mapFunction, reduceFunction); + T(result.result == 33); + // delete a document T(db.deleteDoc(existingDoc).ok); @@ -219,6 +227,39 @@ T(results.rows[numDocsToCreate-1-i].key==i); } }, + + reduce: function(debug) { + var db = new CouchDB("test_suite_db"); + db.deleteDb(); + db.createDb(); + if (debug) debugger; + var numDocs = 500 + var docs = makeDocs(1,numDocs + 1); + T(db.bulkSave(docs).ok); + var summate = function(N) {return (N+1)*N/2;}; + + var map = function (doc) {map(doc.integer, doc.integer)}; + var reduce = function (keys, values) { return sum(values); }; + var result = db.reduce_query(map, reduce).result; + T(result == summate(numDocs)); + + result = db.reduce_query(map, reduce, {startkey:4,endkey:4}).result; + + T(result == 4); + + result = db.reduce_query(map, reduce, {startkey:4,endkey:5}).result; + + T(result == 9); + + result = db.reduce_query(map, reduce, {startkey:4,endkey:6}).result; + + T(result == 15); + + for(var i=1; i {Key, Value} end, assemble_kv = fun(Key, Value) -> {Key, Value} end, - less = fun(A, B) -> A < B end + less = fun(A, B) -> A < B end, + reduce = nil }). extract(#btree{extract_kv=Extract}, Value) -> @@ -46,7 +47,9 @@ set_options(Bt, [{join, Assemble}|Rest]) -> set_options(Bt#btree{assemble_kv=Assemble}, Rest); set_options(Bt, [{less, Less}|Rest]) -> - set_options(Bt#btree{less=Less}, Rest). + set_options(Bt#btree{less=Less}, Rest); +set_options(Bt, [{reduce, Reduce}|Rest]) -> + set_options(Bt#btree{reduce=Reduce}, Rest). open(State, Fd, Options) -> {ok, set_options(#btree{root=State, fd=Fd}, Options)}. @@ -54,10 +57,36 @@ get_state(#btree{root=Root}) -> Root. -row_count(#btree{root=nil}) -> - 0; -row_count(#btree{root={_RootPointer, Count}}) -> - Count. +final_reduce(#btree{reduce=Reduce}, Val) -> + final_reduce(Reduce, Val); +final_reduce(Reduce, {[], []}) -> + Reduce(reduce, []); +final_reduce(_Bt, {[], [Red]}) -> + Red; +final_reduce(Reduce, {[], Reductions}) -> + Reduce(combine, Reductions); +final_reduce(Reduce, {KVs, Reductions}) -> + Red = Reduce(reduce, KVs), + final_reduce(Reduce, {[], [Red | Reductions]}). + +reduce(Bt, Key1, Key2) -> + {ok, Reds} = partial_reduce(Bt, Key1, Key2), + {ok, final_reduce(Bt, Reds)}. + +partial_reduce(#btree{root=Root}=Bt, Key1, Key2) -> + {KeyStart, KeyEnd} = + case Key1 == nil orelse Key2 == nil orelse less(Bt, Key1, Key2) of + true -> {Key1, Key2}; + false -> {Key2, Key1} + end, + case Root of + nil -> + {ok, {[], []}}; + _ -> + {KVs, Nodes} = collect_node(Bt, Root, KeyStart, KeyEnd), + {ok, {KVs, [Red || {_K,{_P,Red}} <- Nodes]}} + end. + foldl(Bt, Fun, Acc) -> fold(Bt, fwd, Fun, Acc). @@ -73,16 +102,16 @@ % wraps a 2 arity function with the proper 3 arity function convert_fun_arity(Fun) when is_function(Fun, 2) -> - fun(KV, _Offset, AccIn) -> Fun(KV, AccIn) end; + fun(KV, _Reds, AccIn) -> Fun(KV, AccIn) end; convert_fun_arity(Fun) when is_function(Fun, 3) -> Fun. % Already arity 3 fold(Bt, Dir, Fun, Acc) -> - {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc), + {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc), {ok, Acc2}. fold(Bt, Key, Dir, Fun, Acc) -> - {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc), + {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc), {ok, Acc2}. add(Bt, InsertKeyValues) -> @@ -136,7 +165,7 @@ lookup(_Bt, nil, Keys) -> {ok, [{Key, not_found} || Key <- Keys]}; -lookup(Bt, {Pointer, _Count}, Keys) -> +lookup(Bt, {Pointer, _Reds}, Keys) -> {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of kp_node -> @@ -229,7 +258,7 @@ nil -> NodeType = kv_node, NodeList = []; - {Pointer, _count} -> + {Pointer, _Reds} -> {NodeType, NodeList} = get_node(Bt, Pointer) end, case NodeType of @@ -249,14 +278,12 @@ {ok, ResultList, QueryOutput2, Bt3} end. - -count(kv_node, NodeList) -> - length(NodeList); -count(kp_node, NodeList) -> - lists:foldl( fun({_Key, {_Pointer, Count}}, AccCount) -> - Count + AccCount - end, - 0, NodeList). +reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) -> + []; +reduce_node(#btree{reduce=R}, kp_node, NodeList) -> + R(combine, [Red || {_K, {_P, Red}} <- NodeList]); +reduce_node(#btree{reduce=R}, kv_node, NodeList) -> + R(reduce, NodeList). get_node(#btree{fd = Fd}, NodePos) -> @@ -267,7 +294,7 @@ % Validating this prevents infinite loops should % a disk corruption occur. [throw({error, disk_corruption}) - || {_Key, {SubNodePos, _Count}} + || {_Key, {SubNodePos, _Reds}} <- NodeList, SubNodePos >= NodePos]; kv_node -> ok @@ -282,7 +309,7 @@ begin {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}), {LastKey, _} = lists:last(ANodeList), - {LastKey, {Pointer, count(NodeType, ANodeList)}} + {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList)}} end || ANodeList <- NodeListList @@ -362,93 +389,172 @@ end end. + +collect_node(Bt, {P, R}, KeyStart, KeyEnd) -> + case get_node(Bt, P) of + {kp_node, NodeList} -> + collect_kp_node(Bt, NodeList, KeyStart, KeyEnd); + {kv_node, KVs} -> + GTEKeyStartKVs = + case KeyStart of + nil -> + KVs; + _ -> + lists:dropwhile( + fun({Key,_}) -> + less(Bt, Key, KeyStart) + end, KVs) + end, + KVs2 = + case KeyEnd of + nil -> + GTEKeyStartKVs; + _ -> + lists:dropwhile( + fun({Key,_}) -> + less(Bt, KeyEnd, Key) + end, lists:reverse(GTEKeyStartKVs)) + end, + case length(KVs2) == length(KVs) of + true -> % got full node, return the already calculated reduction + {[], [{nil, {P, R}}]}; + false -> % otherwise return the keyvalues for later reduction + {KVs2, []} + end + end. + + +collect_kp_node(Bt, NodeList, KeyStart, KeyEnd) -> + Nodes = + case KeyStart of + nil -> + NodeList; + _ -> + lists:dropwhile( + fun({Key,_}) -> + less(Bt, Key, KeyStart) + end, NodeList) + end, + + case KeyEnd of + nil -> + case Nodes of + [] -> + {[], []}; + [{_, StartNodeInfo}|RestNodes] -> + {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd), + {DownKVs, DownNodes ++ RestNodes} + end; + _ -> + {GTEKeyEndNodes, LTKeyEndNodes} = lists:splitwith( + fun({Key,_}) -> + not less(Bt, Key, KeyEnd) + end, lists:reverse(Nodes)), + + {MatchingKVs, MatchingNodes} = + case lists:reverse(LTKeyEndNodes) of + [{_, StartNodeInfo}] -> + collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd); + [{_, StartNodeInfo}|RestLTNodes] -> + % optimization, since we have more KP nodes in range, we don't need + % to provide the endkey when searching the start node, making + % collecting the node faster. + {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, nil), + {DownKVs, DownNodes ++ RestLTNodes}; + [] -> + {[], []} + end, + + case lists:reverse(GTEKeyEndNodes) of + [{_, EndNodeInfo} | _] when LTKeyEndNodes == [] -> + collect_node(Bt, EndNodeInfo, KeyStart, KeyEnd); + [{_, EndNodeInfo} | _] -> + {KVs1, DownNodes1} = collect_node(Bt, EndNodeInfo, nil, KeyEnd), + {KVs1 ++ MatchingKVs, DownNodes1 ++ MatchingNodes}; + [] -> + {MatchingKVs, MatchingNodes} + end + end. + + adjust_dir(fwd, List) -> List; adjust_dir(rev, List) -> lists:reverse(List). -stream_node(Bt, Offset, PointerInfo, nil, Dir, Fun, Acc) -> - stream_node(Bt, Offset, PointerInfo, Dir, Fun, Acc); -stream_node(_Bt, _Offset, nil, _StartKey, _Dir, _Fun, Acc) -> +stream_node(Bt, Reds, PointerInfo, nil, Dir, Fun, Acc) -> + stream_node(Bt, Reds, PointerInfo, Dir, Fun, Acc); +stream_node(_Bt, _Reds, nil, _StartKey, _Dir, _Fun, Acc) -> {ok, Acc}; -stream_node(Bt, Offset, {Pointer, _Count}, StartKey, Dir, Fun, Acc) -> +stream_node(Bt, Reds, {Pointer, _Reds}, StartKey, Dir, Fun, Acc) -> {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of kp_node -> - stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc); + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc); kv_node -> - stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc) + stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc) end. -stream_node(_Bt, _Offset, nil, _Dir, _Fun, Acc) -> +stream_node(_Bt, _Reds, nil, _Dir, _Fun, Acc) -> {ok, Acc}; -stream_node(Bt, Offset, {Pointer, _Count}, Dir, Fun, Acc) -> +stream_node(Bt, Reds, {Pointer, _Reds}, Dir, Fun, Acc) -> {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of kp_node -> - stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc); + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), Dir, Fun, Acc); kv_node -> - stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc) + stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), Dir, Fun, Acc) end. -stream_kp_node(_Bt, _Offset, [], _Dir, _Fun, Acc) -> +stream_kp_node(_Bt, _Reds, [], _Dir, _Fun, Acc) -> {ok, Acc}; -stream_kp_node(Bt, Offset, [{_Key, {Pointer, Count}} | Rest], Dir, Fun, Acc) -> - case stream_node(Bt, Offset, {Pointer, Count}, Dir, Fun, Acc) of +stream_kp_node(Bt, Reds, [{_Key, {Pointer, Red}} | Rest], Dir, Fun, Acc) -> + case stream_node(Bt, Reds, {Pointer, Red}, Dir, Fun, Acc) of {ok, Acc2} -> - stream_kp_node(Bt, Offset + Count, Rest, Dir, Fun, Acc2); + stream_kp_node(Bt, [Red | Reds], Rest, Dir, Fun, Acc2); {stop, Acc2} -> {stop, Acc2} end. -drop_nodes(_Bt, Offset, _StartKey, []) -> - {Offset, []}; -drop_nodes(Bt, Offset, StartKey, [{NodeKey, {Pointer, Count}} | RestKPs]) -> +drop_nodes(_Bt, Reds, _StartKey, []) -> + {Reds, []}; +drop_nodes(Bt, Reds, StartKey, [{NodeKey, {Pointer, Red}} | RestKPs]) -> case less(Bt, NodeKey, StartKey) of - true -> drop_nodes(Bt, Offset + Count, StartKey, RestKPs); - false -> {Offset, [{NodeKey, {Pointer, Count}} | RestKPs]} + true -> drop_nodes(Bt, [Red | Reds], StartKey, RestKPs); + false -> {Reds, [{NodeKey, {Pointer, Reds}} | RestKPs]} end. -stream_kp_node(Bt, Offset, KPs, StartKey, Dir, Fun, Acc) -> - {NewOffset, NodesToStream} = +stream_kp_node(Bt, Reds, KPs, StartKey, Dir, Fun, Acc) -> + {NewReds, NodesToStream} = case Dir of fwd -> % drop all nodes sorting before the key - drop_nodes(Bt, Offset, StartKey, KPs); + drop_nodes(Bt, Reds, StartKey, KPs); rev -> % keep all nodes sorting before the key, AND the first node to sort after RevKPs = lists:reverse(KPs), case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of {_RevBefore, []} -> % everything sorts before it - {Offset, KPs}; + {Reds, KPs}; {RevBefore, [FirstAfter | Drop]} -> - {Offset + count(kp_node, Drop), [FirstAfter | lists:reverse(RevBefore)]} + {[Red || {_K,{_P,Red}} <- Drop] ++ Reds, + [FirstAfter | lists:reverse(RevBefore)]} end end, case NodesToStream of [] -> {ok, Acc}; [{_Key, PointerInfo} | Rest] -> - case stream_node(Bt, NewOffset, PointerInfo, StartKey, Dir, Fun, Acc) of + case stream_node(Bt, NewReds, PointerInfo, StartKey, Dir, Fun, Acc) of {ok, Acc2} -> - stream_kp_node(Bt, NewOffset, Rest, Dir, Fun, Acc2); + stream_kp_node(Bt, NewReds, Rest, Dir, Fun, Acc2); {stop, Acc2} -> {stop, Acc2} end end. -stream_kv_node(_Bt, _Offset, [], _Dir, _Fun, Acc) -> - {ok, Acc}; -stream_kv_node(Bt, Offset, [{K, V} | RestKVs], Dir, Fun, Acc) -> - case Fun(assemble(Bt, K, V), Offset, Acc) of - {ok, Acc2} -> - stream_kv_node(Bt, Offset + 1, RestKVs, Dir, Fun, Acc2); - {stop, Acc2} -> - {stop, Acc2} - end. - -stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) -> +stream_kv_node(Bt, Reds, KVs, StartKey, Dir, Fun, Acc) -> DropFun = case Dir of fwd -> @@ -456,11 +562,36 @@ rev -> fun({Key, _}) -> less(Bt, StartKey, Key) end end, - % drop all nodes preceding the key - GTEKVs = lists:dropwhile(DropFun, KVs), - LenSkipped = length(KVs) - length(GTEKVs), - stream_kv_node(Bt, Offset + LenSkipped, GTEKVs, Dir, Fun, Acc). + {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs), + stream_kv_node2(Bt, Reds, LTKVs, GTEKVs, Dir, Fun, Acc). +stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], Dir, Fun, Acc) -> + case Fun(assemble(Bt, K, V), {PrevKVs, Reds}, Acc) of + {ok, Acc2} -> + stream_kv_node2(Bt, Reds, [{K,V} | PrevKVs], RestKVs, Dir, Fun, Acc2); + {stop, Acc2} -> + {stop, Acc2} + end. + +shuffle(List) -> +%% Determine the log n portion then randomize the list. + randomize(round(math:log(length(List)) + 0.5), List). + +randomize(1, List) -> + randomize(List); +randomize(T, List) -> + lists:foldl(fun(_E, Acc) -> + randomize(Acc) + end, randomize(List), lists:seq(1, (T - 1))). + +randomize(List) -> + D = lists:map(fun(A) -> + {random:uniform(), A} + end, List), + {_, D1} = lists:unzip(lists:keysort(1, D)), + D1. @@ -468,20 +599,58 @@ test(1000). test(N) -> - KeyValues = [{random:uniform(), random:uniform()} || _Seq <- lists:seq(1, N)], - test_btree(KeyValues), % randomly distributed - Sorted = lists:sort(KeyValues), + Sorted = [{Seq, random:uniform()} || Seq <- lists:seq(1, N)], test_btree(Sorted), % sorted regular - test_btree(lists:reverse(Sorted)). % sorted reverse + test_btree(lists:reverse(Sorted)), % sorted reverse + test_btree(shuffle(Sorted)). % randomly distributed test_btree(KeyValues) -> {ok, Fd} = couch_file:open("foo", [create,overwrite]), {ok, Btree} = open(nil, Fd), + ReduceFun = + fun(reduce, KVs) -> + length(KVs); + (combine, Reds) -> + lists:sum(Reds) + end, + Btree1 = set_options(Btree, [{reduce, ReduceFun}]), % first dump in all the values in one go - {ok, Btree10} = add_remove(Btree, KeyValues, []), + {ok, Btree10} = add_remove(Btree1, KeyValues, []), + + Len = length(KeyValues), + + {ok, Len} = reduce(Btree10, nil, nil), + + % Count of all from start to Val1 + Val1 = Len div 3, + {ok, Val1} = reduce(Btree10, nil, Val1), + % Count of all from Val1 to end + CountVal1ToEnd = Len - Val1 + 1, + {ok, CountVal1ToEnd} = reduce(Btree10, Val1, nil), + + % Count of all from Val1 to Val2 + Val2 = 2*Len div 3, + CountValRange = Val2 - Val1 + 1, + {ok, CountValRange} = reduce(Btree10, Val1, Val2), + + % get the leading reduction as we foldl/r + {ok, true} = foldl(Btree10, Val1, fun(_X, LeadingReds, _Acc) -> + CountToStart = Val1 - 1, + CountToStart = final_reduce(Btree10, LeadingReds), + {stop, true} % change Acc to 'true' + end, + false), + + {ok, true} = foldr(Btree10, Val1, fun(_X, LeadingReds, _Acc) -> + CountToEnd = Len - Val1, + CountToEnd = final_reduce(Btree10, LeadingReds), + {stop, true} % change Acc to 'true' + end, + false), + ok = test_keys(Btree10, KeyValues), % remove everything Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Thu May 15 14:51:22 2008 @@ -17,10 +17,18 @@ -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]). -export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). +-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([start_update_loop/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -export([start_copy_compact_int/2]). +-export([btree_by_id_split/1, + btree_by_id_join/2, + btree_by_id_reduce/2, + btree_by_seq_split/1, + btree_by_seq_join/2, + btree_by_seq_reduce/2]). + -include("couch_db.hrl"). -record(db_header, @@ -363,6 +371,12 @@ Doc#doc{attachments = NewBins}. +enum_docs_since_reduce_to_count(Reds) -> + couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds). + +enum_docs_reduce_to_count(Reds) -> + couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds). + enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) -> Db = get_db(MainPid), couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). @@ -407,22 +421,38 @@ deleted_conflict_revs = DelConflicts, deleted = Deleted}. -btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}) -> - {Id, {Seq, Tree}}. +btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, + deleted=Deleted, rev_tree=Tree}) -> + {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}. -btree_by_name_join(Id, {Seq, Tree}) -> - #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}. +btree_by_id_join(Id, {Seq, Deleted, Tree}) -> + #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}. + +btree_by_id_reduce(reduce, FullDocInfos) -> + % count the number of deleted documents + length([1 || #full_doc_info{deleted=false} <- FullDocInfos]); +btree_by_id_reduce(combine, Reds) -> + lists:sum(Reds). + +btree_by_seq_reduce(reduce, DocInfos) -> + % count the number of deleted documents + length(DocInfos); +btree_by_seq_reduce(combine, Reds) -> + lists:sum(Reds). + init_db(DbName, Filepath, Fd, Header) -> {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), ok = couch_stream:set_min_buffer(SummaryStream, 10000), {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, - [{split, fun(V) -> btree_by_name_split(V) end}, - {join, fun(K,V) -> btree_by_name_join(K,V) end}] ), + [{split, fun btree_by_id_split/1}, + {join, fun btree_by_id_join/2}, + {reduce, fun btree_by_id_reduce/2}]), {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, - [{split, fun(V) -> btree_by_seq_split(V) end}, - {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ), + [{split, fun btree_by_seq_split/1}, + {join, fun btree_by_seq_join/2}, + {reduce, fun btree_by_seq_reduce/2}]), {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), #db{ @@ -437,8 +467,7 @@ doc_count = Header#db_header.doc_count, doc_del_count = Header#db_header.doc_del_count, name = DbName, - filepath=Filepath - }. + filepath=Filepath }. close_db(#db{fd=Fd,summary_stream=Ss}) -> couch_file:close(Fd), @@ -759,7 +788,9 @@ if Deleted -> {DocCount, DelCount + 1}; true -> {DocCount + 1, DelCount} end, - new_index_entries(RestInfos, DocCount2, DelCount2, [FullDocInfo|AccById], [DocInfo|AccBySeq]). + new_index_entries(RestInfos, DocCount2, DelCount2, + [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], + [DocInfo|AccBySeq]). update_docs_int(Db, DocsList, Options) -> #db{ Modified: incubator/couchdb/trunk/src/couchdb/couch_db.hrl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.hrl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_db.hrl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_db.hrl Thu May 15 14:51:22 2008 @@ -45,6 +45,7 @@ -record(full_doc_info, {id = "", update_seq = 0, + deleted = false, rev_tree = [] }). Modified: incubator/couchdb/trunk/src/couchdb/couch_httpd.erl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_httpd.erl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_httpd.erl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_httpd.erl Thu May 15 14:51:22 2008 @@ -13,7 +13,7 @@ -module(couch_httpd). -include("couch_db.hrl"). --export([start_link/3, stop/0]). +-export([start_link/3, stop/0, handle_request/2]). -record(doc_query_args, { options = [], @@ -35,7 +35,7 @@ }). start_link(BindAddress, Port, DocumentRoot) -> - Loop = fun (Req) -> handle_request(Req, DocumentRoot) end, + Loop = fun (Req) -> apply(couch_httpd, handle_request, [Req, DocumentRoot]) end, mochiweb_http:start([ {loop, Loop}, {name, ?MODULE}, @@ -47,6 +47,7 @@ mochiweb_http:stop(?MODULE). handle_request(Req, DocumentRoot) -> + % alias HEAD to GET as mochiweb takes care of stripping the body Method = case Req:get(method) of 'HEAD' -> 'GET'; @@ -263,18 +264,19 @@ true -> StartDocId end, - FoldlFun = make_view_fold_fun(Req, QueryArgs), + FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRowCount, + fun couch_db:enum_docs_reduce_to_count/1), AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) -> case couch_doc:to_doc_info(FullDocInfo) of #doc_info{deleted=false, rev=Rev} -> - FoldlFun(Id, Id, {obj, [{rev, Rev}]}, Offset, TotalRowCount, Acc); + FoldlFun({{Id, Id}, {obj, [{rev, Rev}]}}, Offset, Acc); #doc_info{deleted=true} -> {ok, Acc} end end, {ok, FoldResult} = couch_db:enum_docs(Db, StartId, Dir, AdapterFun, {Count, SkipCount, undefined, []}), - finish_view_fold(Req, {ok, TotalRowCount, FoldResult}); + finish_view_fold(Req, TotalRowCount, {ok, FoldResult}); handle_db_request(_Req, _Method, {_DbName, _Db, ["_all_docs"]}) -> throw({method_not_allowed, "GET,HEAD"}); @@ -290,7 +292,8 @@ {ok, Info} = couch_db:get_db_info(Db), TotalRowCount = proplists:get_value(doc_count, Info), - FoldlFun = make_view_fold_fun(Req, QueryArgs), + FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRowCount, + fun couch_db:enum_docs_since_reduce_to_count/1), StartKey2 = case StartKey of nil -> 0; <<>> -> 100000000000; @@ -321,9 +324,9 @@ false -> [] end }, - FoldlFun(Id, UpdateSeq, Json, Offset, TotalRowCount, Acc) + FoldlFun({{UpdateSeq, Id}, Json}, Offset, Acc) end, {Count, SkipCount, undefined, []}), - finish_view_fold(Req, {ok, TotalRowCount, FoldResult}); + finish_view_fold(Req, TotalRowCount, {ok, FoldResult}); handle_db_request(_Req, _Method, {_DbName, _Db, ["_all_docs_by_seq"]}) -> throw({method_not_allowed, "GET,HEAD"}); @@ -331,17 +334,31 @@ handle_db_request(Req, 'GET', {DbName, _Db, ["_view", DocId, ViewName]}) -> #view_query_args{ start_key = StartKey, + end_key = EndKey, count = Count, skip = SkipCount, direction = Dir, - start_docid = StartDocId - } = QueryArgs = parse_view_query(Req), - View = {DbName, "_design/" ++ DocId, ViewName}, - Start = {StartKey, StartDocId}, - FoldlFun = make_view_fold_fun(Req, QueryArgs), - FoldAccInit = {Count, SkipCount, undefined, []}, - FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit), - finish_view_fold(Req, FoldResult); + start_docid = StartDocId, + end_docid = EndDocId + } = QueryArgs = parse_view_query(Req), + case couch_view:get_map_view({DbName, "_design/" ++ DocId, ViewName}) of + {ok, View} -> + {ok, RowCount} = couch_view:get_row_count(View), + Start = {StartKey, StartDocId}, + FoldlFun = make_view_fold_fun(Req, QueryArgs, RowCount, + fun couch_view:reduce_to_count/1), + FoldAccInit = {Count, SkipCount, undefined, []}, + FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit), + finish_view_fold(Req, RowCount, FoldResult); + {not_found, Reason} -> + case couch_view:get_reduce_view({DbName, "_design/" ++ DocId, ViewName}) of + {ok, View} -> + {ok, Value} = couch_view:reduce(View, {StartKey, StartDocId}, {EndKey, EndDocId}), + send_json(Req, {obj, [{ok,true}, {result, Value}]}); + _ -> + throw({not_found, Reason}) + end + end; handle_db_request(_Req, _Method, {_DbName, _Db, ["_view", _DocId, _ViewName]}) -> throw({method_not_allowed, "GET,HEAD"}); @@ -358,10 +375,12 @@ handle_db_request(Req, 'POST', {DbName, _Db, ["_temp_view"]}) -> #view_query_args{ start_key = StartKey, + end_key = EndKey, count = Count, skip = SkipCount, direction = Dir, - start_docid = StartDocId + start_docid = StartDocId, + end_docid = EndDocId } = QueryArgs = parse_view_query(Req), ContentType = case Req:get_primary_header_value("content-type") of @@ -370,13 +389,25 @@ Else -> Else end, - - View = {temp, DbName, ContentType, Req:recv_body()}, - Start = {StartKey, StartDocId}, - FoldlFun = make_view_fold_fun(Req, QueryArgs), - FoldAccInit = {Count, SkipCount, undefined, []}, - FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit), - finish_view_fold(Req, FoldResult); + case cjson:decode(Req:recv_body()) of + {obj, Props} -> + MapSrc = proplists:get_value("map",Props), + RedSrc = proplists:get_value("reduce",Props), + {ok, View} = couch_view:get_reduce_view( + {temp, DbName, ContentType, MapSrc, RedSrc}), + {ok, Value} = couch_view:reduce(View, {StartKey, StartDocId}, {EndKey, EndDocId}), + send_json(Req, {obj, [{ok,true}, {result, Value}]}); + Src when is_list(Src) -> + + {ok, View} = couch_view:get_map_view({temp, DbName, ContentType, Src}), + Start = {StartKey, StartDocId}, + {ok, TotalRows} = couch_view:get_row_count(View), + FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRows, + fun couch_view:reduce_to_count/1), + FoldAccInit = {Count, SkipCount, undefined, []}, + FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit), + finish_view_fold(Req, TotalRows, FoldResult) + end; handle_db_request(_Req, _Method, {_DbName, _Db, ["_temp_view"]}) -> throw({method_not_allowed, "POST"}); @@ -618,7 +649,8 @@ end end, #view_query_args{}, QueryList). -make_view_fold_fun(Req, QueryArgs) -> + +make_view_fold_fun(Req, QueryArgs, TotalViewCount, ReduceCountFun) -> #view_query_args{ end_key = EndKey, end_docid = EndDocId, @@ -626,7 +658,8 @@ count = Count } = QueryArgs, - PassedEndFun = case Dir of + PassedEndFun = + case Dir of fwd -> fun(ViewKey, ViewId) -> couch_view:less_json({EndKey, EndDocId}, {ViewKey, ViewId}) @@ -636,10 +669,11 @@ couch_view:less_json({ViewKey, ViewId}, {EndKey, EndDocId}) end end, - - NegCountFun = fun(Id, Key, Value, Offset, TotalViewCount, + + NegCountFun = fun({{Key, DocId}, Value}, OffsetReds, {AccCount, AccSkip, Resp, AccRevRows}) -> - PassedEnd = PassedEndFun(Key, Id), + Offset = ReduceCountFun(OffsetReds), + PassedEnd = PassedEndFun(Key, DocId), case {PassedEnd, AccCount, AccSkip, Resp} of {true, _, _, _} -> % The stop key has been passed, stop looping. {stop, {AccCount, AccSkip, Resp, AccRevRows}}; @@ -654,17 +688,18 @@ JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[", [TotalViewCount, Offset2]), Resp2:write_chunk(lists:flatten(JsonBegin)), - JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]}, + JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]}, {ok, {AccCount + 1, 0, Resp2, [cjson:encode(JsonObj) | AccRevRows]}}; {_, AccCount, _, Resp} -> - JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]}, + JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]}, {ok, {AccCount + 1, 0, Resp, [cjson:encode(JsonObj), "," | AccRevRows]}} end end, - PosCountFun = fun(Id, Key, Value, Offset, TotalViewCount, + PosCountFun = fun({{Key, DocId}, Value}, OffsetReds, {AccCount, AccSkip, Resp, AccRevRows}) -> - PassedEnd = PassedEndFun(Key, Id), + Offset = ReduceCountFun(OffsetReds), + PassedEnd = PassedEndFun(Key, DocId), case {PassedEnd, AccCount, AccSkip, Resp} of {true, _, _, _} -> % The stop key has been passed, stop looping. @@ -678,11 +713,11 @@ Resp2 = start_json_response(Req, 200), JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[\r\n", [TotalViewCount, Offset]), - JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]}, + JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]}, Resp2:write_chunk(lists:flatten(JsonBegin ++ cjson:encode(JsonObj))), {ok, {AccCount - 1, 0, Resp2, AccRevRows}}; {_, AccCount, _, Resp} when (AccCount > 0) -> - JsonObj = {obj, [{"id", Id}, {"key", Key}, {"value", Value}]}, + JsonObj = {obj, [{"id", DocId}, {"key", Key}, {"value", Value}]}, Resp:write_chunk(",\r\n" ++ lists:flatten(cjson:encode(JsonObj))), {ok, {AccCount - 1, 0, Resp, AccRevRows}} end @@ -692,16 +727,16 @@ false -> NegCountFun end. -finish_view_fold(Req, FoldResult) -> +finish_view_fold(Req, TotalRows, FoldResult) -> case FoldResult of - {ok, TotalRows, {_, _, undefined, _}} -> + {ok, {_, _, undefined, _}} -> % nothing found in the view, nothing has been returned % send empty view send_json(Req, 200, {obj, [ {total_rows, TotalRows}, {rows, {}} ]}); - {ok, _TotalRows, {_, _, Resp, AccRevRows}} -> + {ok, {_, _, Resp, AccRevRows}} -> % end the view Resp:write_chunk(lists:flatten(AccRevRows) ++ "\r\n]}"), end_json_response(Resp); Modified: incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl Thu May 15 14:51:22 2008 @@ -18,7 +18,7 @@ -export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]). % a key tree looks like this: -% Tree -> [] or [{Key, Value, Tree} | SiblingTree] +% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree] % ChildTree -> Tree % SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree] % And each Key < SiblingKey Modified: incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl Thu May 15 14:51:22 2008 @@ -17,7 +17,7 @@ -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). - +-export([reduce/3, combine/3]). -export([test/0, test/1]). -include("couch_db.hrl"). @@ -65,32 +65,21 @@ % send command and get a response. prompt(Port, Json) -> writeline(Port, cjson:encode(Json)), - read_json(Port). + case read_json(Port) of + {obj, [{"error", Id}, {"reason", Reason}]} -> + throw({list_to_atom(Id),Reason}); + {obj, [{"reason", Reason}, {"error", Id}]} -> + throw({list_to_atom(Id),Reason}); + Result -> + Result + end. start_doc_map(Lang, Functions) -> - Port = - case gen_server:call(couch_query_servers, {get_port, Lang}) of - {ok, Port0} -> - link(Port0), - Port0; - {empty, Cmd} -> - ?LOG_INFO("Spawning new ~s instance.", [Lang]), - open_port({spawn, Cmd}, [stream, - {line, 1000}, - exit_status, - hide]); - Error -> - throw(Error) - end, - true = prompt(Port, {"reset"}), + Port = get_linked_port(Lang), % send the functions as json strings lists:foreach(fun(FunctionSource) -> - case prompt(Port, {"add_fun", FunctionSource}) of - true -> ok; - {obj, [{"error", Id}, {"reason", Reason}]} -> - throw({Id, Reason}) - end + true = prompt(Port, {"add_fun", FunctionSource}) end, Functions), {ok, {Lang, Port}}. @@ -100,19 +89,13 @@ Results = lists:map( fun(Doc) -> Json = couch_doc:to_json_obj(Doc, []), - case prompt(Port, {"map_doc", Json}) of - {obj, [{"error", Id}, {"reason", Reason}]} -> - throw({list_to_atom(Id),Reason}); - {obj, [{"reason", Reason}, {"error", Id}]} -> - throw({list_to_atom(Id),Reason}); - Results when is_tuple(Results) -> - % the results are a json array of function map yields like this: - % {FunResults1, FunResults2 ...} - % where funresults is are json arrays of key value pairs: - % {{Key1, Value1}, {Key2, Value2}} - % Convert to real lists, execept the key, value pairs - [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)] - end + Results = prompt(Port, {"map_doc", Json}), + % the results are a json array of function map yields like this: + % {FunResults1, FunResults2 ...} + % where funresults is are json arrays of key value pairs: + % {{Key1, Value1}, {Key2, Value2}} + % Convert to real lists, execept the key, value pairs + [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)] end, Docs), {ok, Results}. @@ -121,10 +104,68 @@ stop_doc_map(nil) -> ok; stop_doc_map({Lang, Port}) -> + return_linked_port(Lang, Port). + +get_linked_port(Lang) -> + case gen_server:call(couch_query_servers, {get_port, Lang}) of + {ok, Port0} -> + link(Port0), + true = prompt(Port0, {"reset"}), + Port0; + {empty, Cmd} -> + ?LOG_INFO("Spawning new ~s instance.", [Lang]), + open_port({spawn, Cmd}, [stream, + {line, 1000}, + exit_status, + hide]); + Error -> + throw(Error) + end. + +return_linked_port(Lang, Port) -> ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}), true = unlink(Port), ok. +group_reductions_results([]) -> + []; +group_reductions_results(List) -> + {Heads, Tails} = lists:foldl( + fun([H|T], {HAcc,TAcc}) -> + {[H|HAcc], [T|TAcc]} + end, {[], []}, List), + case Tails of + [[]|_] -> % no tails left + [Heads]; + _ -> + [Heads | group_reductions_results(Tails)] + end. + +combine(_Lang, [], _ReducedValues) -> + {ok, []}; +combine(Lang, RedSrcs, ReducedValues) -> + Port = get_linked_port(Lang), + Grouped = group_reductions_results(ReducedValues), + Results = lists:zipwith( + fun(FunSrc, Values) -> + {true, {Result}} = + prompt(Port, {"combine", {FunSrc}, list_to_tuple(Values)}), + Result + end, RedSrcs, Grouped), + + return_linked_port(Lang, Port), + {ok, Results}. + +reduce(_Lang, [], _KVs) -> + {ok, []}; +reduce(Lang, RedSrcs, KVs) -> + Port = get_linked_port(Lang), + {true, Results} = prompt(Port, + {"reduce", list_to_tuple(RedSrcs), list_to_tuple(KVs)}), + return_linked_port(Lang, Port), + {ok, tuple_to_list(Results)}. + + init(QueryServerList) -> {ok, {QueryServerList, []}}. Modified: incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl Thu May 15 14:51:22 2008 @@ -176,7 +176,7 @@ case StartResult of {ok,_} -> % only output when startup was successful - io:format("Find Futon, the management interface, at:~nhttp://~s:~s/_utils/index.html~n~n", [BindAddress, Port]), + %io:format("Find Futon, the management interface, at:~nhttp://~s:~s/_utils/index.html~n~n", [BindAddress, Port]), io:format("Apache CouchDB has started. Time to relax.~n"); _ -> % Since we failed startup, unconditionally dump configuration data to console Modified: incubator/couchdb/trunk/src/couchdb/couch_util.erl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_util.erl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_util.erl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_util.erl Thu May 15 14:51:22 2008 @@ -16,7 +16,7 @@ -export([parse_ini/1,should_flush/0, should_flush/1]). -export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]). -export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]). --export([encodeBase64/1, decodeBase64/1]). +-export([encodeBase64/1, decodeBase64/1, to_hex/1]). % arbitrarily chosen amount of memory to use before flushing to disk @@ -30,20 +30,21 @@ {error, already_loaded} -> ok; Error -> exit(Error) end. - - + + + new_uuid() -> - to_hex(binary_to_list(crypto:rand_bytes(16))). - + to_hex(crypto:rand_bytes(16)). + to_hex([]) -> []; +to_hex(Bin) when is_binary(Bin) -> + to_hex(binary_to_list(Bin)); to_hex([H|T]) -> [to_digit(H div 16), to_digit(H rem 16) | to_hex(T)]. -to_digit(N) when N < 10 -> - $0 + N; -to_digit(N) -> - $a + N-10. +to_digit(N) when N < 10 -> $0 + N; +to_digit(N) -> $a + N-10. % returns a random integer Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=656861&r1=656860&r2=656861&view=diff ============================================================================== --- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original) +++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Thu May 15 14:51:22 2008 @@ -15,8 +15,9 @@ -module(couch_view). -behaviour(gen_server). --export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/4]). +-export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]). +-export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, reduce/3]). -include("couch_db.hrl"). @@ -26,16 +27,18 @@ name, def_lang, views, - id_btree, - current_seq, + reductions=[], % list of reduction names and id_num of view that contains it. + id_btree=nil, + current_seq=0, query_server=nil }). -record(view, {id_num, - name, - btree, - def + map_names=[], + def, + btree=nil, + reduce_funs=[] }). -record(server, @@ -47,8 +50,8 @@ -get_temp_updater(DbName, Type, Src) -> - {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, Src}), +get_temp_updater(DbName, Type, MapSrc, RedSrc) -> + {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}), Pid. get_updater(DbName, GroupId) -> @@ -75,44 +78,135 @@ end end. -fold(ViewInfo, Dir, Fun, Acc) -> - fold(ViewInfo, nil, Dir, Fun, Acc). - -fold({temp, DbName, Type, Src}, StartKey, Dir, Fun, Acc) -> - {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src)), - fold_view(View#view.btree, StartKey, Dir, Fun, Acc); -fold({DbName, GroupId, ViewName}, StartKey, Dir, Fun, Acc) -> +get_row_count(#view{btree=Bt}) -> + {ok, Reds} = couch_btree:partial_reduce(Bt, nil, nil), + {ok, reduce_to_count(Reds)}. + +get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) -> + {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)), + {ok, {temp_reduce, View}}; +get_reduce_view({DbName, GroupId, Name}) -> + {ok, #group{views=Views,def_lang=Lang}} = + get_updated_group(get_updater(DbName, GroupId)), + get_reduce_view0(Name, Lang, Views). + +get_reduce_view0(_Name, _Lang, []) -> + {not_found, missing_named_view}; +get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) -> + case get_key_pos(Name, RedFuns, 0) of + 0 -> get_reduce_view0(Name, Lang, Rest); + N -> {ok, {reduce, N, Lang, View}} + end. + +reduce({temp_reduce, #view{btree=Bt}}, Key1, Key2) -> + {ok, {_Count, [Reduction]}} = couch_btree:reduce(Bt, Key1, Key2), + {ok, Reduction}; + +reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Key1, Key2) -> + {ok, PartialReductions} = couch_btree:partial_reduce(Bt, Key1, Key2), + PreResultPadding = lists:duplicate(NthRed - 1, []), + PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []), + {_Name, FunSrc} = lists:nth(NthRed,RedFuns), + ReduceFun = + fun(reduce, KVs) -> + {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], KVs), + {0, PreResultPadding ++ Reduced ++ PostResultPadding}; + (combine, Reds) -> + UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:combine(Lang, [FunSrc], UserReds), + {0, PreResultPadding ++ Reduced ++ PostResultPadding} + end, + {0, [FinalReduction]} = couch_btree:final_reduce(ReduceFun, PartialReductions), + {ok, FinalReduction}. + +get_key_pos(_Key, [], _N) -> + 0; +get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 -> + N + 1; +get_key_pos(Key, [_|Rest], N) -> + get_key_pos(Key, Rest, N+1). + +get_map_view({temp, DbName, Type, Src}) -> + {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])), + {ok, View}; +get_map_view({DbName, GroupId, Name}) -> {ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)), - Btree = get_view_btree(Views, ViewName), - fold_view(Btree, StartKey, Dir, Fun, Acc). + get_map_view0(Name, Views). + +get_map_view0(_Name, []) -> + {not_found, missing_named_view}; +get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) -> + case lists:member(Name, MapNames) of + true -> {ok, View}; + false -> get_map_view0(Name, Rest) + end. + +reduce_to_count(Reductions) -> + {Count, _} = + couch_btree:final_reduce( + fun(reduce, KVs) -> + {length(KVs), []}; + (combine, Reds) -> + {lists:sum([Count0 || {Count0, _} <- Reds]), []} + end, Reductions), + Count. + + +design_doc_to_view_group(#doc{id=Id,body={obj, Fields}}) -> + Language = proplists:get_value("language", Fields, "text/javascript"), + {obj, RawViews} = proplists:get_value("views", Fields, {obj, []}), + + % extract the map/reduce views from the json fields and into lists + MapViewsRaw = [{Name, Src, nil} || {Name, Src} <- RawViews, is_list(Src)], + MapReduceViewsRaw = + [{Name, + proplists:get_value("map", MRFuns), + proplists:get_value("reduce", MRFuns)} + || {Name, {obj, MRFuns}} <- RawViews], + + % add the views to a dictionary object, with the map source as the key + DictBySrc = + lists:foldl( + fun({Name, MapSrc, RedSrc}, DictBySrcAcc) -> + View = + case dict:find(MapSrc, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #view{def=MapSrc} % create new view object + end, + View2 = + if RedSrc == nil -> + View#view{map_names=[Name|View#view.map_names]}; + true -> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]} + end, + dict:store(MapSrc, View2, DictBySrcAcc) + end, dict:new(), MapViewsRaw ++ MapReduceViewsRaw), + % number the views + {Views, _N} = lists:mapfoldl( + fun({_Src, View}, N) -> + {View#view{id_num=N},N+1} + end, 0, dict:to_list(DictBySrc)), -fold_view(Btree, StartKey, Dir, Fun, Acc) -> - TotalRowCount = couch_btree:row_count(Btree), - WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) -> - Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc) - end, - {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, WrapperFun, Acc), - {ok, TotalRowCount, AccResult}. + reset_group(#group{name=Id, views=Views, def_lang=Language}). + + +fold(#view{btree=Btree}, Dir, Fun, Acc) -> + {ok, _AccResult} = couch_btree:fold(Btree, Dir, Fun, Acc). -get_view_btree([], _ViewName) -> - throw({not_found, missing_named_view}); -get_view_btree([View | _RestViews], ViewName) when View#view.name == ViewName -> - View#view.btree; -get_view_btree([_View | RestViews], ViewName) -> - get_view_btree(RestViews, ViewName). +fold(#view{btree=Btree}, StartKey, Dir, Fun, Acc) -> + {ok, _AccResult} = couch_btree:fold(Btree, StartKey, Dir, Fun, Acc). init(RootDir) -> - UpdateNotifierFun = + couch_db_update_notifier:start_link( fun({deleted, DbName}) -> gen_server:cast(couch_view, {reset_indexes, DbName}); ({created, DbName}) -> gen_server:cast(couch_view, {reset_indexes, DbName}); (_Else) -> ok - end, - couch_db_update_notifier:start_link(UpdateNotifierFun), + end), ets:new(couch_views_by_db, [bag, private, named_table]), ets:new(couch_views_by_name, [set, protected, named_table]), ets:new(couch_views_by_updater, [set, private, named_table]), @@ -127,8 +221,8 @@ catch ets:delete(couch_views_temp_fd_by_db). -handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=Root}=Server) -> - <> = erlang:md5(Lang ++ Query), +handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) -> + <> = erlang:md5(Lang ++ [0] ++ MapSrc ++ [0] ++ RedSrc), Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])), Pid = case ets:lookup(couch_views_by_name, {DbName, Name}) of @@ -142,7 +236,8 @@ ok end, ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]), - NewPid = spawn_link(couch_view, start_temp_update_loop, [DbName, Fd, Lang, Query]), + NewPid = spawn_link(couch_view, start_temp_update_loop, + [DbName, Fd, Lang, MapSrc, RedSrc]), true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}), add_to_ets(NewPid, DbName, Name), NewPid; @@ -219,18 +314,22 @@ {ok, State}. -start_temp_update_loop(DbName, Fd, Lang, Query) -> +start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> NotifyPids = get_notify_pids(1000), case couch_server:open(DbName) of {ok, Db} -> - View = #view{name="_temp", id_num=0, btree=nil, def=Query}, + View = #view{map_names=["_temp"], + id_num=0, + btree=nil, + def=MapSrc, + reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, Group = #group{name="_temp", db=Db, views=[View], current_seq=0, def_lang=Lang, id_btree=nil}, - Group2 = disk_group_to_mem(Fd, Group), + Group2 = disk_group_to_mem(Db, Fd, Group), temp_update_loop(Group2, NotifyPids); Else -> exit(Else) @@ -242,24 +341,24 @@ garbage_collect(), temp_update_loop(Group2, get_notify_pids(10000)). + +reset_group(#group{views=Views}=Group) -> + Views2 = [View#view{btree=nil} || View <- Views], + Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, + id_btree=nil,views=Views2}. + start_update_loop(RootDir, DbName, GroupId) -> % wait for a notify request before doing anything. This way, we can just % exit and any exits will be noticed by the callers. start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)). start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> - {Db, DefLang, Defs} = - case couch_server:open(DbName) of + {Db, DbGroup} = + case (catch couch_server:open(DbName)) of {ok, Db0} -> - case couch_db:open_doc(Db0, GroupId) of + case (catch couch_db:open_doc(Db0, GroupId)) of {ok, Doc} -> - case couch_doc:get_view_functions(Doc) of - none -> - delete_index_file(RootDir, DbName, GroupId), - exit({not_found, no_views_found}); - {DefLang0, Defs0} -> - {Db0, DefLang0, Defs0} - end; + {Db0, design_doc_to_view_group(Doc)}; Else -> delete_index_file(RootDir, DbName, GroupId), exit(Else) @@ -268,26 +367,48 @@ delete_index_file(RootDir, DbName, GroupId), exit(Else) end, - Group = open_index_file(RootDir, DbName, GroupId, DefLang, Defs), + FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", + Group = + case couch_file:open(FileName) of + {ok, Fd} -> + case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of + {ok, ExistingDiskGroup} -> + % validate all the view definitions in the index are correct. + case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of + true -> disk_group_to_mem(Db, Fd, ExistingDiskGroup); + false -> reset_file(Db, Fd, DbName, DbGroup) + end; + _ -> + reset_file(Db, Fd, DbName, DbGroup) + end; + {error, enoent} -> + case couch_file:open(FileName, [create]) of + {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup); + Error -> throw(Error) + end + end, - try update_loop(Group#group{db=Db}, NotifyPids) of - _ -> ok + update_loop(RootDir, DbName, GroupId, Group, NotifyPids). + +reset_file(Db, Fd, DbName, #group{name=Name} = DiskReadyGroup) -> + ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]), + ok = couch_file:truncate(Fd, 0), + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, DiskReadyGroup), + disk_group_to_mem(Db, Fd, DiskReadyGroup). + +update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) -> + try update_group(Group) of + {ok, Group2} -> + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)), + [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], + garbage_collect(), + update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)) catch restart -> couch_file:close(Group#group.fd), start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) end. -update_loop(#group{fd=Fd}=Group, NotifyPids) -> - {ok, Group2} = update_group(Group), - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)), - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], - garbage_collect(), - update_loop(Group2). - -update_loop(Group) -> - update_loop(Group, get_notify_pids(100000)). - % wait for the first request to come in. get_notify_pids(Wait) -> receive @@ -351,51 +472,29 @@ delete_index_file(RootDir, DbName, GroupId) -> file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view"). - -open_index_file(RootDir, DbName, GroupId, ViewLang, ViewDefs) -> - FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", - case couch_file:open(FileName) of - {ok, Fd} -> - case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of - {ok, #group{views=Views}=Group} -> - % validate all the view definitions in the index are correct. - case same_view_def(Views, ViewDefs) of - true -> disk_group_to_mem(Fd, Group); - false -> reset_header(GroupId, Fd, ViewLang, ViewDefs) - end; - _ -> - reset_header(GroupId, Fd, ViewLang, ViewDefs) - end; - _ -> - case couch_file:open(FileName, [create]) of - {ok, Fd} -> - reset_header(GroupId, Fd, ViewLang, ViewDefs); - Error -> - throw(Error) - end - end. - -same_view_def([], []) -> - true; -same_view_def(DiskViews, ViewDefs) when DiskViews == [] orelse ViewDefs == []-> - false; -same_view_def([#view{name=DiskName,def=DiskDef}|RestViews], [{Name, Def}|RestDefs]) -> - if DiskName == Name andalso DiskDef == Def -> - same_view_def(RestViews, RestDefs); - true -> - false - end. % Given a disk ready group structure, return an initialized, in-memory version. -disk_group_to_mem(Fd, #group{id_btree=IdState,views=Views}=Group) -> +disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) -> {ok, IdBtree} = couch_btree:open(IdState, Fd), Views2 = lists:map( - fun(#view{btree=BtreeState}=View) -> - {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, fun less_json/2}]), + fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) -> + FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns], + ReduceFun = + fun(reduce, KVs) -> + {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs), + {length(KVs), Reduced}; + (combine, Reds) -> + Count = lists:sum([Count0 || {Count0, _} <- Reds]), + UserReds = [UserRedsList || {_, UserRedsList} <- Reds], + {ok, Reduced} = couch_query_servers:combine(Lang, FunSrcs, UserReds), + {Count, Reduced} + end, + {ok, Btree} = couch_btree:open(BtreeState, Fd, + [{less, fun less_json/2},{reduce, ReduceFun}]), View#view{btree=Btree} end, Views), - Group#group{fd=Fd, id_btree=IdBtree, views=Views2}. + Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}. % Given an initialized, in-memory group structure, return a disk ready version. mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) -> @@ -405,23 +504,7 @@ View#view{btree=State} end, Views), - Group#group{fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}. - -reset_header(GroupId, Fd, DefLanguage, NamedViews) -> - couch_file:truncate(Fd, 0), - {Views, _N} = lists:mapfoldl( - fun({Name, Definiton}, N) -> - {#view{name=Name, id_num=N, btree=nil, def=Definiton}, N+1} - end, - 0, NamedViews), - Group = #group{name=GroupId, - fd=Fd, - views=Views, - current_seq=0, - def_lang=DefLanguage, - id_btree=nil}, - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Group), - disk_group_to_mem(Fd, Group). + Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}. @@ -506,17 +589,12 @@ % anything in the definition changed. case couch_db:open_doc(Db, DocInfo) of {ok, Doc} -> - case couch_doc:get_view_functions(Doc) of - none -> - throw(restart); - {DefLang, NewDefs} -> - case Group#group.def_lang == DefLang andalso same_view_def(Group#group.views, NewDefs) of - true -> - % nothing changed, keeping on computing - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; - false -> - throw(restart) - end + case design_doc_to_view_group(Doc) == reset_group(Group) of + true -> + % nothing changed, keeping on computing + {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}}; + false -> + throw(restart) end; {not_found, deleted} -> throw(restart)