On May 15, 2008, at 23:51, damien@apache.org wrote:
> Author: damien
> Date: Thu May 15 14:51:22 2008
> New Revision: 656861
>
> URL: http://svn.apache.org/viewvc?rev=656861&view=rev
> Log:
> Incremental reduce first checkin. Warning! Disk format change.
Woohoo! Nice work :)
On a clean install, I get an error & crash when running the "basics" test:
jan@macnolia ~/Work/runcouch/conred1> ./bin/couchdb
couch 0.7.3a656947 (LogLevel=debug)
Apache CouchDB is starting.
Apache CouchDB has started. Time to relax.
[debug] [<0.1.0>] Config Info /Users/jan/Work/runcouch/conred1/etc/couchdb/couch.ini:
CurrentWorkingDir=/Users/jan/Work/runcouch/conred1
DbRootDir=/Users/jan/Work/runcouch/conred1/var/lib/couchdb
BindAddress="127.0.0.1"
Port="5984"
DocumentRoot=/Users/jan/Work/runcouch/conred1/share/couchdb/www
LogFile=/Users/jan/Work/runcouch/conred1/var/log/couchdb/couch.log
UtilDriverDir=/Users/jan/Work/runcouch/conred1/lib/couchdb/erlang/lib/couch-0.7.3a656947/priv/lib
DbUpdateNotificationProcesses=
FullTextSearchQueryServer=
text/javascript=/Users/jan/Work/runcouch/conred1/bin/couchjs /Users/jan/Work/runcouch/conred1/share/couchdb/server/main.js
[debug] [<0.55.0>] Version: {1,1}
[debug] [<0.55.0>] Method: 'DELETE'
[debug] [<0.55.0>] Request URI: "/test_suite_db/"
[debug] [<0.55.0>] Headers: [{'Accept',"*/*"},
{'Accept-Encoding',"gzip, deflate"},
{'Accept-Language',"en-us"},
{'Connection',"keep-alive"},
{'Content-Length',"0"},
{'Content-Type',"application/xml"},
{'Host',"localhost:5984"},
{'Referer',"http://localhost:5984/_utils/couch_tests.html"},
{'User-Agent',"Mozilla/5.0 (Macintosh; U; Intel Mac OS X
10_5_2; en-us) AppleWebKit/525.18 (KHTML, like Gecko) Version/3.1.1
Safari/525.18"}]
[info] [<0.55.0>] HTTP Error (code 404): not_found
[info] [<0.55.0>] 127.0.0.1 - - "DELETE /test_suite_db/" 404
[debug] [<0.56.0>] Version: {1,1}
[debug] [<0.56.0>] Method: 'PUT'
[debug] [<0.56.0>] Request URI: "/test_suite_db/"
[debug] [<0.56.0>] Headers: [{'Accept',"*/*"},
{'Accept-Encoding',"gzip, deflate"},
{'Accept-Language',"en-us"},
{'Connection',"keep-alive"},
{'Content-Length',"0"},
{'Content-Type',"application/xml"},
{'Host',"localhost:5984"},
{'Referer',"http://localhost:5984/_utils/couch_tests.html"},
{'User-Agent',"Mozilla/5.0 (Macintosh; U; Intel Mac OS X
10_5_2; en-us) AppleWebKit/525.18 (KHTML, like Gecko) Version/3.1.1
Safari/525.18"}]
[info] [<0.56.0>] 127.0.0.1 - - "PUT /test_suite_db/" 201
[debug] [<0.59.0>] Version: {1,1}
[debug] [<0.59.0>] Method: 'GET'
[debug] [<0.59.0>] Request URI: "/test_suite_db/"
[debug] [<0.59.0>] Headers: [{'Accept',"*/*"},
{'Accept-Encoding',"gzip, deflate"},
{'Accept-Language',"en-us"},
{'Connection',"keep-alive"},
{'Host',"localhost:5984"},
{'Referer',"http://localhost:5984/_utils/couch_tests.html"},
{'User-Agent',"Mozilla/5.0 (Macintosh; U; Intel Mac OS X
10_5_2; en-us) AppleWebKit/525.18 (KHTML, like Gecko) Version/3.1.1
Safari/525.18"}]
[info] [<0.59.0>] 127.0.0.1 - - "GET /test_suite_db/" 200
[debug] [<0.59.0>] Version: {1,1}
[debug] [<0.59.0>] Method: 'PUT'
[debug] [<0.59.0>] Request URI: "/test_suite_db/0"
[debug] [<0.59.0>] Headers: [{'Accept',"*/*"},
{'Accept-Encoding',"gzip, deflate"},
{'Accept-Language',"en-us"},
{'Connection',"keep-alive"},
{'Content-Length',"23"},
{'Content-Type',"application/xml"},
{'Host',"localhost:5984"},
{'Referer',"http://localhost:5984/_utils/couch_tests.html"},
{'User-Agent',"Mozilla/5.0 (Macintosh; U; Intel Mac OS X
10_5_2; en-us) AppleWebKit/525.18 (KHTML, like Gecko) Version/3.1.1
Safari/525.18"}]
[error] [<0.46.0>] {error_report,<0.21.0>,
    {<0.46.0>,supervisor_report,
     [{supervisor,{local,couch_server_sup}},
      {errorContext,child_terminated},
      {reason,
       {'EXIT',
        {function_clause,
         [{couch_doc,to_doc_info,
           [{full_doc_info,"0",1,false,
             [{"2812859129",{false,{4096,10000}},[]}]}]},
          {couch_db,new_index_entries,5},
          {couch_db,update_docs_int,3},
          {couch_db,update_loop,1}]}}},
      {offender,
       [{pid,<0.61.0>},
        {name,"test_suite_db"},
        {mfa,
         {couch_db,create,
          ["test_suite_db",
           "/Users/jan/Work/runcouch/conred1/var/lib/couchdb/test_suite_db.couch",
           []]}},
        {restart_type,transient},
        {shutdown,infinity},
        {child_type,supervisor}]}]}}
{"init terminating in do_boot",shutdown}
Crash dump was written to: erl_crash.dump
init terminating in do_boot (shutdown)
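
Looking at the trace: the function_clause is in couch_doc:to_doc_info/1,
couch_doc.erl is *not* in the list of modified files in the quoted diff
below, and #full_doc_info grew a new `deleted` field in couch_db.hrl.
Since a record pattern compiles down to a fixed-arity tuple match, a
couch_doc.beam built against the old record shape will throw exactly
this once it sees the wider tuple. A tiny standalone demo of that
failure mode (my guess at the cause, not CouchDB code):

    %% record_arity_demo.erl -- a record match compiles to a fixed-arity
    %% tuple pattern, so code built against the old record shape fails
    %% with function_clause when the record grows a field.
    -module(record_arity_demo).
    -export([run/0]).

    %% the *old* #full_doc_info, without the new `deleted` field:
    -record(full_doc_info, {id = "", update_seq = 0, rev_tree = []}).

    to_doc_info(#full_doc_info{id = Id, update_seq = Seq}) ->
        {Id, Seq}.

    run() ->
        %% a tuple shaped like the *new* record, deleted field included:
        NewShaped = {full_doc_info, "0", 1, false, []},
        to_doc_info(NewShaped). %% -> function_clause, as in my log above

This was a clean install though, so it may equally well be that
to_doc_info simply needs updating for the new field; I'll dig some more.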
>
>
> Modified:
> incubator/couchdb/trunk/share/server/main.js
> incubator/couchdb/trunk/share/www/script/couch.js
> incubator/couchdb/trunk/share/www/script/couch_tests.js
> incubator/couchdb/trunk/src/couchdb/couch_btree.erl
> incubator/couchdb/trunk/src/couchdb/couch_db.erl
> incubator/couchdb/trunk/src/couchdb/couch_db.hrl
> incubator/couchdb/trunk/src/couchdb/couch_httpd.erl
> incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl
> incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl
> incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl
> incubator/couchdb/trunk/src/couchdb/couch_util.erl
> incubator/couchdb/trunk/src/couchdb/couch_view.erl
>
> Modified: incubator/couchdb/trunk/share/server/main.js
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/server/main.js?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/share/server/main.js [utf-8] (original)
> +++ incubator/couchdb/trunk/share/server/main.js [utf-8] Thu May 15
> 14:51:22 2008
> @@ -11,21 +11,30 @@
> // the License.
>
> var cmd;
> -var map_funs = []; // The map functions to compute against documents
> -var map_results = [];
> +var funs = []; // holds functions used for computation
> +var map_results = []; // holds temporary emitted values during doc map
>
> -try {
> - var sandbox = evalcx('');
> - sandbox.map = function(key, value) {
> +var sandbox = null;
> +
> +map = function(key, value) {
> map_results.push([key, value]);
> }
> -} catch (e) {
> -  // fallback for older versions of spidermonkey that don't have evalcx
> - var sandbox = null;
> - map = function(key, value) {
> - map_results.push([key, value]);
> +
> +sum = function(values) {
> + var values_sum=0;
> + for(var i in values) {
> + values_sum += values[i];
> + }
> + return values_sum;
> }
> -}
> +
> +
> +try {
> + // if possible, use evalcx (not always available)
> + sandbox = evalcx('');
> + sandbox.map = map;
> + sandbox.sum = sum;
> +} catch (e) {}
>
> // Commands are in the form of json arrays:
> // ["commandname",..optional args...]\n
> @@ -33,83 +42,132 @@
> // Responses are json values followed by a new line ("\n")
>
> while (cmd = eval(readline())) {
> - switch (cmd[0]) {
> - case "reset":
> - // clear the map_functions and run gc
> - map_funs = [];
> - gc();
> - print("true"); // indicates success
> - break;
> - case "add_fun":
> - // The second arg is a string that will compile to a function.
> - // and then we add it to map_functions array
> - try {
> -      var functionObject = sandbox ? evalcx(cmd[1], sandbox) : eval(cmd[1]);
> - } catch (err) {
> - print(toJSON({error: {id: "map_compilation_error",
> - reason: err.toString() + " (" + toJSON(cmd[1]) + ")"}}));
> + try {
> + switch (cmd[0]) {
> + case "reset":
> + // clear the globals and run gc
> + funs = [];
> + gc();
> + print("true"); // indicates success
> break;
> - }
> - if (typeof(functionObject) == "function") {
> - map_funs.push(functionObject);
> - print("true");
> - } else {
> - print(toJSON({error: "map_compilation_error",
> -        reason: "expression does not eval to a function. (" + cmd[1] + ")"}));
> - }
> - break;
> - case "map_doc":
> -    // The second arg is a document. We compute all the map functions against
> -    // it.
> -    //
> -    // Each function can output multiple keys value, pairs for each document
> -    //
> -    // Example output of map_doc after three functions set by add_fun cmds:
> -    // [
> -    //  [["Key","Value"]],                    <- fun 1 returned 1 key value
> -    //  [],                                   <- fun 2 returned 0 key values
> -    //  [["Key1","Value1"],["Key2","Value2"]] <- fun 3 returned 2 key values
> -    // ]
> -    //
> - var doc = cmd[1];
> - seal(doc); // seal to prevent map functions from changing doc
> - var buf = [];
> - for (var i = 0; i < map_funs.length; i++) {
> - map_results = [];
> - try {
> - map_funs[i](doc);
> - buf.push(map_results.filter(function(pair) {
> - return pair[0] !== undefined && pair[1] !== undefined;
> - }));
> - } catch (err) {
> - if (err == "fatal_error") {
> -          // Only if it's a "fatal_error" do we exit. What's a fatal error?
> - // That's for the query to decide.
> - //
> -          // This will make it possible for queries to completely error out,
> -          // by catching their own local exception and rethrowing a
> -          // fatal_error. But by default if they don't do error handling we
> -          // just eat the exception and carry on.
> - print(toJSON({error: "map_runtime_error",
> - reason: "function raised fatal exception"}));
> - quit();
> + case "add_fun":
> +      // The second arg is a string that will compile to a function.
> +      // and then we add it to funs array
> + funs.push(safe_compile_function(cmd[1]));
> + print("true");
> + break;
> + case "map_doc":
> +      // The second arg is a document. We compute all the map functions against
> +      // it.
> +      //
> +      // Each function can output multiple keys value, pairs for each document
> +      //
> +      // Example output of map_doc after three functions set by add_fun cmds:
> +      // [
> +      //  [["Key","Value"]],                    <- fun 1 returned 1 key value
> +      //  [],                                   <- fun 2 returned 0 key values
> +      //  [["Key1","Value1"],["Key2","Value2"]] <- fun 3 returned 2 key values
> +      // ]
> +      //
> + var doc = cmd[1];
> + seal(doc); // seal to prevent map functions from changing doc
> + var buf = [];
> + for (var i = 0; i < funs.length; i++) {
> + map_results = [];
> + try {
> + funs[i](doc);
> + buf.push(map_results.filter(function(pair) {
> + return pair[0] !== undefined && pair[1] !== undefined;
> + }));
> + } catch (err) {
> + if (err == "fatal_error") {
> +            // Only if it's a "fatal_error" do we exit. What's a fatal error?
> + // That's for the query to decide.
> + //
> +              // This will make it possible for queries to completely error out,
> +              // by catching their own local exception and rethrowing a
> +              // fatal_error. But by default if they don't do error handling we
> + // just eat the exception and carry on.
> + throw {error: "map_runtime_error",
> + reason: "function raised fatal exception"};
> + }
> +            print(toJSON({log: "function raised exception (" + err + ")"}));
> + buf.push([]);
> }
> -        print(toJSON({log: "function raised exception (" + err + ")"}));
> - buf.push([]);
> }
> - }
> - print(toJSON(buf));
> - break;
> - default:
> - print(toJSON({error: "query_server_error",
> - reason: "unknown command '" + cmd[0] + "'"}));
> - quit();
> + print(toJSON(buf));
> + break;
> +
> + case "combine":
> + case "reduce":
> + {
> + var keys = null;
> + var values = null;
> + var reduceFuns = cmd[1];
> + var is_combine = false;
> + if (cmd[0] == "reduce") {
> + var kvs = cmd[2];
> + keys = new Array(kvs.length);
> + values = new Array(kvs.length);
> + for (var i = 0; i < kvs.length; i++) {
> + keys[i] = kvs[i][0];
> + values[i] = kvs[i][1];
> + }
> + } else {
> + values = cmd[2];
> + is_combine = true;
> + }
> +
> + for(var i in reduceFuns) {
> + reduceFuns[i] = safe_compile_function(reduceFuns[i]);
> + }
> +
> + var reductions = new Array(funs.length);
> + for (var i = 0; i < reduceFuns.length; i++) {
> + try {
> + reductions[i] = reduceFuns[i](keys, values, is_combine);
> + } catch (err) {
> + if (err == "fatal_error") {
> + throw {error: "reduce_runtime_error",
> + reason: "function raised fatal exception"};
> + }
> +            print(toJSON({log: "function raised exception (" + err + ")"}));
> + reductions[i] = null;
> + }
> + }
> + print("[true," + toJSON(reductions) + "]");
> + }
> + break;
> +
> + default:
> + print(toJSON({error: "query_server_error",
> + reason: "unknown command '" + cmd[0] + "'"}));
> + quit();
> + }
> + } catch(exception) {
> + print(toJSON(exception));
> + }
> +}
> +
> +
> +function safe_compile_function(Src) {
> + try {
> + var functionObject = sandbox ? evalcx(Src, sandbox) : eval(Src);
> + } catch (err) {
> + throw {error: "compilation_error",
> + reason: err.toString() + " (" + Src + ")"};
> + }
> + if (typeof(functionObject) == "function") {
> + return functionObject;
> + } else {
> + throw {error: "compilation_error",
> +      reason: "expression does not eval to a function. (" + Src + ")"};
> }
> }
>
> function toJSON(val) {
> if (typeof(val) == "undefined") {
> - throw new TypeError("Cannot encode undefined value as JSON");
> +    throw {error:"bad_value", reason:"Cannot encode 'undefined' value as JSON"};
> }
> var subs = {'\b': '\\b', '\t': '\\t', '\n': '\\n', '\f': '\\f',
> '\r': '\\r', '"' : '\\"', '\\': '\\\\'};
>
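
The new query-server protocol reads nicely. For my own understanding,
here is roughly what a session with couchjs should look like now, one
JSON line per request and response (the doc and values are made up by
me, and the [key, docid] pairs are my reading of what the Erlang side
sends):

    ["add_fun", "function(doc) { map(doc.integer, doc.integer) }"]
    true
    ["map_doc", {"_id":"0","integer":0}]
    [[[0,0]]]
    ["reduce", ["function(keys, values) { return sum(values); }"], [[[1,"doc1"],1],[[2,"doc2"],2]]]
    [true,[3]]
    ["combine", ["function(keys, values, rereduce) { return sum(values); }"], [3,30]]
    [true,[33]]

One small thing I noticed while tracing this: in the "reduce"/"combine"
branch, `var reductions = new Array(funs.length);` probably wants to be
`reduceFuns.length`. It works either way since JS arrays grow, just a
nit.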
> Modified: incubator/couchdb/trunk/share/www/script/couch.js
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch.js?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/share/www/script/couch.js [utf-8]
> (original)
> +++ incubator/couchdb/trunk/share/www/script/couch.js [utf-8] Thu
> May 15 14:51:22 2008
> @@ -99,7 +99,23 @@
>      mapFun = mapFun.toSource ? mapFun.toSource() : "(" + mapFun.toString() + ")";
>    var req = request("POST", this.uri + "_temp_view" + encodeOptions(options), {
> headers: {"Content-Type": "text/javascript"},
> - body: mapFun
> + body: JSON.stringify(mapFun)
> + });
> + var result = JSON.parse(req.responseText);
> + if (req.status != 200)
> + throw result;
> + return result;
> + }
> +
> +  // Applies the map function to the contents of database and returns the results.
> + this.reduce_query = function(mapFun, reduceFun, options) {
> + if (typeof(mapFun) != "string")
> +      mapFun = mapFun.toSource ? mapFun.toSource() : "(" + mapFun.toString() + ")";
> +    if (typeof(reduceFun) != "string")
> +      reduceFun = reduceFun.toSource ? reduceFun.toSource() : "(" + reduceFun.toString() + ")";
> +    var req = request("POST", this.uri + "_temp_view" + encodeOptions(options), {
> + headers: {"Content-Type": "text/javascript"},
> + body: JSON.stringify({map:mapFun, reduce:reduceFun})
> });
> var result = JSON.parse(req.responseText);
> if (req.status != 200)
>
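
In case anyone else wants to poke at this by hand: if I read couch.js
and couch_httpd.erl right, reduce_query boils down to a POST like this
(the view source is just an example):

    POST /test_suite_db/_temp_view
    Content-Type: text/javascript

    {"map":"function(doc) { map(doc.integer, doc.integer) }","reduce":"function(keys, values) { return sum(values); }"}

and the reply is a single reduced value instead of rows:

    {"ok":true,"result":33}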
> Modified: incubator/couchdb/trunk/share/www/script/couch_tests.js
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch_tests.js?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8]
> (original)
> +++ incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8]
> Thu May 15 14:51:22 2008
> @@ -91,7 +91,15 @@
> // 1 more document should now be in the result.
> T(results.total_rows == 3);
> T(db.info().doc_count == 6);
> +
> + var reduceFunction = function(keys, values){
> + return sum(values);
> + };
> +
> + result = db.reduce_query(mapFunction, reduceFunction);
>
> + T(result.result == 33);
> +
> // delete a document
> T(db.deleteDoc(existingDoc).ok);
>
> @@ -219,6 +227,39 @@
> T(results.rows[numDocsToCreate-1-i].key==i);
> }
> },
> +
> + reduce: function(debug) {
> + var db = new CouchDB("test_suite_db");
> + db.deleteDb();
> + db.createDb();
> + if (debug) debugger;
> + var numDocs = 500
> + var docs = makeDocs(1,numDocs + 1);
> + T(db.bulkSave(docs).ok);
> + var summate = function(N) {return (N+1)*N/2;};
> +
> + var map = function (doc) {map(doc.integer, doc.integer)};
> + var reduce = function (keys, values) { return sum(values); };
> + var result = db.reduce_query(map, reduce).result;
> + T(result == summate(numDocs));
> +
> +    result = db.reduce_query(map, reduce, {startkey:4,endkey:4}).result;
> +
> + T(result == 4);
> +
> +    result = db.reduce_query(map, reduce, {startkey:4,endkey:5}).result;
> +
> + T(result == 9);
> +
> +    result = db.reduce_query(map, reduce, {startkey:4,endkey:6}).result;
> +
> + T(result == 15);
> +
> + for(var i=1; i<numDocs/2; i+=30) {
> +      result = db.reduce_query(map, reduce, {startkey:i,endkey:numDocs-i}).result;
> + T(result == summate(numDocs-i) - summate(i-1));
> + }
> + },
>
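
The expected values here check out: summate(N) = N(N+1)/2 is the closed
form of 1+2+...+N, so startkey:4/endkey:6 should reduce to 4+5+6 = 15,
and the sliding window sums i..numDocs-i, i.e. summate(numDocs-i) -
summate(i-1). Nice that the same formula also gets exercised through
the named view in the design-doc test further down.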
> multiple_rows: function(debug) {
> var db = new CouchDB("test_suite_db");
> @@ -391,7 +432,7 @@
> db.createDb();
> if (debug) debugger;
>
> - var numDocs = 50;
> + var numDocs = 500;
>
> var designDoc = {
> _id:"_design/test",
> @@ -399,22 +440,46 @@
> views: {
> all_docs: "function(doc) { map(doc.integer, null) }",
> no_docs: "function(doc) {}",
> -      single_doc: "function(doc) { if (doc._id == \"1\") { map(1, null) }}"
> +      single_doc: "function(doc) { if (doc._id == \"1\") { map(1, null) }}",
> +      summate: {map:"function (doc) {map(doc.integer, doc.integer)};",
> +                reduce:"function (keys, values) { return sum(values); };"}
> }
> }
> T(db.save(designDoc).ok);
>
> - T(db.bulkSave(makeDocs(0, numDocs)).ok);
> + T(db.bulkSave(makeDocs(1, numDocs + 1)).ok);
>
> for (var loop = 0; loop < 2; loop++) {
> var rows = db.view("test/all_docs").rows;
> - for (var i = 0; i < numDocs; i++) {
> - T(rows[i].key == i);
> + for (var i = 1; i <= numDocs; i++) {
> + T(rows[i-1].key == i);
> }
> T(db.view("test/no_docs").total_rows == 0)
> T(db.view("test/single_doc").total_rows == 1)
> restartServer();
> }
> +
> +
> + var summate = function(N) {return (N+1)*N/2;};
> + var result = db.view("test/summate").result;
> + T(result == summate(numDocs));
> +
> + result = db.view("test/summate", {startkey:4,endkey:4}).result;
> +
> + T(result == 4);
> +
> + result = db.view("test/summate", {startkey:4,endkey:5}).result;
> +
> + T(result == 9);
> +
> + result =db.view("test/summate", {startkey:4,endkey:6}).result;
> +
> + T(result == 15);
> +
> + for(var i=1; i<numDocs/2; i+=30) {
> +    result = db.view("test/summate", {startkey:i,endkey:numDocs-i}).result;
> + T(result == summate(numDocs-i) - summate(i-1));
> + }
>
> T(db.deleteDoc(designDoc).ok);
> T(db.open(designDoc._id) == null);
> @@ -424,8 +489,6 @@
> T(db.open(designDoc._id) == null);
> T(db.view("test/no_docs") == null);
>
> -
> -
> },
>
> view_collation: function(debug) {
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_btree.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_btree.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_btree.erl (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_btree.erl Thu May 15
> 14:51:22 2008
> @@ -13,8 +13,8 @@
> -module(couch_btree).
>
> -export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]).
> --export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]).
> --export([lookup/2, get_state/1, test/1, test/0]).
> +-export([foldr/3, foldr/4, fold/4, fold/5, reduce/3, partial_reduce/3, final_reduce/2]).
> +-export([lookup/2, get_state/1, set_options/2, test/1, test/0]).
>
> -define(CHUNK_THRESHOLD, 16#fff).
>
> @@ -23,7 +23,8 @@
> root,
> extract_kv = fun({Key, Value}) -> {Key, Value} end,
> assemble_kv = fun(Key, Value) -> {Key, Value} end,
> - less = fun(A, B) -> A < B end
> + less = fun(A, B) -> A < B end,
> + reduce = nil
> }).
>
> extract(#btree{extract_kv=Extract}, Value) ->
> @@ -46,7 +47,9 @@
> set_options(Bt, [{join, Assemble}|Rest]) ->
> set_options(Bt#btree{assemble_kv=Assemble}, Rest);
> set_options(Bt, [{less, Less}|Rest]) ->
> - set_options(Bt#btree{less=Less}, Rest).
> + set_options(Bt#btree{less=Less}, Rest);
> +set_options(Bt, [{reduce, Reduce}|Rest]) ->
> + set_options(Bt#btree{reduce=Reduce}, Rest).
>
> open(State, Fd, Options) ->
> {ok, set_options(#btree{root=State, fd=Fd}, Options)}.
> @@ -54,10 +57,36 @@
> get_state(#btree{root=Root}) ->
> Root.
>
> -row_count(#btree{root=nil}) ->
> - 0;
> -row_count(#btree{root={_RootPointer, Count}}) ->
> - Count.
> +final_reduce(#btree{reduce=Reduce}, Val) ->
> + final_reduce(Reduce, Val);
> +final_reduce(Reduce, {[], []}) ->
> + Reduce(reduce, []);
> +final_reduce(_Bt, {[], [Red]}) ->
> + Red;
> +final_reduce(Reduce, {[], Reductions}) ->
> + Reduce(combine, Reductions);
> +final_reduce(Reduce, {KVs, Reductions}) ->
> + Red = Reduce(reduce, KVs),
> + final_reduce(Reduce, {[], [Red | Reductions]}).
> +
> +reduce(Bt, Key1, Key2) ->
> + {ok, Reds} = partial_reduce(Bt, Key1, Key2),
> + {ok, final_reduce(Bt, Reds)}.
> +
> +partial_reduce(#btree{root=Root}=Bt, Key1, Key2) ->
> + {KeyStart, KeyEnd} =
> +    case Key1 == nil orelse Key2 == nil orelse less(Bt, Key1, Key2) of
> + true -> {Key1, Key2};
> + false -> {Key2, Key1}
> + end,
> + case Root of
> + nil ->
> + {ok, {[], []}};
> + _ ->
> + {KVs, Nodes} = collect_node(Bt, Root, KeyStart, KeyEnd),
> + {ok, {KVs, [Red || {_K,{_P,Red}} <- Nodes]}}
> + end.
> +
>
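
The {KVs, PartialReds} pair that partial_reduce returns took me a
second: loose key-values from partially covered leaf nodes at the range
boundaries, plus the precomputed reductions of fully covered subtrees.
Tracing final_reduce with the count-reduce from the test further down
(my reading of the code):

    Reduce = fun(reduce, KVs)   -> length(KVs);
                (combine, Reds) -> lists:sum(Reds)
             end,
    %% two loose KVs from a boundary leaf, two whole-subtree reductions:
    %% {[{1,a},{2,b}], [10,5]}
    %%   -> Reduce(reduce, [{1,a},{2,b}])  =  2
    %%   -> Reduce(combine, [2, 10, 5])    = 17
    17 = couch_btree:final_reduce(Reduce, {[{1,a},{2,b}], [10,5]}).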
> foldl(Bt, Fun, Acc) ->
> fold(Bt, fwd, Fun, Acc).
> @@ -73,16 +102,16 @@
>
> % wraps a 2 arity function with the proper 3 arity function
> convert_fun_arity(Fun) when is_function(Fun, 2) ->
> - fun(KV, _Offset, AccIn) -> Fun(KV, AccIn) end;
> + fun(KV, _Reds, AccIn) -> Fun(KV, AccIn) end;
> convert_fun_arity(Fun) when is_function(Fun, 3) ->
> Fun. % Already arity 3
>
> fold(Bt, Dir, Fun, Acc) ->
> -    {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc),
> +    {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc),
>      {ok, Acc2}.
>
> fold(Bt, Key, Dir, Fun, Acc) ->
> -    {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc),
> +    {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc),
>      {ok, Acc2}.
>
> add(Bt, InsertKeyValues) ->
> @@ -136,7 +165,7 @@
>
> lookup(_Bt, nil, Keys) ->
> {ok, [{Key, not_found} || Key <- Keys]};
> -lookup(Bt, {Pointer, _Count}, Keys) ->
> +lookup(Bt, {Pointer, _Reds}, Keys) ->
> {NodeType, NodeList} = get_node(Bt, Pointer),
> case NodeType of
> kp_node ->
> @@ -229,7 +258,7 @@
> nil ->
> NodeType = kv_node,
> NodeList = [];
> - {Pointer, _count} ->
> + {Pointer, _Reds} ->
> {NodeType, NodeList} = get_node(Bt, Pointer)
> end,
> case NodeType of
> @@ -249,14 +278,12 @@
> {ok, ResultList, QueryOutput2, Bt3}
> end.
>
> -
> -count(kv_node, NodeList) ->
> - length(NodeList);
> -count(kp_node, NodeList) ->
> - lists:foldl( fun({_Key, {_Pointer, Count}}, AccCount) ->
> - Count + AccCount
> - end,
> - 0, NodeList).
> +reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
> + [];
> +reduce_node(#btree{reduce=R}, kp_node, NodeList) ->
> + R(combine, [Red || {_K, {_P, Red}} <- NodeList]);
> +reduce_node(#btree{reduce=R}, kv_node, NodeList) ->
> + R(reduce, NodeList).
>
>
> get_node(#btree{fd = Fd}, NodePos) ->
> @@ -267,7 +294,7 @@
> % Validating this prevents infinite loops should
> % a disk corruption occur.
> [throw({error, disk_corruption})
> - || {_Key, {SubNodePos, _Count}}
> + || {_Key, {SubNodePos, _Reds}}
> <- NodeList, SubNodePos >= NodePos];
> kv_node ->
> ok
> @@ -282,7 +309,7 @@
> begin
>             {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}),
> {LastKey, _} = lists:last(ANodeList),
> - {LastKey, {Pointer, count(NodeType, ANodeList)}}
> +                {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList)}}
> end
> ||
> ANodeList <- NodeListList
> @@ -362,93 +389,172 @@
> end
> end.
>
> +
> +collect_node(Bt, {P, R}, KeyStart, KeyEnd) ->
> + case get_node(Bt, P) of
> + {kp_node, NodeList} ->
> + collect_kp_node(Bt, NodeList, KeyStart, KeyEnd);
> + {kv_node, KVs} ->
> + GTEKeyStartKVs =
> + case KeyStart of
> + nil ->
> + KVs;
> + _ ->
> + lists:dropwhile(
> + fun({Key,_}) ->
> + less(Bt, Key, KeyStart)
> + end, KVs)
> + end,
> + KVs2 =
> + case KeyEnd of
> + nil ->
> + GTEKeyStartKVs;
> + _ ->
> + lists:dropwhile(
> + fun({Key,_}) ->
> + less(Bt, KeyEnd, Key)
> + end, lists:reverse(GTEKeyStartKVs))
> + end,
> + case length(KVs2) == length(KVs) of
> +        true -> % got full node, return the already calculated reduction
> + {[], [{nil, {P, R}}]};
> + false -> % otherwise return the keyvalues for later reduction
> + {KVs2, []}
> + end
> + end.
> +
> +
> +collect_kp_node(Bt, NodeList, KeyStart, KeyEnd) ->
> + Nodes =
> + case KeyStart of
> + nil ->
> + NodeList;
> + _ ->
> + lists:dropwhile(
> + fun({Key,_}) ->
> + less(Bt, Key, KeyStart)
> + end, NodeList)
> + end,
> +
> + case KeyEnd of
> + nil ->
> + case Nodes of
> + [] ->
> + {[], []};
> + [{_, StartNodeInfo}|RestNodes] ->
> +            {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd),
> + {DownKVs, DownNodes ++ RestNodes}
> + end;
> + _ ->
> + {GTEKeyEndNodes, LTKeyEndNodes} = lists:splitwith(
> + fun({Key,_}) ->
> + not less(Bt, Key, KeyEnd)
> + end, lists:reverse(Nodes)),
> +
> + {MatchingKVs, MatchingNodes} =
> + case lists:reverse(LTKeyEndNodes) of
> + [{_, StartNodeInfo}] ->
> + collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd);
> + [{_, StartNodeInfo}|RestLTNodes] ->
> +            % optimization, since we have more KP nodes in range, we don't need
> +            % to provide the endkey when searching the start node, making
> +            % collecting the node faster.
> +            {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, nil),
> + {DownKVs, DownNodes ++ RestLTNodes};
> + [] ->
> + {[], []}
> + end,
> +
> + case lists:reverse(GTEKeyEndNodes) of
> + [{_, EndNodeInfo} | _] when LTKeyEndNodes == [] ->
> + collect_node(Bt, EndNodeInfo, KeyStart, KeyEnd);
> + [{_, EndNodeInfo} | _] ->
> +            {KVs1, DownNodes1} = collect_node(Bt, EndNodeInfo, nil, KeyEnd),
> + {KVs1 ++ MatchingKVs, DownNodes1 ++ MatchingNodes};
> + [] ->
> + {MatchingKVs, MatchingNodes}
> + end
> + end.
> +
> +
> adjust_dir(fwd, List) ->
> List;
> adjust_dir(rev, List) ->
> lists:reverse(List).
>
> -stream_node(Bt, Offset, PointerInfo, nil, Dir, Fun, Acc) ->
> -    stream_node(Bt, Offset, PointerInfo, Dir, Fun, Acc);
> -stream_node(_Bt, _Offset, nil, _StartKey, _Dir, _Fun, Acc) ->
> +stream_node(Bt, Reds, PointerInfo, nil, Dir, Fun, Acc) ->
> +    stream_node(Bt, Reds, PointerInfo, Dir, Fun, Acc);
> +stream_node(_Bt, _Reds, nil, _StartKey, _Dir, _Fun, Acc) ->
>      {ok, Acc};
> -stream_node(Bt, Offset, {Pointer, _Count}, StartKey, Dir, Fun, Acc) ->
> +stream_node(Bt, Reds, {Pointer, _Reds}, StartKey, Dir, Fun, Acc) ->
>      {NodeType, NodeList} = get_node(Bt, Pointer),
>      case NodeType of
>      kp_node ->
> -        stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc);
> +        stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc);
>      kv_node ->
> -        stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc)
> +        stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc)
>      end.
>
> -stream_node(_Bt, _Offset, nil, _Dir, _Fun, Acc) ->
> +stream_node(_Bt, _Reds, nil, _Dir, _Fun, Acc) ->
>      {ok, Acc};
> -stream_node(Bt, Offset, {Pointer, _Count}, Dir, Fun, Acc) ->
> +stream_node(Bt, Reds, {Pointer, _Reds}, Dir, Fun, Acc) ->
>      {NodeType, NodeList} = get_node(Bt, Pointer),
>      case NodeType of
>      kp_node ->
> -        stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc);
> +        stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), Dir, Fun, Acc);
>      kv_node ->
> -        stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc)
> +        stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), Dir, Fun, Acc)
>      end.
>
> -stream_kp_node(_Bt, _Offset, [], _Dir, _Fun, Acc) ->
> +stream_kp_node(_Bt, _Reds, [], _Dir, _Fun, Acc) ->
> {ok, Acc};
> -stream_kp_node(Bt, Offset, [{_Key, {Pointer, Count}} | Rest], Dir, Fun, Acc) ->
> -    case stream_node(Bt, Offset, {Pointer, Count}, Dir, Fun, Acc) of
> +stream_kp_node(Bt, Reds, [{_Key, {Pointer, Red}} | Rest], Dir, Fun, Acc) ->
> +    case stream_node(Bt, Reds, {Pointer, Red}, Dir, Fun, Acc) of
> {ok, Acc2} ->
> - stream_kp_node(Bt, Offset + Count, Rest, Dir, Fun, Acc2);
> + stream_kp_node(Bt, [Red | Reds], Rest, Dir, Fun, Acc2);
> {stop, Acc2} ->
> {stop, Acc2}
> end.
>
> -drop_nodes(_Bt, Offset, _StartKey, []) ->
> - {Offset, []};
> -drop_nodes(Bt, Offset, StartKey, [{NodeKey, {Pointer, Count}} | RestKPs]) ->
> +drop_nodes(_Bt, Reds, _StartKey, []) ->
> + {Reds, []};
> +drop_nodes(Bt, Reds, StartKey, [{NodeKey, {Pointer, Red}} | RestKPs]) ->
> case less(Bt, NodeKey, StartKey) of
> - true -> drop_nodes(Bt, Offset + Count, StartKey, RestKPs);
> - false -> {Offset, [{NodeKey, {Pointer, Count}} | RestKPs]}
> + true -> drop_nodes(Bt, [Red | Reds], StartKey, RestKPs);
> + false -> {Reds, [{NodeKey, {Pointer, Reds}} | RestKPs]}
> end.
>
> -stream_kp_node(Bt, Offset, KPs, StartKey, Dir, Fun, Acc) ->
> - {NewOffset, NodesToStream} =
> +stream_kp_node(Bt, Reds, KPs, StartKey, Dir, Fun, Acc) ->
> + {NewReds, NodesToStream} =
> case Dir of
> fwd ->
> % drop all nodes sorting before the key
> - drop_nodes(Bt, Offset, StartKey, KPs);
> + drop_nodes(Bt, Reds, StartKey, KPs);
> rev ->
>             % keep all nodes sorting before the key, AND the first node to sort after
> RevKPs = lists:reverse(KPs),
>             case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of
> {_RevBefore, []} ->
> % everything sorts before it
> - {Offset, KPs};
> + {Reds, KPs};
> {RevBefore, [FirstAfter | Drop]} ->
> -            {Offset + count(kp_node, Drop), [FirstAfter | lists:reverse(RevBefore)]}
> + {[Red || {_K,{_P,Red}} <- Drop] ++ Reds,
> + [FirstAfter | lists:reverse(RevBefore)]}
> end
> end,
> case NodesToStream of
> [] ->
> {ok, Acc};
> [{_Key, PointerInfo} | Rest] ->
> -        case stream_node(Bt, NewOffset, PointerInfo, StartKey, Dir, Fun, Acc) of
> +        case stream_node(Bt, NewReds, PointerInfo, StartKey, Dir, Fun, Acc) of
> {ok, Acc2} ->
> - stream_kp_node(Bt, NewOffset, Rest, Dir, Fun, Acc2);
> + stream_kp_node(Bt, NewReds, Rest, Dir, Fun, Acc2);
> {stop, Acc2} ->
> {stop, Acc2}
> end
> end.
>
> -stream_kv_node(_Bt, _Offset, [], _Dir, _Fun, Acc) ->
> - {ok, Acc};
> -stream_kv_node(Bt, Offset, [{K, V} | RestKVs], Dir, Fun, Acc) ->
> - case Fun(assemble(Bt, K, V), Offset, Acc) of
> - {ok, Acc2} ->
> - stream_kv_node(Bt, Offset + 1, RestKVs, Dir, Fun, Acc2);
> - {stop, Acc2} ->
> - {stop, Acc2}
> - end.
> -
> -stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) ->
> +stream_kv_node(Bt, Reds, KVs, StartKey, Dir, Fun, Acc) ->
> DropFun =
> case Dir of
> fwd ->
> @@ -456,11 +562,36 @@
> rev ->
> fun({Key, _}) -> less(Bt, StartKey, Key) end
> end,
> - % drop all nodes preceding the key
> - GTEKVs = lists:dropwhile(DropFun, KVs),
> - LenSkipped = length(KVs) - length(GTEKVs),
> - stream_kv_node(Bt, Offset + LenSkipped, GTEKVs, Dir, Fun, Acc).
> + {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs),
> + stream_kv_node2(Bt, Reds, LTKVs, GTEKVs, Dir, Fun, Acc).
>
> +stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _Dir, _Fun, Acc) ->
> + {ok, Acc};
> +stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], Dir, Fun, Acc) ->
> +    case Fun(assemble(Bt, K, V), {PrevKVs, Reds}, Acc) of
> +    {ok, Acc2} ->
> +        stream_kv_node2(Bt, Reds, [{K,V} | PrevKVs], RestKVs, Dir, Fun, Acc2);
> + {stop, Acc2} ->
> + {stop, Acc2}
> + end.
> +
> +shuffle(List) ->
> +%% Determine the log n portion then randomize the list.
> + randomize(round(math:log(length(List)) + 0.5), List).
> +
> +randomize(1, List) ->
> + randomize(List);
> +randomize(T, List) ->
> + lists:foldl(fun(_E, Acc) ->
> + randomize(Acc)
> + end, randomize(List), lists:seq(1, (T - 1))).
> +
> +randomize(List) ->
> + D = lists:map(fun(A) ->
> + {random:uniform(), A}
> + end, List),
> + {_, D1} = lists:unzip(lists:keysort(1, D)),
> + D1.
>
>
>
> @@ -468,20 +599,58 @@
> test(1000).
>
> test(N) ->
> -    KeyValues = [{random:uniform(), random:uniform()} || _Seq <- lists:seq(1, N)],
> - test_btree(KeyValues), % randomly distributed
> - Sorted = lists:sort(KeyValues),
> + Sorted = [{Seq, random:uniform()} || Seq <- lists:seq(1, N)],
> test_btree(Sorted), % sorted regular
> - test_btree(lists:reverse(Sorted)). % sorted reverse
> + test_btree(lists:reverse(Sorted)), % sorted reverse
> + test_btree(shuffle(Sorted)). % randomly distributed
>
>
> test_btree(KeyValues) ->
> {ok, Fd} = couch_file:open("foo", [create,overwrite]),
> {ok, Btree} = open(nil, Fd),
> + ReduceFun =
> + fun(reduce, KVs) ->
> + length(KVs);
> + (combine, Reds) ->
> + lists:sum(Reds)
> + end,
> + Btree1 = set_options(Btree, [{reduce, ReduceFun}]),
>
> % first dump in all the values in one go
> - {ok, Btree10} = add_remove(Btree, KeyValues, []),
> + {ok, Btree10} = add_remove(Btree1, KeyValues, []),
>
> +
> + Len = length(KeyValues),
> +
> + {ok, Len} = reduce(Btree10, nil, nil),
> +
> + % Count of all from start to Val1
> + Val1 = Len div 3,
> + {ok, Val1} = reduce(Btree10, nil, Val1),
> + % Count of all from Val1 to end
> + CountVal1ToEnd = Len - Val1 + 1,
> + {ok, CountVal1ToEnd} = reduce(Btree10, Val1, nil),
> +
> + % Count of all from Val1 to Val2
> + Val2 = 2*Len div 3,
> + CountValRange = Val2 - Val1 + 1,
> + {ok, CountValRange} = reduce(Btree10, Val1, Val2),
> +
> + % get the leading reduction as we foldl/r
> + {ok, true} = foldl(Btree10, Val1, fun(_X, LeadingReds, _Acc) ->
> + CountToStart = Val1 - 1,
> + CountToStart = final_reduce(Btree10, LeadingReds),
> + {stop, true} % change Acc to 'true'
> + end,
> + false),
> +
> + {ok, true} = foldr(Btree10, Val1, fun(_X, LeadingReds, _Acc) ->
> + CountToEnd = Len - Val1,
> + CountToEnd = final_reduce(Btree10, LeadingReds),
> + {stop, true} % change Acc to 'true'
> + end,
> + false),
> +
> ok = test_keys(Btree10, KeyValues),
>
> % remove everything
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Thu May 15
> 14:51:22 2008
> @@ -17,10 +17,18 @@
> -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
> -export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
> -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
> +-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
> -export([start_update_loop/2]).
> -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
> -export([start_copy_compact_int/2]).
>
> +-export([btree_by_id_split/1,
> + btree_by_id_join/2,
> + btree_by_id_reduce/2,
> + btree_by_seq_split/1,
> + btree_by_seq_join/2,
> + btree_by_seq_reduce/2]).
> +
> -include("couch_db.hrl").
>
> -record(db_header,
> @@ -363,6 +371,12 @@
>
> Doc#doc{attachments = NewBins}.
>
> +enum_docs_since_reduce_to_count(Reds) ->
> + couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds).
> +
> +enum_docs_reduce_to_count(Reds) ->
> + couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds).
> +
> enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
> Db = get_db(MainPid),
>     couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
> @@ -407,22 +421,38 @@
> deleted_conflict_revs = DelConflicts,
> deleted = Deleted}.
>
> -btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}) ->
> - {Id, {Seq, Tree}}.
> +btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
> + deleted=Deleted, rev_tree=Tree}) ->
> + {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}.
>
> -btree_by_name_join(Id, {Seq, Tree}) ->
> - #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
> +btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
> +    #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
>
>
> +
> +btree_by_id_reduce(reduce, FullDocInfos) ->
> + % count the number of deleted documents
> + length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
> +btree_by_id_reduce(combine, Reds) ->
> + lists:sum(Reds).
> +
> +btree_by_seq_reduce(reduce, DocInfos) ->
> + % count the number of deleted documents
> + length(DocInfos);
> +btree_by_seq_reduce(combine, Reds) ->
> + lists:sum(Reds).
> +
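
Tiny nit while reading: both new reduce funs carry the comment "count
the number of deleted documents", but btree_by_id_reduce counts the
*non*-deleted docs (it filters on deleted=false) and btree_by_seq_reduce
counts all doc infos. Worth fixing the comments so nobody trips over
them later.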
> init_db(DbName, Filepath, Fd, Header) ->
>     {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
>     ok = couch_stream:set_min_buffer(SummaryStream, 10000),
>     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
> - [{split, fun(V) -> btree_by_name_split(V) end},
> - {join, fun(K,V) -> btree_by_name_join(K,V) end}] ),
> + [{split, fun btree_by_id_split/1},
> + {join, fun btree_by_id_join/2},
> + {reduce, fun btree_by_id_reduce/2}]),
>     {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
> - [{split, fun(V) -> btree_by_seq_split(V) end},
> - {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ),
> + [{split, fun btree_by_seq_split/1},
> + {join, fun btree_by_seq_join/2},
> + {reduce, fun btree_by_seq_reduce/2}]),
>     {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
>
> #db{
> @@ -437,8 +467,7 @@
> doc_count = Header#db_header.doc_count,
> doc_del_count = Header#db_header.doc_del_count,
> name = DbName,
> - filepath=Filepath
> - }.
> + filepath=Filepath }.
>
> close_db(#db{fd=Fd,summary_stream=Ss}) ->
> couch_file:close(Fd),
> @@ -759,7 +788,9 @@
> if Deleted -> {DocCount, DelCount + 1};
> true -> {DocCount + 1, DelCount}
> end,
> -    new_index_entries(RestInfos, DocCount2, DelCount2, [FullDocInfo|AccById], [DocInfo|AccBySeq]).
> +    new_index_entries(RestInfos, DocCount2, DelCount2,
> +        [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
> +        [DocInfo|AccBySeq]).
>
> update_docs_int(Db, DocsList, Options) ->
> #db{
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_db.hrl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.hrl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_db.hrl (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_db.hrl Thu May 15
> 14:51:22 2008
> @@ -45,6 +45,7 @@
> -record(full_doc_info,
> {id = "",
> update_seq = 0,
> + deleted = false,
> rev_tree = []
> }).
>
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_httpd.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_httpd.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_httpd.erl (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_httpd.erl Thu May 15
> 14:51:22 2008
> @@ -13,7 +13,7 @@
> -module(couch_httpd).
> -include("couch_db.hrl").
>
> --export([start_link/3, stop/0]).
> +-export([start_link/3, stop/0, handle_request/2]).
>
> -record(doc_query_args, {
> options = [],
> @@ -35,7 +35,7 @@
> }).
>
> start_link(BindAddress, Port, DocumentRoot) ->
> - Loop = fun (Req) -> handle_request(Req, DocumentRoot) end,
> +    Loop = fun (Req) -> apply(couch_httpd, handle_request, [Req, DocumentRoot]) end,
> mochiweb_http:start([
> {loop, Loop},
> {name, ?MODULE},
> @@ -47,6 +47,7 @@
> mochiweb_http:stop(?MODULE).
>
> handle_request(Req, DocumentRoot) ->
> +
> % alias HEAD to GET as mochiweb takes care of stripping the body
> Method = case Req:get(method) of
> 'HEAD' -> 'GET';
> @@ -263,18 +264,19 @@
> true -> StartDocId
> end,
>
> - FoldlFun = make_view_fold_fun(Req, QueryArgs),
> + FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRowCount,
> + fun couch_db:enum_docs_reduce_to_count/1),
>     AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) ->
> case couch_doc:to_doc_info(FullDocInfo) of
> #doc_info{deleted=false, rev=Rev} ->
> -            FoldlFun(Id, Id, {obj, [{rev, Rev}]}, Offset, TotalRowCount, Acc);
> + FoldlFun({{Id, Id}, {obj, [{rev, Rev}]}}, Offset, Acc);
> #doc_info{deleted=true} ->
> {ok, Acc}
> end
> end,
>     {ok, FoldResult} = couch_db:enum_docs(Db, StartId, Dir, AdapterFun,
>             {Count, SkipCount, undefined, []}),
> - finish_view_fold(Req, {ok, TotalRowCount, FoldResult});
> + finish_view_fold(Req, TotalRowCount, {ok, FoldResult});
>
> handle_db_request(_Req, _Method, {_DbName, _Db, ["_all_docs"]}) ->
> throw({method_not_allowed, "GET,HEAD"});
> @@ -290,7 +292,8 @@
> {ok, Info} = couch_db:get_db_info(Db),
> TotalRowCount = proplists:get_value(doc_count, Info),
>
> - FoldlFun = make_view_fold_fun(Req, QueryArgs),
> + FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRowCount,
> + fun couch_db:enum_docs_since_reduce_to_count/1),
> StartKey2 = case StartKey of
> nil -> 0;
> <<>> -> 100000000000;
> @@ -321,9 +324,9 @@
> false -> []
> end
> },
> - FoldlFun(Id, UpdateSeq, Json, Offset, TotalRowCount, Acc)
> + FoldlFun({{UpdateSeq, Id}, Json}, Offset, Acc)
> end, {Count, SkipCount, undefined, []}),
> - finish_view_fold(Req, {ok, TotalRowCount, FoldResult});
> + finish_view_fold(Req, TotalRowCount, {ok, FoldResult});
>
> handle_db_request(_Req, _Method, {_DbName, _Db, ["_all_docs_by_seq"]}) ->
> throw({method_not_allowed, "GET,HEAD"});
> @@ -331,17 +334,31 @@
> handle_db_request(Req, 'GET', {DbName, _Db, ["_view", DocId, ViewName]}) ->
> #view_query_args{
> start_key = StartKey,
> + end_key = EndKey,
> count = Count,
> skip = SkipCount,
> direction = Dir,
> - start_docid = StartDocId
> - } = QueryArgs = parse_view_query(Req),
> - View = {DbName, "_design/" ++ DocId, ViewName},
> - Start = {StartKey, StartDocId},
> - FoldlFun = make_view_fold_fun(Req, QueryArgs),
> - FoldAccInit = {Count, SkipCount, undefined, []},
> -    FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
> - finish_view_fold(Req, FoldResult);
> + start_docid = StartDocId,
> + end_docid = EndDocId
> + } = QueryArgs = parse_view_query(Req),
> +    case couch_view:get_map_view({DbName, "_design/" ++ DocId, ViewName}) of
> + {ok, View} ->
> + {ok, RowCount} = couch_view:get_row_count(View),
> + Start = {StartKey, StartDocId},
> + FoldlFun = make_view_fold_fun(Req, QueryArgs, RowCount,
> + fun couch_view:reduce_to_count/1),
> + FoldAccInit = {Count, SkipCount, undefined, []},
> +        FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
> + finish_view_fold(Req, RowCount, FoldResult);
> + {not_found, Reason} ->
> +        case couch_view:get_reduce_view({DbName, "_design/" ++ DocId, ViewName}) of
> + {ok, View} ->
> +            {ok, Value} = couch_view:reduce(View, {StartKey, StartDocId}, {EndKey, EndDocId}),
> + send_json(Req, {obj, [{ok,true}, {result, Value}]});
> + _ ->
> + throw({not_found, Reason})
> + end
> + end;
>
> handle_db_request(_Req, _Method, {_DbName, _Db, ["_view", _DocId, _ViewName]}) ->
> throw({method_not_allowed, "GET,HEAD"});
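
So a reduce view answers the normal _view GET with a single value now.
Something like this should work once the design doc from the test suite
is saved (untested, and the querystring syntax is my assumption from
parse_view_query):

    GET /test_suite_db/_view/test/summate?startkey=4&endkey=6

    {"ok":true,"result":15}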
> @@ -358,10 +375,12 @@
> handle_db_request(Req, 'POST', {DbName, _Db, ["_temp_view"]}) ->
> #view_query_args{
> start_key = StartKey,
> + end_key = EndKey,
> count = Count,
> skip = SkipCount,
> direction = Dir,
> - start_docid = StartDocId
> + start_docid = StartDocId,
> + end_docid = EndDocId
> } = QueryArgs = parse_view_query(Req),
>
> ContentType = case Req:get_primary_header_value("content-type") of
> @@ -370,13 +389,25 @@
> Else ->
> Else
> end,
> -
> - View = {temp, DbName, ContentType, Req:recv_body()},
> - Start = {StartKey, StartDocId},
> - FoldlFun = make_view_fold_fun(Req, QueryArgs),
> - FoldAccInit = {Count, SkipCount, undefined, []},
> -    FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
> - finish_view_fold(Req, FoldResult);
> + case cjson:decode(Req:recv_body()) of
> + {obj, Props} ->
> + MapSrc = proplists:get_value("map",Props),
> + RedSrc = proplists:get_value("reduce",Props),
> + {ok, View} = couch_view:get_reduce_view(
> + {temp, DbName, ContentType, MapSrc, RedSrc}),
> +        {ok, Value} = couch_view:reduce(View, {StartKey, StartDocId}, {EndKey, EndDocId}),
> + send_json(Req, {obj, [{ok,true}, {result, Value}]});
> + Src when is_list(Src) ->
> +
> +        {ok, View} = couch_view:get_map_view({temp, DbName, ContentType, Src}),
> + Start = {StartKey, StartDocId},
> + {ok, TotalRows} = couch_view:get_row_count(View),
> + FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRows,
> + fun couch_view:reduce_to_count/1),
> + FoldAccInit = {Count, SkipCount, undefined, []},
> +        FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
> + finish_view_fold(Req, TotalRows, FoldResult)
> + end;
>
> handle_db_request(_Req, _Method, {_DbName, _Db, ["_temp_view"]}) ->
> throw({method_not_allowed, "POST"});
> @@ -618,7 +649,8 @@
> end
> end, #view_query_args{}, QueryList).
>
> -make_view_fold_fun(Req, QueryArgs) ->
> +
> +make_view_fold_fun(Req, QueryArgs, TotalViewCount, ReduceCountFun) ->
> #view_query_args{
> end_key = EndKey,
> end_docid = EndDocId,
> @@ -626,7 +658,8 @@
> count = Count
> } = QueryArgs,
>
> - PassedEndFun = case Dir of
> + PassedEndFun =
> + case Dir of
> fwd ->
> fun(ViewKey, ViewId) ->
>             couch_view:less_json({EndKey, EndDocId}, {ViewKey, ViewId})
> @@ -636,10 +669,11 @@
>             couch_view:less_json({ViewKey, ViewId}, {EndKey, EndDocId})
> end
> end,
> -
> - NegCountFun = fun(Id, Key, Value, Offset, TotalViewCount,
> +
> + NegCountFun = fun({{Key, DocId}, Value}, OffsetReds,
> {AccCount, AccSkip, Resp, AccRevRows}) ->
> - PassedEnd = PassedEndFun(Key, Id),
> + Offset = ReduceCountFun(OffsetReds),
> + PassedEnd = PassedEndFun(Key, DocId),
> case {PassedEnd, AccCount, AccSkip, Resp} of
>         {true, _, _, _} -> % The stop key has been passed, stop looping.
> {stop, {AccCount, AccSkip, Resp, AccRevRows}};
> @@ -654,17 +688,18 @@
>             JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[",
> [TotalViewCount, Offset2]),
> Resp2:write_chunk(lists:flatten(JsonBegin)),
> - JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]},
> +            JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]},
>             {ok, {AccCount + 1, 0, Resp2, [cjson:encode(JsonObj) | AccRevRows]}};
> {_, AccCount, _, Resp} ->
> - JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]},
> +            JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]},
>             {ok, {AccCount + 1, 0, Resp, [cjson:encode(JsonObj), "," | AccRevRows]}}
> end
> end,
>
> - PosCountFun = fun(Id, Key, Value, Offset, TotalViewCount,
> + PosCountFun = fun({{Key, DocId}, Value}, OffsetReds,
> {AccCount, AccSkip, Resp, AccRevRows}) ->
> - PassedEnd = PassedEndFun(Key, Id),
> + Offset = ReduceCountFun(OffsetReds),
> + PassedEnd = PassedEndFun(Key, DocId),
> case {PassedEnd, AccCount, AccSkip, Resp} of
> {true, _, _, _} ->
> % The stop key has been passed, stop looping.
> @@ -678,11 +713,11 @@
> Resp2 = start_json_response(Req, 200),
>             JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[\r\n",
> [TotalViewCount, Offset]),
> - JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]},
> +            JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]},
>             Resp2:write_chunk(lists:flatten(JsonBegin ++ cjson:encode(JsonObj))),
> {ok, {AccCount - 1, 0, Resp2, AccRevRows}};
> {_, AccCount, _, Resp} when (AccCount > 0) ->
> -            JsonObj = {obj, [{"id", Id}, {"key", Key}, {"value", Value}]},
> +            JsonObj = {obj, [{"id", DocId}, {"key", Key}, {"value", Value}]},
>             Resp:write_chunk(",\r\n" ++ lists:flatten(cjson:encode(JsonObj))),
> {ok, {AccCount - 1, 0, Resp, AccRevRows}}
> end
> @@ -692,16 +727,16 @@
> false -> NegCountFun
> end.
>
> -finish_view_fold(Req, FoldResult) ->
> +finish_view_fold(Req, TotalRows, FoldResult) ->
> case FoldResult of
> - {ok, TotalRows, {_, _, undefined, _}} ->
> + {ok, {_, _, undefined, _}} ->
> % nothing found in the view, nothing has been returned
> % send empty view
> send_json(Req, 200, {obj, [
> {total_rows, TotalRows},
> {rows, {}}
> ]});
> - {ok, _TotalRows, {_, _, Resp, AccRevRows}} ->
> + {ok, {_, _, Resp, AccRevRows}} ->
> % end the view
> Resp:write_chunk(lists:flatten(AccRevRows) ++ "\r\n]}"),
> end_json_response(Resp);
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl Thu May
> 15 14:51:22 2008
> @@ -18,7 +18,7 @@
> -export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]).
>
> % a key tree looks like this:
> -% Tree -> [] or [{Key, Value, Tree} | SiblingTree]
> +% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
> % ChildTree -> Tree
> % SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree]
> % And each Key < SiblingKey
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl
> (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl Thu
> May 15 14:51:22 2008
> @@ -17,7 +17,7 @@
>
> -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
> -export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
> -
> +-export([reduce/3, combine/3]).
> -export([test/0, test/1]).
>
> -include("couch_db.hrl").
> @@ -65,32 +65,21 @@
> % send command and get a response.
> prompt(Port, Json) ->
> writeline(Port, cjson:encode(Json)),
> - read_json(Port).
> + case read_json(Port) of
> + {obj, [{"error", Id}, {"reason", Reason}]} ->
> + throw({list_to_atom(Id),Reason});
> + {obj, [{"reason", Reason}, {"error", Id}]} ->
> + throw({list_to_atom(Id),Reason});
> + Result ->
> + Result
> + end.
>
>
> start_doc_map(Lang, Functions) ->
> - Port =
> - case gen_server:call(couch_query_servers, {get_port, Lang}) of
> - {ok, Port0} ->
> - link(Port0),
> - Port0;
> - {empty, Cmd} ->
> - ?LOG_INFO("Spawning new ~s instance.", [Lang]),
> - open_port({spawn, Cmd}, [stream,
> - {line, 1000},
> - exit_status,
> - hide]);
> - Error ->
> - throw(Error)
> - end,
> - true = prompt(Port, {"reset"}),
> + Port = get_linked_port(Lang),
> % send the functions as json strings
> lists:foreach(fun(FunctionSource) ->
> - case prompt(Port, {"add_fun", FunctionSource}) of
> - true -> ok;
> - {obj, [{"error", Id}, {"reason", Reason}]} ->
> - throw({Id, Reason})
> - end
> + true = prompt(Port, {"add_fun", FunctionSource})
> end,
> Functions),
> {ok, {Lang, Port}}.
> @@ -100,19 +89,13 @@
> Results = lists:map(
> fun(Doc) ->
> Json = couch_doc:to_json_obj(Doc, []),
> - case prompt(Port, {"map_doc", Json}) of
> - {obj, [{"error", Id}, {"reason", Reason}]} ->
> - throw({list_to_atom(Id),Reason});
> - {obj, [{"reason", Reason}, {"error", Id}]} ->
> - throw({list_to_atom(Id),Reason});
> - Results when is_tuple(Results) ->
> -            % the results are a json array of function map yields like this:
> -            % {FunResults1, FunResults2 ...}
> -            % where funresults is are json arrays of key value pairs:
> -            % {{Key1, Value1}, {Key2, Value2}}
> -            % Convert to real lists, execept the key, value pairs
> -            [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
> - end
> + Results = prompt(Port, {"map_doc", Json}),
> +            % the results are a json array of function map yields like this:
> + % {FunResults1, FunResults2 ...}
> + % where funresults is are json arrays of key value pairs:
> + % {{Key1, Value1}, {Key2, Value2}}
> + % Convert to real lists, execept the key, value pairs
> +            [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
> end,
> Docs),
> {ok, Results}.
> @@ -121,10 +104,68 @@
> stop_doc_map(nil) ->
> ok;
> stop_doc_map({Lang, Port}) ->
> + return_linked_port(Lang, Port).
> +
> +get_linked_port(Lang) ->
> + case gen_server:call(couch_query_servers, {get_port, Lang}) of
> + {ok, Port0} ->
> + link(Port0),
> + true = prompt(Port0, {"reset"}),
> + Port0;
> + {empty, Cmd} ->
> + ?LOG_INFO("Spawning new ~s instance.", [Lang]),
> + open_port({spawn, Cmd}, [stream,
> + {line, 1000},
> + exit_status,
> + hide]);
> + Error ->
> + throw(Error)
> + end.
> +
> +return_linked_port(Lang, Port) ->
>     ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
> true = unlink(Port),
> ok.
>
> +group_reductions_results([]) ->
> + [];
> +group_reductions_results(List) ->
> + {Heads, Tails} = lists:foldl(
> + fun([H|T], {HAcc,TAcc}) ->
> + {[H|HAcc], [T|TAcc]}
> + end, {[], []}, List),
> + case Tails of
> + [[]|_] -> % no tails left
> + [Heads];
> + _ ->
> + [Heads | group_reductions_results(Tails)]
> + end.
> +
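
group_reductions_results is a transpose: each inner list holds one
reduction per reduce fun for one node, and grouping flips that into one
list of values per fun, so every fun can be combined in a single query
server round trip. Traced by hand (the foldl reverses the per-fun
order, which shouldn't matter for combining):

    group_reductions_results([[a1, b1], [a2, b2], [a3, b3]]).
    %% -> [[a3, a2, a1], [b3, b2, b1]]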
> +combine(_Lang, [], _ReducedValues) ->
> + {ok, []};
> +combine(Lang, RedSrcs, ReducedValues) ->
> + Port = get_linked_port(Lang),
> + Grouped = group_reductions_results(ReducedValues),
> + Results = lists:zipwith(
> + fun(FunSrc, Values) ->
> + {true, {Result}} =
> +                prompt(Port, {"combine", {FunSrc}, list_to_tuple(Values)}),
> + Result
> + end, RedSrcs, Grouped),
> +
> + return_linked_port(Lang, Port),
> + {ok, Results}.
> +
> +reduce(_Lang, [], _KVs) ->
> + {ok, []};
> +reduce(Lang, RedSrcs, KVs) ->
> + Port = get_linked_port(Lang),
> + {true, Results} = prompt(Port,
> + {"reduce", list_to_tuple(RedSrcs), list_to_tuple(KVs)}),
> + return_linked_port(Lang, Port),
> + {ok, tuple_to_list(Results)}.
> +
> +
> init(QueryServerList) ->
> {ok, {QueryServerList, []}}.
>
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl
> (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl Thu May
> 15 14:51:22 2008
> @@ -176,7 +176,7 @@
> case StartResult of
> {ok,_} ->
> % only output when startup was successful
> -            io:format("Find Futon, the management interface, at:~nhttp://~s:~s/_utils/index.html~n~n", [BindAddress, Port]),
> +            %io:format("Find Futon, the management interface, at:~nhttp://~s:~s/_utils/index.html~n~n", [BindAddress, Port]),
> io:format("Apache CouchDB has started. Time to relax.~n");
> _ ->
>         % Since we failed startup, unconditionally dump configuration data to console
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_util.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_util.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_util.erl (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_util.erl Thu May 15
> 14:51:22 2008
> @@ -16,7 +16,7 @@
> -export([parse_ini/1,should_flush/0, should_flush/1]).
> -export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
> -export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]).
> --export([encodeBase64/1, decodeBase64/1]).
> +-export([encodeBase64/1, decodeBase64/1, to_hex/1]).
>
>
> % arbitrarily chosen amount of memory to use before flushing to disk
> @@ -30,20 +30,21 @@
> {error, already_loaded} -> ok;
> Error -> exit(Error)
> end.
> -
> -
> +
> +
> +
> new_uuid() ->
> - to_hex(binary_to_list(crypto:rand_bytes(16))).
> -
> + to_hex(crypto:rand_bytes(16)).
> +
> to_hex([]) ->
> [];
> +to_hex(Bin) when is_binary(Bin) ->
> + to_hex(binary_to_list(Bin));
> to_hex([H|T]) ->
> [to_digit(H div 16), to_digit(H rem 16) | to_hex(T)].
>
> -to_digit(N) when N < 10 ->
> - $0 + N;
> -to_digit(N) ->
> - $a + N-10.
> +to_digit(N) when N < 10 -> $0 + N;
> +to_digit(N) -> $a + N-10.
>
>
> % returns a random integer
>
> Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl
> URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=656861&r1=656860&r2=656861&view=diff
> ======================================================================
> --- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original)
> +++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Thu May 15
> 14:51:22 2008
> @@ -15,8 +15,9 @@
> -module(couch_view).
> -behaviour(gen_server).
>
> --export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/4]).
> +-export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]).
> -export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).
> +-export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, reduce/3]).
>
> -include("couch_db.hrl").
>
> @@ -26,16 +27,18 @@
> name,
> def_lang,
> views,
> - id_btree,
> - current_seq,
> + reductions=[], % list of reduction names and id_num of view that contains it.
> + id_btree=nil,
> + current_seq=0,
> query_server=nil
> }).
>
> -record(view,
> {id_num,
> - name,
> - btree,
> - def
> + map_names=[],
> + def,
> + btree=nil,
> + reduce_funs=[]
> }).
>
> -record(server,
> @@ -47,8 +50,8 @@
>
>
>
> -get_temp_updater(DbName, Type, Src) ->
> - {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, Src}),
> +get_temp_updater(DbName, Type, MapSrc, RedSrc) ->
> + {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}),
> Pid.
>
> get_updater(DbName, GroupId) ->
> @@ -75,44 +78,135 @@
> end
> end.
>
> -fold(ViewInfo, Dir, Fun, Acc) ->
> - fold(ViewInfo, nil, Dir, Fun, Acc).
> -
> -fold({temp, DbName, Type, Src}, StartKey, Dir, Fun, Acc) ->
> - {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src)),
> - fold_view(View#view.btree, StartKey, Dir, Fun, Acc);
> -fold({DbName, GroupId, ViewName}, StartKey, Dir, Fun, Acc) ->
> +get_row_count(#view{btree=Bt}) ->
> + {ok, Reds} = couch_btree:partial_reduce(Bt, nil, nil),
> + {ok, reduce_to_count(Reds)}.
> +
> +get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) ->
> + {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, MapSrc, RedSrc)),
> + {ok, {temp_reduce, View}};
> +get_reduce_view({DbName, GroupId, Name}) ->
> + {ok, #group{views=Views,def_lang=Lang}} =
> + get_updated_group(get_updater(DbName, GroupId)),
> + get_reduce_view0(Name, Lang, Views).
> +
> +get_reduce_view0(_Name, _Lang, []) ->
> + {not_found, missing_named_view};
> +get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View|Rest]) ->
> + case get_key_pos(Name, RedFuns, 0) of
> + 0 -> get_reduce_view0(Name, Lang, Rest);
> + N -> {ok, {reduce, N, Lang, View}}
> + end.
> +
> +reduce({temp_reduce, #view{btree=Bt}}, Key1, Key2) ->
> + {ok, {_Count, [Reduction]}} = couch_btree:reduce(Bt, Key1, Key2),
> + {ok, Reduction};
> +
> +reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Key1, Key2) ->
> + {ok, PartialReductions} = couch_btree:partial_reduce(Bt, Key1, Key2),
> + PreResultPadding = lists:duplicate(NthRed - 1, []),
> + PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []),
> + {_Name, FunSrc} = lists:nth(NthRed,RedFuns),
> + ReduceFun =
> + fun(reduce, KVs) ->
> + {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], KVs),
> + {0, PreResultPadding ++ Reduced ++ PostResultPadding};
> + (combine, Reds) ->
> + UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds],
> + {ok, Reduced} = couch_query_servers:combine(Lang, [FunSrc], UserReds),
> + {0, PreResultPadding ++ Reduced ++ PostResultPadding}
> + end,
> + {0, [FinalReduction]} = couch_btree:final_reduce(ReduceFun, PartialReductions),
> + {ok, FinalReduction}.
> +
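The padding here took me a second: since one btree carries a reduction slot per reduce fun, querying only the Nth fun pads the other slots with []. Worked through for three reduce funs and NthRed = 2:

    %% PreResultPadding  = [[]]    (NthRed - 1 = 1 empty slot before)
    %% PostResultPadding = [[]]    (3 - NthRed = 1 empty slot after)
    %% so ReduceFun returns {0, [[], Reduced2, []]}
    %% (the leading 0 is a dummy count; only the user slot matters here)
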
> +get_key_pos(_Key, [], _N) ->
> + 0;
> +get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 ->
> + N + 1;
> +get_key_pos(Key, [_|Rest], N) ->
> + get_key_pos(Key, Rest, N+1).
> +
> +get_map_view({temp, DbName, Type, Src}) ->
> + {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])),
> + {ok, View};
> +get_map_view({DbName, GroupId, Name}) ->
> {ok, #group{views=Views}} =
> get_updated_group(get_updater(DbName, GroupId)),
> - Btree = get_view_btree(Views, ViewName),
> - fold_view(Btree, StartKey, Dir, Fun, Acc).
> + get_map_view0(Name, Views).
> +
> +get_map_view0(_Name, []) ->
> + {not_found, missing_named_view};
> +get_map_view0(Name, [#view{map_names=MapNames}=View|Rest]) ->
> + case lists:member(Name, MapNames) of
> + true -> {ok, View};
> + false -> get_map_view0(Name, Rest)
> + end.
> +
> +reduce_to_count(Reductions) ->
> + {Count, _} =
> + couch_btree:final_reduce(
> + fun(reduce, KVs) ->
> + {length(KVs), []};
> + (combine, Reds) ->
> + {lists:sum([Count0 || {Count0, _} <- Reds]), []}
> + end, Reductions),
> + Count.
> +
> +
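The reduce/combine split is neat. A standalone toy version of the pattern reduce_to_count/1 relies on, runnable in the shell (not the actual btree code, just the shape):

    Reduce = fun(reduce, KVs) -> {length(KVs), []};
                (combine, Reds) -> {lists:sum([C || {C, _} <- Reds]), []}
             end,
    {2, []} = R1 = Reduce(reduce, [{a, 1}, {b, 2}]),
    {1, []} = R2 = Reduce(reduce, [{c, 3}]),
    {3, []} = Reduce(combine, [R1, R2]).
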
> +design_doc_to_view_group(#doc{id=Id,body={obj, Fields}}) ->
> + Language = proplists:get_value("language", Fields, "text/javascript"),
> + {obj, RawViews} = proplists:get_value("views", Fields, {obj, []}),
> +
> + % extract the map/reduce views from the json fields and into lists
> + MapViewsRaw = [{Name, Src, nil} || {Name, Src} <- RawViews, is_list(Src)],
> + MapReduceViewsRaw =
> + [{Name,
> + proplists:get_value("map", MRFuns),
> + proplists:get_value("reduce", MRFuns)}
> + || {Name, {obj, MRFuns}} <- RawViews],
> +
> + % add the views to a dictionary object, with the map source as the key
> + DictBySrc =
> + lists:foldl(
> + fun({Name, MapSrc, RedSrc}, DictBySrcAcc) ->
> + View =
> + case dict:find(MapSrc, DictBySrcAcc) of
> + {ok, View0} -> View0;
> + error -> #view{def=MapSrc} % create new view object
> + end,
> + View2 =
> + if RedSrc == nil ->
> + View#view{map_names=[Name|View#view.map_names]};
> + true ->
> + View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
> + end,
> + dict:store(MapSrc, View2, DictBySrcAcc)
> + end, dict:new(), MapViewsRaw ++ MapReduceViewsRaw),
> + % number the views
> + {Views, _N} = lists:mapfoldl(
> + fun({_Src, View}, N) ->
> + {View#view{id_num=N},N+1}
> + end, 0, dict:to_list(DictBySrc)),
>
> -fold_view(Btree, StartKey, Dir, Fun, Acc) ->
> - TotalRowCount = couch_btree:row_count(Btree),
> - WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) ->
> - Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc)
> - end,
> - {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, WrapperFun, Acc),
> - {ok, TotalRowCount, AccResult}.
> + reset_group(#group{name=Id, views=Views, def_lang=Language}).
> +
> +
>
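If I read design_doc_to_view_group/1 right, views sharing a map source collapse into one #view{}. So a design doc body like this (JS bodies assumed, shown as the {obj, ...} term the code sees):

    {obj, [{"language", "text/javascript"},
           {"views", {obj, [
               {"all",   "function(doc) { map(null, doc) }"},
               {"count", {obj, [{"map",    "function(doc) { map(null, doc) }"},
                                {"reduce", "function(keys, values) { return sum(values) }"}]}}
           ]}}]}

would come out as a single #view{map_names=["all"], reduce_funs=[{"count", ...}]}, because both entries key to the same map source in the dict. Clever.
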
> +fold(#view{btree=Btree}, Dir, Fun, Acc) ->
> + {ok, _AccResult} = couch_btree:fold(Btree, Dir, Fun, Acc).
>
> -get_view_btree([], _ViewName) ->
> - throw({not_found, missing_named_view});
> -get_view_btree([View | _RestViews], ViewName) when View#view.name == ViewName ->
> - View#view.btree;
> -get_view_btree([_View | RestViews], ViewName) ->
> - get_view_btree(RestViews, ViewName).
> +fold(#view{btree=Btree}, StartKey, Dir, Fun, Acc) ->
> + {ok, _AccResult} = couch_btree:fold(Btree, StartKey, Dir, Fun, Acc).
>
>
> init(RootDir) ->
> - UpdateNotifierFun =
> + couch_db_update_notifier:start_link(
> fun({deleted, DbName}) ->
> gen_server:cast(couch_view, {reset_indexes, DbName});
> ({created, DbName}) ->
> gen_server:cast(couch_view, {reset_indexes, DbName});
> (_Else) ->
> ok
> - end,
> - couch_db_update_notifier:start_link(UpdateNotifierFun),
> + end),
> ets:new(couch_views_by_db, [bag, private, named_table]),
> ets:new(couch_views_by_name, [set, protected, named_table]),
> ets:new(couch_views_by_updater, [set, private, named_table]),
> @@ -127,8 +221,8 @@
> catch ets:delete(couch_views_temp_fd_by_db).
>
>
> -handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=Root}=Server) ->
> - <<SigInt:128/integer>> = erlang:md5(Lang ++ Query),
> +handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) ->
> + <<SigInt:128/integer>> = erlang:md5(Lang ++ [0] ++ MapSrc ++ [0] ++ RedSrc),
> Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),
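Guessing the [0] separators in the md5 input are there so that different splits of the same bytes can't produce the same temp-view signature, e.g. ("ab", "c") vs ("a", "bc"):

    1> erlang:md5("ab" ++ [0] ++ "c") =:= erlang:md5("a" ++ [0] ++ "bc").
    false
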
> Pid =
> case ets:lookup(couch_views_by_name, {DbName, Name}) of
> @@ -142,7 +236,8 @@
> ok
> end,
> ?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
> - NewPid = spawn_link(couch_view, start_temp_update_loop, [DbName, Fd, Lang, Query]),
> + NewPid = spawn_link(couch_view, start_temp_update_loop,
> + [DbName, Fd, Lang, MapSrc, RedSrc]),
> true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}),
> add_to_ets(NewPid, DbName, Name),
> NewPid;
> @@ -219,18 +314,22 @@
> {ok, State}.
>
>
> -start_temp_update_loop(DbName, Fd, Lang, Query) ->
> +start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
> NotifyPids = get_notify_pids(1000),
> case couch_server:open(DbName) of
> {ok, Db} ->
> - View = #view{name="_temp", id_num=0, btree=nil, def=Query},
> + View = #view{map_names=["_temp"],
> + id_num=0,
> + btree=nil,
> + def=MapSrc,
> + reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
> Group = #group{name="_temp",
> db=Db,
> views=[View],
> current_seq=0,
> def_lang=Lang,
> id_btree=nil},
> - Group2 = disk_group_to_mem(Fd, Group),
> + Group2 = disk_group_to_mem(Db, Fd, Group),
> temp_update_loop(Group2, NotifyPids);
> Else ->
> exit(Else)
> @@ -242,24 +341,24 @@
> garbage_collect(),
> temp_update_loop(Group2, get_notify_pids(10000)).
>
> +
> +reset_group(#group{views=Views}=Group) ->
> + Views2 = [View#view{btree=nil} || View <- Views],
> + Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
> + id_btree=nil,views=Views2}.
> +
> start_update_loop(RootDir, DbName, GroupId) ->
> % wait for a notify request before doing anything. This way, we can just
> % exit and any exits will be noticed by the callers.
> start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
>
> start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
> - {Db, DefLang, Defs} =
> - case couch_server:open(DbName) of
> + {Db, DbGroup} =
> + case (catch couch_server:open(DbName)) of
> {ok, Db0} ->
> - case couch_db:open_doc(Db0, GroupId) of
> + case (catch couch_db:open_doc(Db0, GroupId)) of
> {ok, Doc} ->
> - case couch_doc:get_view_functions(Doc) of
> - none ->
> - delete_index_file(RootDir, DbName, GroupId),
> - exit({not_found, no_views_found});
> - {DefLang0, Defs0} ->
> - {Db0, DefLang0, Defs0}
> - end;
> + {Db0, design_doc_to_view_group(Doc)};
> Else ->
> delete_index_file(RootDir, DbName, GroupId),
> exit(Else)
> @@ -268,26 +367,48 @@
> delete_index_file(RootDir, DbName, GroupId),
> exit(Else)
> end,
> - Group = open_index_file(RootDir, DbName, GroupId, DefLang, Defs),
> + FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
> + Group =
> + case couch_file:open(FileName) of
> + {ok, Fd} ->
> + case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of
> + {ok, ExistingDiskGroup} ->
> + % validate all the view definitions in the index are correct.
> + case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of
> + true -> disk_group_to_mem(Db, Fd, ExistingDiskGroup);
> + false -> reset_file(Db, Fd, DbName, DbGroup)
> + end;
> + _ ->
> + reset_file(Db, Fd, DbName, DbGroup)
> + end;
> + {error, enoent} ->
> + case couch_file:open(FileName, [create]) of
> + {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup);
> + Error -> throw(Error)
> + end
> + end,
>
> - try update_loop(Group#group{db=Db}, NotifyPids) of
> - _ -> ok
> + update_loop(RootDir, DbName, GroupId, Group, NotifyPids).
> +
> +reset_file(Db, Fd, DbName, #group{name=Name} = DiskReadyGroup) ->
> + ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
> + ok = couch_file:truncate(Fd, 0),
> + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, DiskReadyGroup),
> + disk_group_to_mem(Db, Fd, DiskReadyGroup).
> +
> +update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) ->
> + try update_group(Group) of
> + {ok, Group2} ->
> + ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
> + [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
> + garbage_collect(),
> + update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000))
> catch
> restart ->
> couch_file:close(Group#group.fd),
> start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
> end.
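The restart flow reads nicely now: the doc-processing code further down throws restart when the design doc changed under us, update_loop catches it, closes the fd, and re-enters start_update_loop. The mechanism in miniature, runnable in the shell:

    1> try throw(restart) catch restart -> restarted end.
    restarted
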
>
> -update_loop(#group{fd=Fd}=Group, NotifyPids) ->
> - {ok, Group2} = update_group(Group),
> - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
> - [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
> - garbage_collect(),
> - update_loop(Group2).
> -
> -update_loop(Group) ->
> - update_loop(Group, get_notify_pids(100000)).
> -
> % wait for the first request to come in.
> get_notify_pids(Wait) ->
> receive
> @@ -351,51 +472,29 @@
>
> delete_index_file(RootDir, DbName, GroupId) ->
> file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view").
> -
> -open_index_file(RootDir, DbName, GroupId, ViewLang, ViewDefs) ->
> - FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
> - case couch_file:open(FileName) of
> - {ok, Fd} ->
> - case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of
> - {ok, #group{views=Views}=Group} ->
> - % validate all the view definitions in the index are correct.
> - case same_view_def(Views, ViewDefs) of
> - true -> disk_group_to_mem(Fd, Group);
> - false -> reset_header(GroupId, Fd, ViewLang, ViewDefs)
> - end;
> - _ ->
> - reset_header(GroupId, Fd, ViewLang, ViewDefs)
> - end;
> - _ ->
> - case couch_file:open(FileName, [create]) of
> - {ok, Fd} ->
> - reset_header(GroupId, Fd, ViewLang, ViewDefs);
> - Error ->
> - throw(Error)
> - end
> - end.
> -
> -same_view_def([], []) ->
> - true;
> -same_view_def(DiskViews, ViewDefs) when DiskViews == [] orelse ViewDefs == []->
> - false;
> -same_view_def([#view{name=DiskName,def=DiskDef}|RestViews], [{Name, Def}|RestDefs]) ->
> - if DiskName == Name andalso DiskDef == Def ->
> - same_view_def(RestViews, RestDefs);
> - true ->
> - false
> - end.
>
> % Given a disk ready group structure, return an initialized, in-memory version.
> -disk_group_to_mem(Fd, #group{id_btree=IdState,views=Views}=Group) ->
> +disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) ->
> {ok, IdBtree} = couch_btree:open(IdState, Fd),
> Views2 = lists:map(
> - fun(#view{btree=BtreeState}=View) ->
> - {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, fun less_json/2}]),
> + fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) ->
> + FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
> + ReduceFun =
> + fun(reduce, KVs) ->
> + {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs),
> + {length(KVs), Reduced};
> + (combine, Reds) ->
> + Count = lists:sum([Count0 || {Count0, _} <- Reds]),
> + UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
> + {ok, Reduced} = couch_query_servers:combine(Lang, FunSrcs, UserReds),
> + {Count, Reduced}
> + end,
> + {ok, Btree} = couch_btree:open(BtreeState, Fd,
> + [{less, fun less_json/2},{reduce, ReduceFun}]),
> View#view{btree=Btree}
> end,
> Views),
> - Group#group{fd=Fd, id_btree=IdBtree, views=Views2}.
> + Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}.
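So, if I read disk_group_to_mem/3 right, every btree node reduction is now a pair:

    %% {RowCount, [Red1, Red2, ...]}   -- one entry per reduce fun, in order
    %% get_row_count/1 keeps only the count; reduce/3 picks the Nth user
    %% reduction via lists:nth(NthRed, ...)

which is presumably how row counts stay free even for pure map views.
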
>
> % Given an initialized, in-memory group structure, return a disk ready version.
> mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
> @@ -405,23 +504,7 @@
> View#view{btree=State}
> end,
> Views),
> - Group#group{fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
> -
> -reset_header(GroupId, Fd, DefLanguage, NamedViews) ->
> - couch_file:truncate(Fd, 0),
> - {Views, _N} = lists:mapfoldl(
> - fun({Name, Definiton}, N) ->
> - {#view{name=Name, id_num=N, btree=nil, def=Definiton}, N+1}
> - end,
> - 0, NamedViews),
> - Group = #group{name=GroupId,
> - fd=Fd,
> - views=Views,
> - current_seq=0,
> - def_lang=DefLanguage,
> - id_btree=nil},
> - ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Group),
> - disk_group_to_mem(Fd, Group).
> + Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
>
>
>
> @@ -506,17 +589,12 @@
> % anything in the definition changed.
> case couch_db:open_doc(Db, DocInfo) of
> {ok, Doc} ->
> - case couch_doc:get_view_functions(Doc) of
> - none ->
> - throw(restart);
> - {DefLang, NewDefs} ->
> - case Group#group.def_lang == DefLang andalso same_view_def(Group#group.views, NewDefs) of
> - true ->
> - % nothing changed, keeping on computing
> - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
> - false ->
> - throw(restart)
> - end
> + case design_doc_to_view_group(Doc) == reset_group(Group) of
> + true ->
> + % nothing changed, keeping on computing
> + {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
> + false ->
> + throw(restart)
> end;
> {not_found, deleted} ->
> throw(restart)
>
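And the change detection at the end falls out nicely: design_doc_to_view_group/1 already returns a reset_group/1'd record, so comparing it against reset_group(Group) normalizes db, fd, btrees, and current_seq on both sides, and any real change in the view definitions triggers the restart above. Much tidier than the old same_view_def/2 walk.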
>
>