couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject [26/50] [abbrv] couch commit: updated refs/heads/import-rcouch to e2dbc79
Date Thu, 06 Feb 2014 17:40:08 GMT
initial move to rebar compilation

- move src/apps
- download dependencies using rebar
- replace ejson by jiffy
- replace couch_drv & couch_ejson_compare by couch_collate


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/a6816bff
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/a6816bff
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/a6816bff

Branch: refs/heads/import-rcouch
Commit: a6816bffb2c827d1c250a119540d626d3e925f94
Parents: ae8612b
Author: benoitc <benoitc@apache.org>
Authored: Mon Jan 6 21:12:45 2014 +0100
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Thu Feb 6 11:38:28 2014 -0600

----------------------------------------------------------------------
 include/couch_db.hrl                 |  286 ++++++
 src/Makefile.am                      |  198 +++++
 src/couch.app.src                    |   23 +
 src/couch.erl                        |   58 ++
 src/couch_app.erl                    |   36 +
 src/couch_auth_cache.erl             |  425 +++++++++
 src/couch_btree.erl                  |  714 +++++++++++++++
 src/couch_changes.erl                |  577 ++++++++++++
 src/couch_compaction_daemon.erl      |  504 +++++++++++
 src/couch_compress.erl               |   84 ++
 src/couch_config.erl                 |  251 ++++++
 src/couch_config_writer.erl          |   88 ++
 src/couch_db.erl                     | 1358 +++++++++++++++++++++++++++++
 src/couch_db_update_notifier.erl     |   82 ++
 src/couch_db_update_notifier_sup.erl |   61 ++
 src/couch_db_updater.erl             | 1035 ++++++++++++++++++++++
 src/couch_doc.erl                    |  650 ++++++++++++++
 src/couch_ejson_compare.erl          |   81 ++
 src/couch_event_sup.erl              |   73 ++
 src/couch_external_manager.erl       |  101 +++
 src/couch_external_server.erl        |   70 ++
 src/couch_file.erl                   |  532 +++++++++++
 src/couch_httpd.erl                  | 1114 +++++++++++++++++++++++
 src/couch_httpd_auth.erl             |  380 ++++++++
 src/couch_httpd_cors.erl             |  351 ++++++++
 src/couch_httpd_db.erl               | 1226 ++++++++++++++++++++++++++
 src/couch_httpd_external.erl         |  177 ++++
 src/couch_httpd_misc_handlers.erl    |  318 +++++++
 src/couch_httpd_oauth.erl            |  387 ++++++++
 src/couch_httpd_proxy.erl            |  426 +++++++++
 src/couch_httpd_rewrite.erl          |  484 ++++++++++
 src/couch_httpd_stats_handlers.erl   |   56 ++
 src/couch_httpd_vhost.erl            |  383 ++++++++
 src/couch_js_functions.hrl           |  170 ++++
 src/couch_key_tree.erl               |  422 +++++++++
 src/couch_log.erl                    |  254 ++++++
 src/couch_native_process.erl         |  409 +++++++++
 src/couch_os_daemons.erl             |  374 ++++++++
 src/couch_os_process.erl             |  216 +++++
 src/couch_passwords.erl              |  119 +++
 src/couch_primary_sup.erl            |   66 ++
 src/couch_query_servers.erl          |  616 +++++++++++++
 src/couch_ref_counter.erl            |  111 +++
 src/couch_secondary_sup.erl          |   49 ++
 src/couch_server.erl                 |  499 +++++++++++
 src/couch_server_sup.erl             |  164 ++++
 src/couch_stats_aggregator.erl       |  297 +++++++
 src/couch_stats_collector.erl        |  136 +++
 src/couch_stream.erl                 |  299 +++++++
 src/couch_task_status.erl            |  151 ++++
 src/couch_users_db.erl               |  121 +++
 src/couch_util.erl                   |  487 +++++++++++
 src/couch_uuids.erl                  |  103 +++
 src/couch_work_queue.erl             |  187 ++++
 src/json_stream_parse.erl            |  432 +++++++++
 55 files changed, 18271 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/include/couch_db.hrl
----------------------------------------------------------------------
diff --git a/include/couch_db.hrl b/include/couch_db.hrl
new file mode 100644
index 0000000..e0a1c82
--- /dev/null
+++ b/include/couch_db.hrl
@@ -0,0 +1,286 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(LOCAL_DOC_PREFIX, "_local/").
+-define(DESIGN_DOC_PREFIX0, "_design").
+-define(DESIGN_DOC_PREFIX, "_design/").
+-define(DEFAULT_COMPRESSION, snappy).
+
+-define(MIN_STR, <<"">>).
+-define(MAX_STR, <<255>>). % illegal utf string
+
+% the lowest possible database sequence number
+-define(LOWEST_SEQ, 0).
+
+-define(REWRITE_COUNT, couch_rewrite_count).
+
+-define(JSON_ENCODE(V), jiffy:encode(V, [uescape])).
+-define(JSON_DECODE(V), couch_util:json_decode(V)).
+
+-define(b2l(V), binary_to_list(V)).
+-define(l2b(V), list_to_binary(V)).
+-define(term_to_bin(T), term_to_binary(T, [{minor_version, 1}])).
+-define(term_size(T),
+    try
+        erlang:external_size(T)
+    catch _:_ ->
+        byte_size(?term_to_bin(T))
+    end).
+
+-define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>).
+
+-define(LOG_DEBUG(Format, Args),
+    case couch_log:debug_on(?MODULE) of
+        true ->
+            couch_log:debug(Format, Args);
+        false -> ok
+    end).
+
+-define(LOG_INFO(Format, Args),
+    case couch_log:info_on(?MODULE) of
+        true ->
+            couch_log:info(Format, Args);
+        false -> ok
+    end).
+
+-define(LOG_WARN(Format, Args),
+    case couch_log:warn_on(?MODULE) of
+        true ->
+            couch_log:warn(Format, Args);
+        false -> ok
+    end).
+
+-define(LOG_ERROR(Format, Args), couch_log:error(Format, Args)).
+
+% Tree::term() is really a tree(), but we don't want to require R13B04 yet
+-type branch() :: {Key::term(), Value::term(), Tree::term()}.
+-type path() :: {Start::pos_integer(), branch()}.
+-type tree() :: [branch()]. % sorted by key
+
+-record(rev_info,
+    {
+    rev,
+    seq = 0,
+    deleted = false,
+    body_sp = nil % stream pointer
+    }).
+
+-record(doc_info,
+    {
+    id = <<"">>,
+    high_seq = 0,
+    revs = [] % rev_info
+    }).
+
+-record(full_doc_info,
+    {id = <<"">>,
+    update_seq = 0,
+    deleted = false,
+    rev_tree = [],
+    leafs_size = 0
+    }).
+
+-record(httpd,
+    {mochi_req,
+    peer,
+    method,
+    requested_path_parts,
+    path_parts,
+    db_url_handlers,
+    user_ctx,
+    req_body = undefined,
+    design_url_handlers,
+    auth,
+    default_fun,
+    url_handlers
+    }).
+
+
+-record(doc,
+    {
+    id = <<"">>,
+    revs = {0, []},
+
+    % the json body object.
+    body = {[]},
+
+    atts = [], % attachments
+
+    deleted = false,
+
+    % key/value tuple of meta information, provided when using special options:
+    % couch_db:open_doc(Db, Id, Options).
+    meta = []
+    }).
+
+
+-record(att,
+    {
+    name,
+    type,
+    att_len,
+    disk_len, % length of the attachment in its identity form
+              % (that is, without a content encoding applied to it)
+              % differs from att_len when encoding /= identity
+    md5= <<>>,
+    revpos=0,
+    data,
+    encoding=identity % currently supported values are:
+                      %     identity, gzip
+                      % additional values to support in the future:
+                      %     deflate, compress
+    }).
+
+
+-record(user_ctx,
+    {
+    name=null,
+    roles=[],
+    handler
+    }).
+
+% This should be updated anytime a header change happens that requires more
+% than filling in new defaults.
+%
+% As long as the changes are limited to new header fields (with inline
+% defaults) added to the end of the record, then there is no need to increment
+% the disk revision number.
+%
+% If the disk revision is incremented, then new upgrade logic will need to be
+% added to couch_db_updater:init_db.
+
+-define(LATEST_DISK_VERSION, 6).
+
+-record(db_header,
+    {disk_version = ?LATEST_DISK_VERSION,
+     update_seq = 0,
+     unused = 0,
+     fulldocinfo_by_id_btree_state = nil,
+     docinfo_by_seq_btree_state = nil,
+     local_docs_btree_state = nil,
+     purge_seq = 0,
+     purged_docs = nil,
+     security_ptr = nil,
+     revs_limit = 1000
+    }).
+
+-record(db,
+    {main_pid = nil,
+    update_pid = nil,
+    compactor_pid = nil,
+    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+    fd,
+    updater_fd,
+    fd_ref_counter,
+    header = #db_header{},
+    committed_update_seq,
+    fulldocinfo_by_id_btree,
+    docinfo_by_seq_btree,
+    local_docs_btree,
+    update_seq,
+    name,
+    filepath,
+    validate_doc_funs = [],
+    security = [],
+    security_ptr = nil,
+    user_ctx = #user_ctx{},
+    waiting_delayed_commit = nil,
+    revs_limit = 1000,
+    fsync_options = [],
+    options = [],
+    compression,
+    before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
+    after_doc_read = nil     % nil | fun(Doc, Db) -> NewDoc
+    }).
+
+
+-record(view_query_args, {
+    start_key,
+    end_key,
+    start_docid = ?MIN_STR,
+    end_docid = ?MAX_STR,
+
+    direction = fwd,
+    inclusive_end=true, % aka a closed-interval
+
+    limit = 10000000000, % Huge number to simplify logic
+    skip = 0,
+
+    group_level = 0,
+
+    view_type = nil,
+    include_docs = false,
+    doc_options = [],
+    conflicts = false,
+    stale = false,
+    multi_get = false,
+    callback = nil,
+    list = nil
+}).
+
+-record(view_fold_helper_funs, {
+    reduce_count,
+    passed_end,
+    start_response,
+    send_row
+}).
+
+-record(reduce_fold_helper_funs, {
+    start_response,
+    send_row
+}).
+
+-record(extern_resp_args, {
+    code = 200,
+    stop = false,
+    data = <<>>,
+    ctype = "application/json",
+    headers = [],
+    json = nil
+}).
+
+-record(index_header,
+    {seq=0,
+    purge_seq=0,
+    id_btree_state=nil,
+    view_states=nil
+    }).
+
+% small value used in revision trees to indicate the revision isn't stored
+-define(REV_MISSING, []).
+
+-record(changes_args, {
+    feed = "normal",
+    dir = fwd,
+    since = 0,
+    limit = 1000000000000000,
+    style = main_only,
+    heartbeat,
+    timeout,
+    filter = "",
+    filter_fun,
+    filter_args = [],
+    include_docs = false,
+    doc_options = [],
+    conflicts = false,
+    db_open_options = []
+}).
+
+-record(btree, {
+    fd,
+    root,
+    extract_kv = fun({_Key, _Value} = KV) -> KV end,
+    assemble_kv = fun(Key, Value) -> {Key, Value} end,
+    less = fun(A, B) -> A < B end,
+    reduce = nil,
+    compression = ?DEFAULT_COMPRESSION
+}).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
new file mode 100644
index 0000000..9fe19bc
--- /dev/null
+++ b/src/Makefile.am
@@ -0,0 +1,198 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy of
+## the License at
+##
+##   http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+SUBDIRS = priv
+
+# devdocdir = $(localdocdir)/developer/couchdb
+couchlibdir = $(localerlanglibdir)/couch-$(version)
+couchincludedir = $(couchlibdir)/include
+couchebindir = $(couchlibdir)/ebin
+
+couchinclude_DATA = couch_db.hrl couch_js_functions.hrl
+couchebin_DATA = $(compiled_files)
+
+# dist_devdoc_DATA = $(doc_base) $(doc_modules)
+
+CLEANFILES = $(compiled_files) $(doc_base)
+
+# CLEANFILES = $(doc_modules) edoc-info
+
+source_files = \
+    couch.erl \
+    couch_app.erl \
+    couch_auth_cache.erl \
+    couch_btree.erl \
+    couch_changes.erl \
+    couch_compaction_daemon.erl \
+    couch_compress.erl \
+    couch_config.erl \
+    couch_config_writer.erl \
+    couch_db.erl \
+    couch_db_update_notifier.erl \
+    couch_db_update_notifier_sup.erl \
+    couch_doc.erl \
+    couch_drv.erl \
+    couch_ejson_compare.erl \
+    couch_event_sup.erl \
+    couch_external_manager.erl \
+    couch_external_server.erl \
+    couch_file.erl \
+    couch_httpd.erl \
+    couch_httpd_db.erl \
+    couch_httpd_auth.erl \
+    couch_httpd_cors.erl \
+    couch_httpd_oauth.erl \
+    couch_httpd_external.erl \
+    couch_httpd_misc_handlers.erl \
+    couch_httpd_proxy.erl \
+    couch_httpd_rewrite.erl \
+    couch_httpd_stats_handlers.erl \
+    couch_httpd_vhost.erl \
+    couch_key_tree.erl \
+    couch_log.erl \
+    couch_native_process.erl \
+    couch_os_daemons.erl \
+    couch_os_process.erl \
+    couch_passwords.erl \
+    couch_primary_sup.erl \
+    couch_query_servers.erl \
+    couch_ref_counter.erl \
+    couch_secondary_sup.erl \
+    couch_server.erl \
+    couch_server_sup.erl \
+    couch_stats_aggregator.erl \
+    couch_stats_collector.erl \
+    couch_stream.erl \
+    couch_task_status.erl \
+    couch_users_db.erl \
+    couch_util.erl \
+    couch_uuids.erl \
+    couch_db_updater.erl \
+    couch_work_queue.erl \
+    json_stream_parse.erl
+
+EXTRA_DIST = $(source_files) couch_db.hrl couch_js_functions.hrl
+
+compiled_files = \
+    couch.app \
+    couch.beam \
+    couch_app.beam \
+    couch_auth_cache.beam \
+    couch_btree.beam \
+    couch_changes.beam \
+    couch_compaction_daemon.beam \
+    couch_compress.beam \
+    couch_config.beam \
+    couch_config_writer.beam \
+    couch_db.beam \
+    couch_db_update_notifier.beam \
+    couch_db_update_notifier_sup.beam \
+    couch_doc.beam \
+    couch_drv.beam \
+    couch_ejson_compare.beam \
+    couch_event_sup.beam \
+    couch_external_manager.beam \
+    couch_external_server.beam \
+    couch_file.beam \
+    couch_httpd.beam \
+    couch_httpd_db.beam \
+    couch_httpd_auth.beam \
+    couch_httpd_oauth.beam \
+    couch_httpd_cors.beam \
+    couch_httpd_proxy.beam \
+    couch_httpd_external.beam \
+    couch_httpd_misc_handlers.beam \
+    couch_httpd_rewrite.beam \
+    couch_httpd_stats_handlers.beam \
+    couch_httpd_vhost.beam \
+    couch_key_tree.beam \
+    couch_log.beam \
+    couch_native_process.beam \
+    couch_os_daemons.beam \
+    couch_os_process.beam \
+    couch_passwords.beam \
+    couch_primary_sup.beam \
+    couch_query_servers.beam \
+    couch_ref_counter.beam \
+    couch_secondary_sup.beam \
+    couch_server.beam \
+    couch_server_sup.beam \
+    couch_stats_aggregator.beam \
+    couch_stats_collector.beam \
+    couch_stream.beam \
+    couch_task_status.beam \
+    couch_users_db.beam \
+    couch_util.beam \
+    couch_uuids.beam \
+    couch_db_updater.beam \
+    couch_work_queue.beam \
+    json_stream_parse.beam
+
+# doc_base = \
+#     erlang.png \
+#     index.html \
+#     modules-frame.html \
+#     overview-summary.html \
+#     packages-frame.html \
+#     stylesheet.css
+
+# doc_modules = \
+#     couch_btree.html \
+#     couch_config.html \
+#     couch_config_writer.html \
+#     couch_db.html \
+#     couch_db_update_notifier.html \
+#     couch_db_update_notifier_sup.html \
+#     couch_doc.html \
+#     couch_event_sup.html \
+#     couch_file.html \
+#     couch_httpd.html \
+#     couch_key_tree.html \
+#     couch_log.html \
+#     couch_query_servers.html \
+#     couch_rep.html \
+#     couch_rep_sup.html \
+#     couch_server.html \
+#     couch_server_sup.html \
+#     couch_stream.html \
+#     couch_util.html
+
+if WINDOWS
+couch.app: couch.app.tpl
+	modules=`find . -name "*.erl" \! -name ".*" -exec basename {} .erl \; | tr '\n' ',' | sed "s/,$$//"`; \
+	sed -e "s|%package_name%|@package_name@|g" \
+			-e "s|%version%|@version@|g" \
+			-e "s|@modules@|$$modules|g" \
+			-e "s|%localconfdir%|../etc/couchdb|g" \
+			-e "s|@defaultini@|default.ini|g" \
+			-e "s|@localini@|local.ini|g" > \
+	$@ < $<
+else
+couch.app: couch.app.tpl
+	modules=`{ find . -name "*.erl" \! -name ".*" -exec basename {} .erl \; | tr '\n' ','; echo ''; } | sed "s/,$$//"`; \
+	sed -e "s|%package_name%|@package_name@|g" \
+			-e "s|%version%|@version@|g" \
+			-e "s|@modules@|$$modules|g" \
+			-e "s|%localconfdir%|@localconfdir@|g" \
+			-e "s|@defaultini@|default.ini|g" \
+			-e "s|@localini@|local.ini|g" > \
+	$@ < $<
+	chmod +x $@
+endif
+
+# $(dist_devdoc_DATA): edoc-info
+
+# $(ERL) -noshell -run edoc_run files [\"$<\"]
+
+%.beam: %.erl couch_db.hrl couch_js_functions.hrl
+	$(ERLC) $(ERLC_FLAGS) ${TEST} $<;
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch.app.src
----------------------------------------------------------------------
diff --git a/src/couch.app.src b/src/couch.app.src
new file mode 100644
index 0000000..2d14148
--- /dev/null
+++ b/src/couch.app.src
@@ -0,0 +1,23 @@
+{application, couch, [
+    {description, "@package_name@"},
+    {vsn, "@version@"},
+    {registered, [
+        couch_config,
+        couch_db_update,
+        couch_db_update_notifier_sup,
+        couch_external_manager,
+        couch_httpd,
+        couch_log,
+        couch_primary_services,
+        couch_query_servers,
+        couch_secondary_services,
+        couch_server,
+        couch_server_sup,
+        couch_stats_aggregator,
+        couch_stats_collector,
+        couch_task_status
+    ]},
+    {mod, {couch_app, []}},
+    {applications, [kernel, stdlib, crypto, sasl, public_key, ssl,
+                    inets, oauth, ibrowse, mochiweb, os_mon]}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch.erl
----------------------------------------------------------------------
diff --git a/src/couch.erl b/src/couch.erl
new file mode 100644
index 0000000..80e3261
--- /dev/null
+++ b/src/couch.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch).
+
+-export([get_app_env/2,
+         version/0,
+         start/0,
+         stop/0,
+         restart/0,
+         reload/0]).
+
+% Read the key Env from the application environment of 'couch'.
+% Returns the stored value, or Default when the key is not set.
+get_app_env(Env, Default) ->
+    env_value_or(application:get_env(couch, Env), Default).
+
+% Unwrap the result of application:get_env/2.
+env_value_or({ok, Value}, _Default) -> Value;
+env_value_or(undefined, Default) -> Default.
+
+% Return the version of the 'couch' application with anything after the
+% first "-" removed (e.g. a "-suffix" build tag).
+% Falls back to "0.0.0" when the application key cannot be read, or when
+% the vsn string contains no non-dash characters (previously hd([]) raised
+% badarg in that case).
+version() ->
+    case application:get_key(couch, vsn) of
+        {ok, FullVersion} ->
+            case string:tokens(FullVersion, "-") of
+                [Version | _] -> Version;
+                [] -> "0.0.0"
+            end;
+        _ ->
+            "0.0.0"
+    end.
+
+start() ->
+    application:start(couch).
+
+stop() ->
+    application:stop(couch).
+
+% Stop the application and start it again. A stop result of
+% {error, {not_started, couch}} is treated like a successful stop; any
+% other stop error is returned unchanged and no start is attempted.
+restart() ->
+    restart_after(stop()).
+
+% Decide what to do with the result of stop/0.
+restart_after(ok) ->
+    start();
+restart_after({error, {not_started,couch}}) ->
+    start();
+restart_after({error, _Reason} = StopError) ->
+    StopError.
+
+% Terminate the couch_config child of couch_server_sup and, if that
+% succeeded, ask the supervisor to restart it. A termination error is
+% returned as-is.
+reload() ->
+    Sup = couch_server_sup,
+    Child = couch_config,
+    case supervisor:terminate_child(Sup, Child) of
+        {error, _Reason} = TerminateError ->
+            TerminateError;
+        ok ->
+            supervisor:restart_child(Sup, Child)
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_app.erl
----------------------------------------------------------------------
diff --git a/src/couch_app.erl b/src/couch_app.erl
new file mode 100644
index 0000000..a8d215e
--- /dev/null
+++ b/src/couch_app.erl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_app).
+
+-behaviour(application).
+
+-include("couch_db.hrl").
+
+-export([start/2, stop/1]).
+
+-define(CONF_FILES, ["couch.ini", "couch_httpd.ini", "local.ini"]).
+
+start(_Type, _Args) ->
+    couch_util:start_app_deps(couch),
+    IniFiles = get_ini_files(),
+    couch_server_sup:start_link(IniFiles).
+
+stop(_) ->
+    ok.
+
+% Return the list of ini files handed to couch_server_sup: the value of the
+% config_files application environment key when it is set, otherwise each
+% name in ?CONF_FILES joined onto "./etc" under code:root_dir().
+get_ini_files() ->
+    EtcDir = filename:join([code:root_dir(), "./etc"]),
+    DefaultPaths = [filename:join(EtcDir, Name) || Name <- ?CONF_FILES],
+    couch:get_app_env(config_files, DefaultPaths).

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_auth_cache.erl
----------------------------------------------------------------------
diff --git a/src/couch_auth_cache.erl b/src/couch_auth_cache.erl
new file mode 100644
index 0000000..42ccd44
--- /dev/null
+++ b/src/couch_auth_cache.erl
@@ -0,0 +1,425 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_auth_cache).
+-behaviour(gen_server).
+
+% public API
+-export([get_user_creds/1]).
+
+% gen_server API
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+-include("couch_js_functions.hrl").
+
+-define(STATE, auth_state_ets).
+-define(BY_USER, auth_by_user_ets).
+-define(BY_ATIME, auth_by_atime_ets).
+
+-record(state, {
+    max_cache_size = 0,
+    cache_size = 0,
+    db_notifier = nil,
+    db_mon_ref = nil
+}).
+
+
+-spec get_user_creds(UserName::string() | binary()) ->
+    Credentials::list() | nil.
+
+get_user_creds(UserName) when is_list(UserName) ->
+    get_user_creds(?l2b(UserName));
+
+get_user_creds(UserName) ->
+    UserCreds = case couch_config:get("admins", ?b2l(UserName)) of
+    "-hashed-" ++ HashedPwdAndSalt ->
+        % the name is an admin, now check to see if there is a user doc
+        % which has a matching name, salt, and password_sha
+        [HashedPwd, Salt] = string:tokens(HashedPwdAndSalt, ","),
+        case get_from_cache(UserName) of
+        nil ->
+            make_admin_doc(HashedPwd, Salt, []);
+        UserProps when is_list(UserProps) ->
+            make_admin_doc(HashedPwd, Salt, couch_util:get_value(<<"roles">>, UserProps))
+        end;
+    "-pbkdf2-" ++ HashedPwdSaltAndIterations ->
+        [HashedPwd, Salt, Iterations] = string:tokens(HashedPwdSaltAndIterations, ","),
+        case get_from_cache(UserName) of
+        nil ->
+            make_admin_doc(HashedPwd, Salt, Iterations, []);
+        UserProps when is_list(UserProps) ->
+            make_admin_doc(HashedPwd, Salt, Iterations, couch_util:get_value(<<"roles">>, UserProps))
+    end;
+    _Else ->
+        get_from_cache(UserName)
+    end,
+    validate_user_creds(UserCreds).
+
+make_admin_doc(HashedPwd, Salt, ExtraRoles) ->
+    [{<<"roles">>, [<<"_admin">>|ExtraRoles]},
+     {<<"salt">>, ?l2b(Salt)},
+     {<<"password_scheme">>, <<"simple">>},
+     {<<"password_sha">>, ?l2b(HashedPwd)}].
+
+make_admin_doc(DerivedKey, Salt, Iterations, ExtraRoles) ->
+    [{<<"roles">>, [<<"_admin">>|ExtraRoles]},
+     {<<"salt">>, ?l2b(Salt)},
+     {<<"iterations">>, list_to_integer(Iterations)},
+     {<<"password_scheme">>, <<"pbkdf2">>},
+     {<<"derived_key">>, ?l2b(DerivedKey)}].
+
+get_from_cache(UserName) ->
+    exec_if_auth_db(
+        fun(_AuthDb) ->
+            maybe_refresh_cache(),
+            case ets:lookup(?BY_USER, UserName) of
+            [] ->
+                gen_server:call(?MODULE, {fetch, UserName}, infinity);
+            [{UserName, {Credentials, _ATime}}] ->
+                couch_stats_collector:increment({couchdb, auth_cache_hits}),
+                gen_server:cast(?MODULE, {cache_hit, UserName}),
+                Credentials
+            end
+        end,
+        nil
+    ).
+
+
+% Pass credentials through unchanged unless they carry a <<"_conflicts">>
+% entry, in which case {unauthorized, Message} is thrown. nil (no
+% credentials) is returned as nil.
+validate_user_creds(nil) ->
+    nil;
+validate_user_creds(UserCreds) ->
+    case couch_util:get_value(<<"_conflicts">>, UserCreds) of
+    undefined ->
+        UserCreds;
+    _ConflictList ->
+        Message =
+            <<"User document conflicts must be resolved before the document",
+              " is used for authentication purposes.">>,
+        throw({unauthorized, Message})
+    end.
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+    ?STATE = ets:new(?STATE, [set, protected, named_table]),
+    ?BY_USER = ets:new(?BY_USER, [set, protected, named_table]),
+    ?BY_ATIME = ets:new(?BY_ATIME, [ordered_set, private, named_table]),
+    process_flag(trap_exit, true),
+    ok = couch_config:register(
+        fun("couch_httpd_auth", "auth_cache_size", SizeList) ->
+            Size = list_to_integer(SizeList),
+            ok = gen_server:call(?MODULE, {new_max_cache_size, Size}, infinity);
+        ("couch_httpd_auth", "authentication_db", _DbName) ->
+            ok = gen_server:call(?MODULE, reinit_cache, infinity)
+        end
+    ),
+    {ok, Notifier} = couch_db_update_notifier:start_link(fun handle_db_event/1),
+    State = #state{
+        db_notifier = Notifier,
+        max_cache_size = list_to_integer(
+            couch_config:get("couch_httpd_auth", "auth_cache_size", "50")
+        )
+    },
+    {ok, reinit_cache(State)}.
+
+
+% Callback given to couch_db_update_notifier in init/1. Events for
+% databases other than the one recorded under auth_db_name in ?STATE are
+% ignored; for that database, 'created' triggers a cache reinit and
+% 'compacted' triggers a reopen, both via a call to this server.
+handle_db_event({Event, DbName}) ->
+    [{auth_db_name, AuthDbName}] = ets:lookup(?STATE, auth_db_name),
+    case {DbName =:= AuthDbName, Event} of
+    {true, created} ->
+        gen_server:call(?MODULE, reinit_cache, infinity);
+    {true, compacted} ->
+        gen_server:call(?MODULE, auth_db_compacted, infinity);
+    {_IsAuthDb, _OtherEvent} ->
+        ok
+    end.
+
+
+handle_call(reinit_cache, _From, State) ->
+    catch erlang:demonitor(State#state.db_mon_ref, [flush]),
+    exec_if_auth_db(fun(AuthDb) -> catch couch_db:close(AuthDb) end),
+    {reply, ok, reinit_cache(State)};
+
+handle_call(auth_db_compacted, _From, State) ->
+    exec_if_auth_db(
+        fun(AuthDb) ->
+            true = ets:insert(?STATE, {auth_db, reopen_auth_db(AuthDb)})
+        end
+    ),
+    {reply, ok, State};
+
+handle_call({new_max_cache_size, NewSize},
+        _From, #state{cache_size = Size} = State) when NewSize >= Size ->
+    {reply, ok, State#state{max_cache_size = NewSize}};
+
+handle_call({new_max_cache_size, NewSize}, _From, State) ->
+    free_mru_cache_entries(State#state.cache_size - NewSize),
+    {reply, ok, State#state{max_cache_size = NewSize, cache_size = NewSize}};
+
+handle_call({fetch, UserName}, _From, State) ->
+    {Credentials, NewState} = case ets:lookup(?BY_USER, UserName) of
+    [{UserName, {Creds, ATime}}] ->
+        couch_stats_collector:increment({couchdb, auth_cache_hits}),
+        cache_hit(UserName, Creds, ATime),
+        {Creds, State};
+    [] ->
+        couch_stats_collector:increment({couchdb, auth_cache_misses}),
+        Creds = get_user_props_from_db(UserName),
+        State1 = add_cache_entry(UserName, Creds, erlang:now(), State),
+        {Creds, State1}
+    end,
+    {reply, Credentials, NewState};
+
+handle_call(refresh, _From, State) ->
+    exec_if_auth_db(fun refresh_entries/1),
+    {reply, ok, State}.
+
+
+handle_cast({cache_hit, UserName}, State) ->
+    case ets:lookup(?BY_USER, UserName) of
+    [{UserName, {Credentials, ATime}}] ->
+        cache_hit(UserName, Credentials, ATime);
+    _ ->
+        ok
+    end,
+    {noreply, State}.
+
+
+handle_info({'DOWN', Ref, _, _, _Reason}, #state{db_mon_ref = Ref} = State) ->
+    {noreply, reinit_cache(State)}.
+
+
+terminate(_Reason, #state{db_notifier = Notifier}) ->
+    couch_db_update_notifier:stop(Notifier),
+    exec_if_auth_db(fun(AuthDb) -> catch couch_db:close(AuthDb) end),
+    true = ets:delete(?BY_USER),
+    true = ets:delete(?BY_ATIME),
+    true = ets:delete(?STATE).
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+clear_cache(State) ->
+    exec_if_auth_db(fun(AuthDb) -> catch couch_db:close(AuthDb) end),
+    true = ets:delete_all_objects(?BY_USER),
+    true = ets:delete_all_objects(?BY_ATIME),
+    State#state{cache_size = 0}.
+
+
+reinit_cache(State) ->
+    NewState = clear_cache(State),
+    AuthDbName = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db")),
+    true = ets:insert(?STATE, {auth_db_name, AuthDbName}),
+    AuthDb = open_auth_db(),
+    true = ets:insert(?STATE, {auth_db, AuthDb}),
+    NewState#state{db_mon_ref = couch_db:monitor(AuthDb)}.
+
+
+add_cache_entry(_, _, _, #state{max_cache_size = 0} = State) ->
+    State;
+add_cache_entry(UserName, Credentials, ATime, State) ->
+    case State#state.cache_size >= State#state.max_cache_size of
+    true ->
+        free_mru_cache_entry();
+    false ->
+        ok
+    end,
+    true = ets:insert(?BY_ATIME, {ATime, UserName}),
+    true = ets:insert(?BY_USER, {UserName, {Credentials, ATime}}),
+    State#state{cache_size = couch_util:get_value(size, ets:info(?BY_USER))}.
+
+free_mru_cache_entries(0) ->
+    ok;
+free_mru_cache_entries(N) when N > 0 ->
+    free_mru_cache_entry(),
+    free_mru_cache_entries(N - 1).
+
+free_mru_cache_entry() ->
+    MruTime = ets:last(?BY_ATIME),
+    [{MruTime, UserName}] = ets:lookup(?BY_ATIME, MruTime),
+    true = ets:delete(?BY_ATIME, MruTime),
+    true = ets:delete(?BY_USER, UserName).
+
+
+cache_hit(UserName, Credentials, ATime) ->
+    NewATime = erlang:now(),
+    true = ets:delete(?BY_ATIME, ATime),
+    true = ets:insert(?BY_ATIME, {NewATime, UserName}),
+    true = ets:insert(?BY_USER, {UserName, {Credentials, NewATime}}).
+
+
+refresh_entries(AuthDb) ->
+    case reopen_auth_db(AuthDb) of
+    nil ->
+        ok;
+    AuthDb2 ->
+        case AuthDb2#db.update_seq > AuthDb#db.update_seq of
+        true ->
+            {ok, _, _} = couch_db:enum_docs_since(
+                AuthDb2,
+                AuthDb#db.update_seq,
+                fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end,
+                AuthDb#db.update_seq,
+                []
+            ),
+            true = ets:insert(?STATE, {auth_db, AuthDb2});
+        false ->
+            ok
+        end
+    end.
+
+
+refresh_entry(Db, #doc_info{high_seq = DocSeq} = DocInfo) ->
+    case is_user_doc(DocInfo) of
+    {true, UserName} ->
+        case ets:lookup(?BY_USER, UserName) of
+        [] ->
+            ok;
+        [{UserName, {_OldCreds, ATime}}] ->
+            {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted]),
+            NewCreds = user_creds(Doc),
+            true = ets:insert(?BY_USER, {UserName, {NewCreds, ATime}})
+        end;
+    false ->
+        ok
+    end,
+    {ok, DocSeq}.
+
+
+user_creds(#doc{deleted = true}) ->
+    nil;
+user_creds(#doc{} = Doc) ->
+    {Creds} = couch_doc:to_json_obj(Doc, []),
+    Creds.
+
+
+is_user_doc(#doc_info{id = <<"org.couchdb.user:", UserName/binary>>}) ->
+    {true, UserName};
+is_user_doc(_) ->
+    false.
+
+
+maybe_refresh_cache() ->
+    case cache_needs_refresh() of
+    true ->
+        ok = gen_server:call(?MODULE, refresh, infinity);
+    false ->
+        ok
+    end.
+
+
+cache_needs_refresh() ->
+    exec_if_auth_db(
+        fun(AuthDb) ->
+            case reopen_auth_db(AuthDb) of
+            nil ->
+                false;
+            AuthDb2 ->
+                AuthDb2#db.update_seq > AuthDb#db.update_seq
+            end
+        end,
+        false
+    ).
+
+
+reopen_auth_db(AuthDb) ->
+    case (catch couch_db:reopen(AuthDb)) of
+    {ok, AuthDb2} ->
+        AuthDb2;
+    _ ->
+        nil
+    end.
+
+
+exec_if_auth_db(Fun) ->
+    exec_if_auth_db(Fun, ok).
+
+exec_if_auth_db(Fun, DefRes) ->
+    case ets:lookup(?STATE, auth_db) of
+    [{auth_db, #db{} = AuthDb}] ->
+        Fun(AuthDb);
+    _ ->
+        DefRes
+    end.
+
+
+open_auth_db() ->
+    [{auth_db_name, DbName}] = ets:lookup(?STATE, auth_db_name),
+    {ok, AuthDb} = ensure_users_db_exists(DbName, [sys_db]),
+    AuthDb.
+
+
+get_user_props_from_db(UserName) ->
+    exec_if_auth_db(
+        fun(AuthDb) ->
+            Db = reopen_auth_db(AuthDb),
+            DocId = <<"org.couchdb.user:", UserName/binary>>,
+            try
+                {ok, Doc} = couch_db:open_doc(Db, DocId, [conflicts]),
+                {DocProps} = couch_doc:to_json_obj(Doc, []),
+                DocProps
+            catch
+            _:_Error ->
+                nil
+            end
+        end,
+        nil
+    ).
+
+ensure_users_db_exists(DbName, Options) ->
+    Options1 = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}, nologifmissing | Options],
+    case couch_db:open(DbName, Options1) of
+    {ok, Db} ->
+        ensure_auth_ddoc_exists(Db, <<"_design/_auth">>),
+        {ok, Db};
+    _Error ->
+        {ok, Db} = couch_db:create(DbName, Options1),
+        ok = ensure_auth_ddoc_exists(Db, <<"_design/_auth">>),
+        {ok, Db}
+    end.
+
+% Make sure the design document DDocId exists in Db and that its
+% "validate_doc_update" field equals ?AUTH_DB_DOC_VALIDATE_FUNCTION.
+% A missing document is created (and the write is asserted to succeed);
+% an existing document whose field differs or is absent is rewritten, with
+% the result of that write left unchecked. Returns ok.
+ensure_auth_ddoc_exists(Db, DDocId) ->
+    case couch_db:open_doc(Db, DDocId) of
+    {not_found, _Reason} ->
+        {ok, AuthDesign} = auth_design_doc(DDocId),
+        {ok, _Rev} = couch_db:update_doc(Db, AuthDesign, []);
+    {ok, Doc} ->
+        {Props} = couch_doc:to_json_obj(Doc, []),
+        case couch_util:get_value(<<"validate_doc_update">>, Props, []) of
+            ?AUTH_DB_DOC_VALIDATE_FUNCTION ->
+                ok;
+            _ ->
+                % keystore rather than keyreplace: keyreplace leaves Props
+                % untouched when the key is absent, so a design document
+                % without the field would never receive the function.
+                Props1 = lists:keystore(<<"validate_doc_update">>, 1, Props,
+                    {<<"validate_doc_update">>,
+                    ?AUTH_DB_DOC_VALIDATE_FUNCTION}),
+                couch_db:update_doc(Db, couch_doc:from_json_obj({Props1}), [])
+        end
+    end,
+    ok.
+
+auth_design_doc(DocId) ->
+    DocProps = [
+        {<<"_id">>, DocId},
+        {<<"language">>,<<"javascript">>},
+        {<<"validate_doc_update">>, ?AUTH_DB_DOC_VALIDATE_FUNCTION}
+    ],
+    {ok, couch_doc:from_json_obj({DocProps})}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_btree.erl
----------------------------------------------------------------------
diff --git a/src/couch_btree.erl b/src/couch_btree.erl
new file mode 100644
index 0000000..789819e
--- /dev/null
+++ b/src/couch_btree.erl
@@ -0,0 +1,714 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_btree).
+
+-export([open/2, open/3, query_modify/4, add/2, add_remove/3]).
+-export([fold/4, full_reduce/1, final_reduce/2, size/1, foldl/3, foldl/4]).
+-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).
+-export([less/3]).
+
+-include("couch_db.hrl").
+-define(CHUNK_THRESHOLD, 16#4ff).
+
+% Runs the btree's extract_kv fun on a stored item; callers in this module
+% expect a {Key, Value} pair back.
+extract(#btree{extract_kv=SplitFun}, Item) ->
+    SplitFun(Item).
+
+% Runs the btree's assemble_kv fun on Key and Value to rebuild the item
+% handed back to callers.
+assemble(#btree{assemble_kv=JoinFun}, K, V) ->
+    JoinFun(K, V).
+
+% Applies the btree's `less` comparison fun to Left and Right and returns
+% its result.
+less(#btree{less=Compare}, Left, Right) ->
+    Compare(Left, Right).
+
+% Wraps the root State and file handle Fd in a #btree{} without applying any
+% options. Pass 'nil' as State for a brand new btree.
+open(State, Fd) ->
+    open(State, Fd, []).
+
+% Applies a list of {Option, Value} pairs to the btree record, one at a time.
+% Recognised options: split (extract_kv), join (assemble_kv), less, reduce
+% and compression. Any other list element fails with function_clause.
+set_options(Btree, [{compression, Method} | More]) ->
+    set_options(Btree#btree{compression=Method}, More);
+set_options(Btree, [{reduce, ReduceFun} | More]) ->
+    set_options(Btree#btree{reduce=ReduceFun}, More);
+set_options(Btree, [{less, LessFun} | More]) ->
+    set_options(Btree#btree{less=LessFun}, More);
+set_options(Btree, [{join, JoinFun} | More]) ->
+    set_options(Btree#btree{assemble_kv=JoinFun}, More);
+set_options(Btree, [{split, SplitFun} | More]) ->
+    set_options(Btree#btree{extract_kv=SplitFun}, More);
+set_options(Btree, []) ->
+    Btree.
+
+% Like open/2, but also applies Options through set_options/2.
+% Returns {ok, Btree}.
+open(State, Fd, Options) ->
+    Fresh = #btree{root=State, fd=Fd},
+    {ok, set_options(Fresh, Options)}.
+
+% Returns the btree's current root (nil for an empty tree).
+get_state(#btree{} = Bt) ->
+    Bt#btree.root.
+
+% Collapses a {KVs, Reductions} pair into one reduction value.
+% The first argument is either a #btree{} (its reduce fun is used) or the
+% reduce fun itself.
+%   - no KVs, no reductions:   Reduce(reduce, [])
+%   - no KVs, one reduction:   that reduction, untouched
+%   - no KVs, many reductions: Reduce(rereduce, Reductions)
+%   - some KVs: they are reduced first and the result joins the reductions
+final_reduce(#btree{reduce=Reduce}, Val) ->
+    final_reduce(Reduce, Val);
+final_reduce(Reduce, {[], []}) ->
+    Reduce(reduce, []);
+final_reduce(_Reduce, {[], [OnlyRed]}) ->
+    OnlyRed;
+final_reduce(Reduce, {[], Reds}) ->
+    Reduce(rereduce, Reds);
+final_reduce(Reduce, {KVs, Reds}) ->
+    final_reduce(Reduce, {[], [Reduce(reduce, KVs) | Reds]}).
+
+% Streams the tree and calls Fun(GroupKey, {KVs, Reds}, Acc) once per key
+% group; Fun returns {ok, Acc2} or {stop, Acc2}.
+% Options read here: dir (default fwd), start_key, key_group_fun (default
+% groups everything together), plus end_key / end_key_gt via
+% make_key_in_end_range_function/3.
+% Always returns {ok, FinalAcc}; a {stop, Acc} thrown while streaming is
+% turned into {ok, Acc}.
+fold_reduce(#btree{root=Root}=Bt, Fun, Acc, Options) ->
+    Dir = couch_util:get_value(dir, Options, fwd),
+    StartKey = couch_util:get_value(start_key, Options),
+    InEndRangeFun = make_key_in_end_range_function(Bt, Dir, Options),
+    GroupAll = fun(_,_) -> true end,
+    KeyGroupFun = couch_util:get_value(key_group_fun, Options, GroupAll),
+    try
+        {ok, Acc2, PendingReds, PendingKVs, PendingKey} =
+            reduce_stream_node(Bt, Dir, Root, StartKey, InEndRangeFun,
+                undefined, [], [], KeyGroupFun, Fun, Acc),
+        case PendingKey of
+        undefined ->
+            % nothing was grouped, so there is nothing left to flush
+            {ok, Acc2};
+        _ ->
+            % flush the last group, which is still open
+            case Fun(PendingKey, {PendingKVs, PendingReds}, Acc2) of
+            {Tag, Acc3} when Tag =:= ok; Tag =:= stop -> {ok, Acc3}
+            end
+        end
+    catch
+        throw:{stop, AccDone} -> {ok, AccDone}
+    end.
+
+% Returns {ok, Reduction} for the whole tree: the reduce fun applied to an
+% empty list when the tree is empty, otherwise the reduction stored as the
+% second element of the root.
+full_reduce(#btree{root=Root, reduce=Reduce}) ->
+    case Root of
+    nil ->
+        {ok, Reduce(reduce, [])};
+    _ ->
+        {ok, element(2, Root)}
+    end.
+
+% Returns the size recorded in the root: 0 for an empty tree, nil for a
+% two-element root (pre 1.2 format, which carries no size), otherwise the
+% third element of the root.
+size(#btree{root = {_Pointer, _Reduction, TreeSize}}) ->
+    TreeSize;
+size(#btree{root = {_Pointer, _Reduction}}) ->
+    % pre 1.2 format
+    nil;
+size(#btree{root = nil}) ->
+    0.
+
+% Adapts a fold callback of arity 2 or 3 to the arity-4 form
+% Fun(visit | traverse, KeyOrKV, Reds, Acc) used by the stream_* functions.
+% The generated wrappers answer every `traverse` call with {ok, Acc}; an
+% arity-4 fun is returned unchanged.
+convert_fun_arity(Fun) when is_function(Fun, 4) ->
+    Fun;
+convert_fun_arity(Fun) when is_function(Fun, 3) ->
+    fun
+        (traverse, _Key, _Red, Acc) -> {ok, Acc};
+        (visit, KV, Reds, Acc) -> Fun(KV, Reds, Acc)
+    end;
+convert_fun_arity(Fun) when is_function(Fun, 2) ->
+    fun
+        (traverse, _Key, _Red, Acc) -> {ok, Acc};
+        (visit, KV, _Reds, Acc) -> Fun(KV, Acc)
+    end.
+
+% Builds a one-argument predicate that tells whether a key is still inside
+% the end of the requested range, for direction fwd or rev.
+% Option end_key_gt takes precedence over end_key; with neither option set
+% every key is in range. Assuming `less` is a strict ordering, end_key_gt
+% excludes the end key itself and end_key includes it.
+make_key_in_end_range_function(#btree{less=Less}, fwd, Options) ->
+    EndKeyGt = couch_util:get_value(end_key_gt, Options),
+    EndKey = couch_util:get_value(end_key, Options),
+    case {EndKeyGt, EndKey} of
+    {undefined, undefined} ->
+        fun(_Key) -> true end;
+    {undefined, _} ->
+        fun(Key) -> not Less(EndKey, Key) end;
+    {_, _} ->
+        fun(Key) -> Less(Key, EndKeyGt) end
+    end;
+make_key_in_end_range_function(#btree{less=Less}, rev, Options) ->
+    EndKeyGt = couch_util:get_value(end_key_gt, Options),
+    EndKey = couch_util:get_value(end_key, Options),
+    case {EndKeyGt, EndKey} of
+    {undefined, undefined} ->
+        fun(_Key) -> true end;
+    {undefined, _} ->
+        fun(Key) -> not Less(Key, EndKey) end;
+    {_, _} ->
+        fun(Key) -> Less(EndKeyGt, Key) end
+    end.
+
+
+% foldl/3: fold over the whole tree with no options.
+foldl(Bt, Fun, Acc) ->
+    foldl(Bt, Fun, Acc, []).
+
+% foldl/4: alias for fold/4.
+foldl(Bt, Fun, Acc, Options) ->
+    fold(Bt, Fun, Acc, Options).
+
+
+% Folds Fun over the tree's items. Fun may have arity 2, 3 or 4 (see
+% convert_fun_arity/1).
+% Options read here: dir (default fwd), start_key, plus end_key / end_key_gt
+% via make_key_in_end_range_function/3.
+% Returns {ok, {KVs, Reds}, Acc}:
+%   - empty tree:          {ok, {[], []}, Acc}
+%   - stream ran to end:   {ok, {[], [FullReduction]}, Acc2}
+%   - stream was stopped:  {ok, LastReduction, Acc2} as reported by the
+%                          stream functions
+fold(#btree{root=nil}, _Fun, Acc, _Options) ->
+    {ok, {[], []}, Acc};
+fold(#btree{root=Root}=Bt, Fun, Acc, Options) ->
+    Dir = couch_util:get_value(dir, Options, fwd),
+    InRange = make_key_in_end_range_function(Bt, Dir, Options),
+    Streamed =
+    case couch_util:get_value(start_key, Options) of
+    undefined ->
+        stream_node(Bt, [], Root, InRange, Dir,
+                convert_fun_arity(Fun), Acc);
+    StartKey ->
+        stream_node(Bt, [], Root, StartKey, InRange, Dir,
+                convert_fun_arity(Fun), Acc)
+    end,
+    case Streamed of
+    {stop, LastReduction, StoppedAcc} ->
+        {ok, LastReduction, StoppedAcc};
+    {ok, DoneAcc} ->
+        {ok, {[], [element(2, Root)]}, DoneAcc}
+    end.
+
+% Inserts InsertKeyValues into the tree without removing anything.
+% Returns {ok, NewBt}.
+add(Bt, InsertKeyValues) ->
+    NoRemovals = [],
+    add_remove(Bt, InsertKeyValues, NoRemovals).
+
+% Inserts InsertKeyValues and removes RemoveKeys in one pass over the tree.
+% Returns {ok, NewBt}.
+add_remove(Bt, InsertKeyValues, RemoveKeys) ->
+    NoLookups = [],
+    {ok, [], Updated} = query_modify(Bt, NoLookups, InsertKeyValues, RemoveKeys),
+    {ok, Updated}.
+
+% Performs lookups, inserts and removals in a single pass over the tree.
+% Every request becomes an {Op, Key, Value} action; the actions are sorted
+% by key with the btree's less fun and, for equal keys, by op_order/1.
+% Returns {ok, QueryResults, NewBt}.
+query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
+    #btree{root=Root} = Bt,
+    Inserts = [
+        begin
+            {Key, Value} = extract(Bt, KeyValue),
+            {insert, Key, Value}
+        end
+    ||
+        KeyValue <- InsertValues
+    ],
+    Removes = [{remove, Key, nil} || Key <- RemoveKeys],
+    Fetches = [{fetch, Key, nil} || Key <- LookupKeys],
+    ByKeyThenOp =
+        fun({OpA, KeyA, _}, {OpB, KeyB, _}) ->
+            if
+            % same key: order by operation
+            KeyA == KeyB -> op_order(OpA) < op_order(OpB);
+            true -> less(Bt, KeyA, KeyB)
+            end
+        end,
+    Actions = lists:sort(ByKeyThenOp, Inserts ++ Removes ++ Fetches),
+    {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []),
+    {ok, NewRoot} = complete_root(Bt, KeyPointers),
+    {ok, QueryResults, Bt#btree{root=NewRoot}}.
+
+% Rank used to order different operations on the same key:
+% fetch (1) runs before remove (2), which runs before insert (3).
+op_order(insert) -> 3;
+op_order(remove) -> 2;
+op_order(fetch) -> 1.
+
+% Looks up Keys in the tree.
+% The keys are sorted with the btree's less fun for the tree walk, so the
+% results are handed to couch_util:reorder_results/2 together with the
+% original key list to restore the caller's order.
+lookup(#btree{root=Root, less=Less}=Bt, Keys) ->
+    {ok, SortedResults} = lookup(Bt, Root, lists:sort(Less, Keys)),
+    couch_util:reorder_results(Keys, SortedResults).
+
+% lookup/3: looks up the sorted Keys below the node described by Node
+% (nil, or a tuple whose first element is the node pointer).
+% Returns {ok, [{Key, {ok, Item} | not_found}]}.
+lookup(_Bt, nil, Keys) ->
+    {ok, [{Key, not_found} || Key <- Keys]};
+lookup(Bt, Node, Keys) ->
+    case get_node(Bt, element(1, Node)) of
+    {kp_node, Entries} ->
+        lookup_kpnode(Bt, list_to_tuple(Entries), 1, Keys, []);
+    {kv_node, Entries} ->
+        lookup_kvnode(Bt, list_to_tuple(Entries), 1, Keys, [])
+    end.
+
+% Looks up sorted keys in an inner (kp) node given as a tuple of
+% {Key, PointerInfo} entries, scanning from index LowerBound.
+% Output accumulates results in reverse order; returns {ok, Results}.
+lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output) ->
+    {ok, lists:reverse(Output)};
+lookup_kpnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound ->
+    % ran past the last entry: the remaining keys are reported as not_found
+    {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
+lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Output) ->
+    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), FirstLookupKey),
+    {Key, PointerInfo} = element(N, NodeTuple),
+    % keys that do not sort after this entry's key are looked up in its child
+    SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end,
+    case lists:splitwith(SplitFun, LookupKeys) of
+    {[], GreaterQueries} ->
+        lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output);
+    {LessEqQueries, GreaterQueries} ->
+        {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries),
+        lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output))
+    end.
+
+
+% Looks up sorted keys in a leaf (kv) node given as a tuple of {Key, Value}
+% entries, scanning from index LowerBound.
+% Output accumulates {LookupKey, {ok, Item} | not_found} in reverse order;
+% returns {ok, Results}.
+lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output) ->
+    {ok, lists:reverse(Output)};
+lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound ->
+    % keys not found
+    {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
+lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output) ->
+    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), LookupKey),
+    {Key, Value} = element(N, NodeTuple),
+    case less(Bt, LookupKey, Key) of
+    true ->
+        % LookupKey is less than Key
+        lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output]);
+    false ->
+        case less(Bt, Key, LookupKey) of
+        true ->
+            % LookupKey is greater than Key
+            lookup_kvnode(Bt, NodeTuple, N+1, RestLookupKeys, [{LookupKey, not_found} | Output]);
+        false ->
+            % LookupKey is equal to Key; stay on index N for the next key
+            lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output])
+        end
+    end.
+
+
+% Turns the key/pointer list left after a modification into a single root:
+% nil when nothing remains, the pointer info itself when exactly one entry
+% remains, otherwise kp nodes are written on top until one entry is left.
+% Returns {ok, Root}.
+complete_root(_Bt, []) ->
+    {ok, nil};
+complete_root(_Bt, [{_Key, RootPointer}])->
+    {ok, RootPointer};
+complete_root(Bt, KeyPointers) ->
+    {ok, ParentKeyPointers} = write_node(Bt, kp_node, KeyPointers),
+    complete_root(Bt, ParentKeyPointers).
+
+%%%%%%%%%%%%% The chunkify function sucks! %%%%%%%%%%%%%
+% Known weaknesses: sizes are measured with ?term_size, so the compression
+% applied when blocks are written is not accounted for, and measuring the
+% whole list and then every element again is probably really inefficient.
+%
+% chunkify/1 returns [InList] when the list's size does not exceed
+% ?CHUNK_THRESHOLD; otherwise it estimates the number of chunks and splits
+% the list with a per-chunk threshold of Size div NumberOfChunks.
+chunkify(InList) ->
+    TotalSize = ?term_size(InList),
+    case TotalSize > ?CHUNK_THRESHOLD of
+    true ->
+        LikelyChunks = ((TotalSize div ?CHUNK_THRESHOLD) + 1),
+        chunkify(InList, TotalSize div LikelyChunks, [], 0, []);
+    false ->
+        [InList]
+    end.
+
+% chunkify/5: walks the list and closes the current chunk as soon as adding
+% an element pushes a non-empty chunk past Threshold (the element that
+% crosses the threshold still goes into the chunk being closed).
+% Current and Chunks are built in reverse and reversed on the way out.
+chunkify([], _Threshold, [], 0, Chunks) ->
+    lists:reverse(Chunks);
+chunkify([], _Threshold, Current, _CurrentSize, Chunks) ->
+    lists:reverse([lists:reverse(Current) | Chunks]);
+chunkify([Elem | Rest], Threshold, Current, CurrentSize, Chunks) ->
+    ElemSize = ?term_size(Elem),
+    case Current /= [] andalso (ElemSize + CurrentSize) > Threshold of
+    true ->
+        Closed = lists:reverse([Elem | Current]),
+        chunkify(Rest, Threshold, [], 0, [Closed | Chunks]);
+    false ->
+        chunkify(Rest, Threshold, [Elem | Current], CurrentSize + ElemSize, Chunks)
+    end.
+
+% Applies the sorted Actions to the subtree at RootPointerInfo (nil stands
+% for an empty kv node) and returns {ok, KeyPointers, QueryOutput2}, where
+% KeyPointers is the list of {LastKey, PointerInfo} entries that replace
+% this node in its parent.
+modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
+    case RootPointerInfo of
+    nil ->
+        NodeType = kv_node,
+        NodeList = [];
+    _Tuple ->
+        Pointer = element(1, RootPointerInfo),
+        {NodeType, NodeList} = get_node(Bt, Pointer)
+    end,
+    NodeTuple = list_to_tuple(NodeList),
+
+    {ok, NewNodeList, QueryOutput2} =
+    case NodeType of
+    kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput);
+    kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput)
+    end,
+    case NewNodeList of
+    [] ->  % no nodes remain
+        {ok, [], QueryOutput2};
+    % NodeList is already bound, so this clause matches only an unchanged list
+    NodeList ->  % nothing changed
+        {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple),
+        {ok, [{LastKey, RootPointerInfo}], QueryOutput2};
+    _Else2 ->
+        % contents changed: write new node(s) and return their key/pointers
+        {ok, ResultList} = write_node(Bt, NodeType, NewNodeList),
+        {ok, ResultList, QueryOutput2}
+    end.
+
+% Computes the reduction stored alongside a node pointer.
+%   - no reduce fun configured: []
+%   - kp node: rereduce over the reductions (2nd element) of its children
+%   - kv node: reduce over the assembled items
+reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
+    [];
+reduce_node(#btree{reduce=Reduce}, kp_node, NodeList) ->
+    ChildReds = [element(2, PointerInfo) || {_Key, PointerInfo} <- NodeList],
+    Reduce(rereduce, ChildReds);
+reduce_node(#btree{reduce=Reduce}=Bt, kv_node, NodeList) ->
+    Items = [assemble(Bt, Key, Value) || {Key, Value} <- NodeList],
+    Reduce(reduce, Items).
+
+% Computes the subtree size stored alongside a node pointer.
+% For a kv node it is the node's own size. For a kp node the sizes of all
+% children are added to the node's size; the result is nil as soon as a
+% child has no size (two-element pre 1.2 pointer info, or a nil size).
+reduce_tree_size(kv_node, OwnSize, _KvList) ->
+    OwnSize;
+reduce_tree_size(kp_node, Total, []) ->
+    Total;
+reduce_tree_size(kp_node, _Total, [{_Key, {_Pointer, _Red}} | _]) ->
+    % pre 1.2 format
+    nil;
+reduce_tree_size(kp_node, _Total, [{_Key, {_Pointer, _Red, nil}} | _]) ->
+    nil;
+reduce_tree_size(kp_node, Total, [{_Key, {_Pointer, _Red, ChildSize}} | Rest]) ->
+    reduce_tree_size(kp_node, Total + ChildSize, Rest).
+
+% Reads the node stored at NodePos from the btree's file and returns it as
+% {NodeType, NodeList}. Fails with badmatch when the read does not succeed.
+get_node(#btree{fd = Fd}, NodePos) ->
+    {ok, {_NodeType, _NodeList} = Node} = couch_file:pread_term(Fd, NodePos),
+    Node.
+
+% Splits NodeList into chunks (chunkify/1), appends each chunk to the file
+% as a {NodeType, Chunk} term using the btree's compression setting, and
+% returns {ok, [{LastKey, {Pointer, Reduction, SubTreeSize}}]} with one
+% entry per written chunk, in chunk order.
+write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
+    WriteChunk = fun(Chunk) ->
+        {ok, Pointer, Size} = couch_file:append_term(
+            Fd, {NodeType, Chunk}, [{compression, Comp}]),
+        {LastKey, _} = lists:last(Chunk),
+        SubTreeSize = reduce_tree_size(NodeType, Size, Chunk),
+        {LastKey, {Pointer, reduce_node(Bt, NodeType, Chunk), SubTreeSize}}
+    end,
+    % a comprehension keeps the chunks written in list order
+    {ok, [WriteChunk(Chunk) || Chunk <- chunkify(NodeList)]}.
+
+% Applies sorted Actions to an inner (kp) node given as a tuple of
+% {Key, PointerInfo} entries, starting at index LowerBound.
+% ResultNode accumulates the new entry list in reverse order.
+% Returns {ok, NewNodeList, QueryOutput}.
+modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->
+    % empty node: handle the actions as if there were no node at all
+    modify_node(Bt, nil, Actions, QueryOutput);
+modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
+    % no actions left: keep the remaining entries as they are
+    {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
+            tuple_size(NodeTuple), [])), QueryOutput};
+modify_kpnode(Bt, NodeTuple, LowerBound,
+        [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) ->
+    Sz = tuple_size(NodeTuple),
+    N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey),
+    case N =:= Sz of
+    true  ->
+        % perform remaining actions on last node
+        {_, PointerInfo} = element(Sz, NodeTuple),
+        {ok, ChildKPs, QueryOutput2} =
+            modify_node(Bt, PointerInfo, Actions, QueryOutput),
+        NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
+            Sz - 1, ChildKPs)),
+        {ok, NodeList, QueryOutput2};
+    false ->
+        {NodeKey, PointerInfo} = element(N, NodeTuple),
+        % actions whose key does not sort after NodeKey belong to child N
+        SplitFun = fun({_ActionType, ActionKey, _ActionValue}) ->
+                not less(Bt, NodeKey, ActionKey)
+            end,
+        {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions),
+        {ok, ChildKPs, QueryOutput2} =
+                modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput),
+        % keep the untouched entries LowerBound..N-1, then the child's new entries
+        ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple,
+                LowerBound, N - 1, ResultNode)),
+        modify_kpnode(Bt, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2)
+    end.
+
+% Pushes elements Start..End of Tuple onto Tail one by one, so they end up
+% in reverse order in front of Tail. Returns Tail itself when Start > End.
+bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End ->
+    Tail;
+bounded_tuple_to_revlist(Tuple, Index, End, Tail) ->
+    Pushed = [element(Index, Tuple) | Tail],
+    bounded_tuple_to_revlist(Tuple, Index + 1, End, Pushed).
+
+% Returns elements Start..End of Tuple, in order, followed by Tail.
+bounded_tuple_to_list(Tuple, Start, End, Tail) ->
+    EmptyAcc = [],
+    bounded_tuple_to_list2(Tuple, Start, End, EmptyAcc, Tail).
+
+% Worker for bounded_tuple_to_list/4: collects elements Start..End in Acc
+% (reversed) and finally reverses Acc onto Tail.
+bounded_tuple_to_list2(_Tuple, Start, End, Acc, Tail) when Start > End ->
+    lists:reverse(Acc, Tail);
+bounded_tuple_to_list2(Tuple, Index, End, Acc, Tail) ->
+    Collected = [element(Index, Tuple) | Acc],
+    bounded_tuple_to_list2(Tuple, Index + 1, End, Collected, Tail).
+
+% Binary search over Tuple[Start..End] for the first entry whose key does
+% not sort before Key. When the range has shrunk to one index that index is
+% returned without another comparison, so callers must still compare the
+% entry at the returned position themselves.
+find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End ->
+    End;
+find_first_gteq(Bt, Tuple, Lo, Hi, Key) ->
+    Mid = Lo + ((Hi - Lo) div 2),
+    {MidKey, _} = element(Mid, Tuple),
+    {NextLo, NextHi} =
+    case less(Bt, MidKey, Key) of
+    true -> {Mid+1, Hi};
+    false -> {Lo, Mid}
+    end,
+    find_first_gteq(Bt, Tuple, NextLo, NextHi, Key).
+
+% Applies sorted {ActionType, ActionKey, ActionValue} actions to a leaf (kv)
+% node given as a tuple of {Key, Value} entries, starting at index
+% LowerBound. The new entry list is accumulated in reverse order, and fetch
+% results are prepended to QueryOutput.
+% Returns {ok, NewNodeList, QueryOutput}.
+modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
+    % no actions left: keep the remaining entries as they are
+    {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput};
+modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) ->
+    % past the last existing entry: there is nothing left to compare against
+    case ActionType of
+    insert ->
+        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+    remove ->
+        % just drop the action
+        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput);
+    fetch ->
+        % the key/value must not exist in the tree
+        modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
+    end;
+modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) ->
+    N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey),
+    {Key, Value} = element(N, NodeTuple),
+    % carry over the untouched entries LowerBound..N-1
+    ResultNode =  bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode),
+    case less(Bt, ActionKey, Key) of
+    true ->
+        case ActionType of
+        insert ->
+            % ActionKey is less than the Key, so insert
+            modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+        remove ->
+            % ActionKey is less than the Key, just drop the action
+            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput);
+        fetch ->
+            % ActionKey is less than the Key, the key/value must not exist in the tree
+            modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
+        end;
+    false ->
+        % ActionKey and Key are maybe equal.
+        case less(Bt, Key, ActionKey) of
+        false ->
+            % neither sorts before the other: the keys are treated as equal
+            case ActionType of
+            insert ->
+                % replace the existing entry
+                modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+            remove ->
+                % skip the existing entry
+                modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput);
+            fetch ->
+                % ActionKey is equal to the Key, insert into the QueryOutput, but re-process the node
+                % since an identical action key can follow it.
+                modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput])
+            end;
+        true ->
+            % Key sorts before ActionKey: keep the entry and retry the same action at N + 1
+            modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput)
+        end
+    end.
+
+
+% Streams one node for fold_reduce/4. A nil node returns the current
+% grouping state unchanged; otherwise the node is read, its entries are put
+% in traversal order (adjust_dir/2) and passed to the kp or kv handler.
+% Returns {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}.
+reduce_stream_node(_Bt, _Dir, nil, _KeyStart, _InEndRangeFun, GroupedKey, GroupedKVsAcc,
+        GroupedRedsAcc, _KeyGroupFun, _Fun, Acc) ->
+    {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey};
+reduce_stream_node(Bt, Dir, Node, KeyStart, InEndRangeFun, GroupedKey, GroupedKVsAcc,
+        GroupedRedsAcc, KeyGroupFun, Fun, Acc) ->
+    case get_node(Bt, element(1, Node)) of
+    {kp_node, KPs} ->
+        OrderedKPs = adjust_dir(Dir, KPs),
+        reduce_stream_kp_node(Bt, Dir, OrderedKPs, KeyStart, InEndRangeFun, GroupedKey,
+                GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc);
+    {kv_node, KVs} ->
+        OrderedKVs = adjust_dir(Dir, KVs),
+        reduce_stream_kv_node(Bt, Dir, OrderedKVs, KeyStart, InEndRangeFun, GroupedKey,
+                GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc)
+    end.
+
+% Restricts a leaf's KVs (already in traversal order) to the requested
+% range, then hands them to reduce_stream_kv_node2/8 for grouping:
+% entries before KeyStart (if any) are dropped from the front, and the list
+% is cut at the first key InEndRangeFun rejects.
+reduce_stream_kv_node(Bt, Dir, KVs, KeyStart, InEndRangeFun,
+                        GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+                        KeyGroupFun, Fun, Acc) ->
+    FromStart =
+    case KeyStart of
+    undefined ->
+        KVs;
+    _ ->
+        BeforeStart = case Dir of
+        fwd ->
+            fun({Key, _}) -> less(Bt, Key, KeyStart) end;
+        rev ->
+            fun({Key, _}) -> less(Bt, KeyStart, Key) end
+        end,
+        lists:dropwhile(BeforeStart, KVs)
+    end,
+    WithinEnd = fun({Key, _}) -> InEndRangeFun(Key) end,
+    InRangeKVs = lists:takewhile(WithinEnd, FromStart),
+    reduce_stream_kv_node2(Bt, InRangeKVs, GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+                        KeyGroupFun, Fun, Acc).
+
+
+% Groups the in-range KVs of a leaf. While KeyGroupFun(GroupedKey, Key) holds
+% the assembled item joins the current group; otherwise the finished group
+% is passed to Fun and a new group starts at Key.
+% Returns {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; throws
+% {stop, Acc} when Fun asks to stop.
+reduce_stream_kv_node2(_Bt, [], GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+        _KeyGroupFun, _Fun, Acc) ->
+    {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey};
+reduce_stream_kv_node2(Bt, [{Key, Value}| RestKVs], GroupedKey, GroupedKVsAcc,
+        GroupedRedsAcc, KeyGroupFun, Fun, Acc) ->
+    case GroupedKey of
+    undefined ->
+        % no group is open yet: start one at this key
+        reduce_stream_kv_node2(Bt, RestKVs, Key,
+                [assemble(Bt,Key,Value)], [], KeyGroupFun, Fun, Acc);
+    _ ->
+
+        case KeyGroupFun(GroupedKey, Key) of
+        true ->
+            reduce_stream_kv_node2(Bt, RestKVs, GroupedKey,
+                [assemble(Bt,Key,Value)|GroupedKVsAcc], GroupedRedsAcc, KeyGroupFun,
+                Fun, Acc);
+        false ->
+            % group finished: emit it, then start the next group at Key
+            case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of
+            {ok, Acc2} ->
+                reduce_stream_kv_node2(Bt, RestKVs, Key, [assemble(Bt,Key,Value)],
+                    [], KeyGroupFun, Fun, Acc2);
+            {stop, Acc2} ->
+                throw({stop, Acc2})
+            end
+        end
+    end.
+
+% Restricts an inner node's entries (already in traversal order) to those
+% that can hold keys in the requested range, then hands them to
+% reduce_stream_kp_node2/11.
+reduce_stream_kp_node(Bt, Dir, NodeList, KeyStart, InEndRangeFun,
+                        GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+                        KeyGroupFun, Fun, Acc) ->
+    Nodes =
+    case KeyStart of
+    undefined ->
+        NodeList;
+    _ ->
+        case Dir of
+        fwd ->
+            % drop the children whose key sorts before KeyStart
+            lists:dropwhile(fun({Key, _}) -> less(Bt, Key, KeyStart) end, NodeList);
+        rev ->
+            % keep the children whose key sorts before KeyStart plus the
+            % first child that does not
+            RevKPs = lists:reverse(NodeList),
+            case lists:splitwith(fun({Key, _}) -> less(Bt, Key, KeyStart) end, RevKPs) of
+            {_Before, []} ->
+                NodeList;
+            {Before, [FirstAfter | _]} ->
+                [FirstAfter | lists:reverse(Before)]
+            end
+        end
+    end,
+    {InRange, MaybeInRange} = lists:splitwith(
+        fun({Key, _}) -> InEndRangeFun(Key) end, Nodes),
+    % going forward, the first child whose key fails InEndRangeFun is still streamed
+    NodesInRange = case MaybeInRange of
+    [FirstMaybeInRange | _] when Dir =:= fwd ->
+        InRange ++ [FirstMaybeInRange];
+    _ ->
+        InRange
+    end,
+    reduce_stream_kp_node2(Bt, Dir, NodesInRange, KeyStart, InEndRangeFun,
+        GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc).
+
+
+% Streams the selected children of an inner node while tracking the current
+% key group. Returns {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}.
+%
+% First clause: no group is open (GroupedKey undefined, both accumulators
+% empty), so the first child is streamed to find one.
+reduce_stream_kp_node2(Bt, Dir, [{_Key, NodeInfo} | RestNodeList], KeyStart, InEndRangeFun,
+                        undefined, [], [], KeyGroupFun, Fun, Acc) ->
+    {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} =
+            reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, undefined,
+                [], [], KeyGroupFun, Fun, Acc),
+    reduce_stream_kp_node2(Bt, Dir, RestNodeList, KeyStart, InEndRangeFun, GroupedKey2,
+            GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2);
+% Second clause: a group is open (this clause also handles an empty list).
+reduce_stream_kp_node2(Bt, Dir, NodeList, KeyStart, InEndRangeFun,
+        GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) ->
+    {Grouped0, Ungrouped0} = lists:splitwith(fun({Key,_}) ->
+        KeyGroupFun(GroupedKey, Key) end, NodeList),
+    % the last child whose key still groups is moved to the ungrouped list,
+    % so it is streamed instead of contributing its stored reduction
+    {GroupedNodes, UngroupedNodes} =
+    case Grouped0 of
+    [] ->
+        {Grouped0, Ungrouped0};
+    _ ->
+        [FirstGrouped | RestGrouped] = lists:reverse(Grouped0),
+        {RestGrouped, [FirstGrouped | Ungrouped0]}
+    end,
+    % the remaining grouped children contribute their stored reductions
+    GroupedReds = [element(2, Node) || {_, Node} <- GroupedNodes],
+    case UngroupedNodes of
+    [{_Key, NodeInfo}|RestNodes] ->
+        {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} =
+            reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, GroupedKey,
+                GroupedKVsAcc, GroupedReds ++ GroupedRedsAcc, KeyGroupFun, Fun, Acc),
+        reduce_stream_kp_node2(Bt, Dir, RestNodes, KeyStart, InEndRangeFun, GroupedKey2,
+                GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2);
+    [] ->
+        {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey}
+    end.
+
+% Puts a node's entry list in traversal order: reversed for rev, unchanged
+% for fwd.
+adjust_dir(rev, Entries) ->
+    lists:reverse(Entries);
+adjust_dir(fwd, Entries) ->
+    Entries.
+
+% stream_node/8: streams the subtree at Node starting from StartKey.
+% Reads the node, puts its entries in traversal order and passes them to the
+% kp or kv handler that honours StartKey.
+stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc) ->
+    case get_node(Bt, element(1, Node)) of
+    {kp_node, KPs} ->
+        stream_kp_node(Bt, Reds, adjust_dir(Dir, KPs), StartKey, InRange, Dir, Fun, Acc);
+    {kv_node, KVs} ->
+        stream_kv_node(Bt, Reds, adjust_dir(Dir, KVs), StartKey, InRange, Dir, Fun, Acc)
+    end.
+
+% stream_node/7: streams the whole subtree at Node (no start key).
+% Leaves go straight to stream_kv_node2/8 with an empty list of previously
+% visited KVs.
+stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc) ->
+    case get_node(Bt, element(1, Node)) of
+    {kp_node, KPs} ->
+        stream_kp_node(Bt, Reds, adjust_dir(Dir, KPs), InRange, Dir, Fun, Acc);
+    {kv_node, KVs} ->
+        stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, KVs), InRange, Dir, Fun, Acc)
+    end.
+
+% stream_kp_node/7: streams every child of an inner node in order.
+% Fun(traverse, Key, Red, Acc) is asked first: {ok, Acc2} descends into the
+% child, {skip, Acc2} passes over it. Either way the child's reduction is
+% added to Reds for the siblings that follow. A {stop, ...} from below is
+% returned as is.
+stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) ->
+    {ok, Acc};
+stream_kp_node(Bt, Reds, [{Key, Child} | Siblings], InRange, Dir, Fun, Acc) ->
+    ChildRed = element(2, Child),
+    case Fun(traverse, Key, ChildRed, Acc) of
+    {skip, Acc2} ->
+        stream_kp_node(Bt, [ChildRed | Reds], Siblings, InRange, Dir, Fun, Acc2);
+    {ok, Acc2} ->
+        case stream_node(Bt, Reds, Child, InRange, Dir, Fun, Acc2) of
+        {stop, _LastReds, _Acc3} = Stopped ->
+            Stopped;
+        {ok, Acc3} ->
+            stream_kp_node(Bt, [ChildRed | Reds], Siblings, InRange, Dir, Fun, Acc3)
+        end
+    end.
+
+% Skips the leading children whose key sorts before StartKey, pushing each
+% skipped child's reduction onto Reds.
+% Returns {Reds2, RemainingKPs}.
+drop_nodes(_Bt, Reds, _StartKey, []) ->
+    {Reds, []};
+drop_nodes(Bt, Reds, StartKey, [{NodeKey, Node} | Later] = KPs) ->
+    case less(Bt, NodeKey, StartKey) of
+    false ->
+        {Reds, KPs};
+    true ->
+        drop_nodes(Bt, [element(2, Node) | Reds], StartKey, Later)
+    end.
+
+% stream_kp_node/8: streams an inner node's children (KPs, already in
+% traversal order) starting from StartKey. The first remaining child is
+% streamed with the start key; the rest go through stream_kp_node/7.
+% Returns {ok, Acc} or {stop, LastReds, Acc}.
+stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) ->
+    {NewReds, NodesToStream} =
+    case Dir of
+    fwd ->
+        % drop all nodes sorting before the key
+        drop_nodes(Bt, Reds, StartKey, KPs);
+    rev ->
+        % keep all nodes sorting before the key, AND the first node to sort after
+        RevKPs = lists:reverse(KPs),
+         case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of
+        {_RevsBefore, []} ->
+            % everything sorts before it
+            {Reds, KPs};
+        {RevBefore, [FirstAfter | Drop]} ->
+            % the reductions of the dropped nodes are added to Reds
+            {[element(2, Node) || {_K, Node} <- Drop] ++ Reds,
+                 [FirstAfter | lists:reverse(RevBefore)]}
+        end
+    end,
+    case NodesToStream of
+    [] ->
+        {ok, Acc};
+    [{_Key, Node} | Rest] ->
+        case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc) of
+        {ok, Acc2} ->
+            Red = element(2, Node),
+            stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2);
+        {stop, LastReds, Acc2} ->
+            {stop, LastReds, Acc2}
+        end
+    end.
+
+% Streams a leaf's KVs (already in traversal order) starting from StartKey.
+% Entries that come before StartKey in the traversal direction are not
+% visited; they are assembled and passed on as the "previous KVs" of the
+% first visited entry.
+stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) ->
+    BeforeStart =
+    case Dir of
+    fwd ->
+        fun({Key, _}) -> less(Bt, Key, StartKey) end;
+    rev ->
+        fun({Key, _}) -> less(Bt, StartKey, Key) end
+    end,
+    {Skipped, ToVisit} = lists:splitwith(BeforeStart, KVs),
+    SkippedItems = [assemble(Bt,K,V) || {K,V} <- Skipped],
+    stream_kv_node2(Bt, Reds, SkippedItems, ToVisit, InRange, Dir, Fun, Acc).
+
+% Visits a leaf's KVs one by one with Fun(visit, Item, {PrevKVs, Reds}, Acc).
+% Stops with {stop, {PrevKVs, Reds}, Acc} at the first key InRange rejects
+% or when Fun returns {stop, Acc2}; returns {ok, Acc} once the list is
+% exhausted. Each visited item is pushed onto PrevKVs for the next one.
+stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) ->
+    {ok, Acc};
+stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) ->
+    case InRange(K) of
+    false ->
+        {stop, {PrevKVs, Reds}, Acc};
+    true ->
+        Item = assemble(Bt, K, V),
+        case Fun(visit, Item, {PrevKVs, Reds}, Acc) of
+        {stop, Acc2} ->
+            {stop, {PrevKVs, Reds}, Acc2};
+        {ok, Acc2} ->
+            stream_kv_node2(Bt, Reds, [Item | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2)
+        end
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
new file mode 100644
index 0000000..6edde32
--- /dev/null
+++ b/src/couch_changes.erl
@@ -0,0 +1,577 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_changes).
+-include("couch_db.hrl").
+
+-export([handle_changes/3]).
+
+% For the builtin filter _doc_ids, this is the maximum number
+% of documents for which we trigger the optimized code path.
+-define(MAX_DOC_IDS, 100).
+
+% Accumulator threaded through the enumeration of changes
+% (see build_acc/8 and changes_enumerator/2).
+-record(changes_acc, {
+    % database handle passed to the filter fun and used to load documents
+    db,
+    % initialised with the start sequence, then set to the high_seq of each
+    % enumerated doc info
+    seq,
+    % binary handed to the callback together with each row of a
+    % non-continuous feed; becomes <<",\n">> once a row has been emitted
+    prepend,
+    % fun(Db, DocInfo) returning the list of results for that document
+    filter,
+    % fun(Event, ResponseType, UserAcc) returning the new user accumulator
+    callback,
+    % caller-owned accumulator passed through the callback
+    user_acc,
+    % feed type string, e.g. "continuous", "eventsource", "longpoll", "normal"
+    resp_type,
+    % rows left to emit; decremented for every emitted row
+    limit,
+    % true to add the document to each emitted row
+    include_docs,
+    % options passed to couch_doc:to_json_obj/2 for included documents
+    doc_options,
+    % true to load included documents with the `conflicts` option
+    conflicts,
+    % timeout in milliseconds (or infinity) used for heartbeats and waiting
+    timeout,
+    % fun(UserAcc) -> {ok | stop, UserAcc2} invoked when the timeout elapses
+    timeout_fun
+}).
+
+%% @type Req -> #httpd{} | {json_req, JsonObj()}
+%
+% Prepares a changes feed for Db0 and returns a fun that runs it.
+%
+% The returned fun takes either a {Callback, UserAcc} pair with a 3-arity
+% Callback or a bare 2-arity Callback (see get_callback_acc/1). Filter
+% resolution (make_filter_fun/4) happens eagerly, so an invalid filter throws
+% from handle_changes/3 itself rather than from the returned fun.
+handle_changes(Args1, Req, Db0) ->
+    #changes_args{
+        style = Style,
+        filter = FilterName,
+        feed = Feed,
+        dir = Dir,
+        since = Since
+    } = Args1,
+    {FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0),
+    Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs},
+    % Start/0 reopens the database and picks the starting sequence: the
+    % current update seq when enumerating in reverse, `since` otherwise.
+    Start = fun() ->
+        {ok, Db} = couch_db:reopen(Db0),
+        StartSeq = case Dir of
+        rev ->
+            couch_db:get_update_seq(Db);
+        fwd ->
+            Since
+        end,
+        {Db, StartSeq}
+    end,
+    % begin timer to deal with heartbeat when filter function fails
+    % (the timestamp lives in the process dictionary; see reset_heartbeat/0
+    % and maybe_heartbeat/3)
+    case Args#changes_args.heartbeat of
+    undefined ->
+        erlang:erase(last_changes_heartbeat);
+    Val when is_integer(Val); Val =:= true ->
+        put(last_changes_heartbeat, now())
+    end,
+
+    case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
+    true ->
+        % Feeds that wait for further updates once caught up.
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+            Self = self(),
+            % The notifier is started before Start() reads the database --
+            % presumably so that updates landing in between still produce a
+            % db_updated message.
+            {ok, Notify} = couch_db_update_notifier:start_link(
+                fun({_, DbName}) when  Db0#db.name == DbName ->
+                    Self ! db_updated;
+                (_) ->
+                    ok
+                end
+            ),
+            {Db, StartSeq} = Start(),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+            Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
+                             <<"">>, Timeout, TimeoutFun),
+            try
+                % these feeds are always enumerated forward
+                keep_sending_changes(
+                    Args#changes_args{dir=fwd},
+                    Acc0,
+                    true)
+            after
+                couch_db_update_notifier:stop(Notify),
+                get_rest_db_updated(ok) % clean out any remaining update messages
+            end
+        end;
+    false ->
+        % Any other feed is sent in a single pass and reported as "normal"
+        % to the enumeration; the callback still receives the original Feed
+        % in the start and stop events.
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+            {Db, StartSeq} = Start(),
+            Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
+                             UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun),
+            {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
+                send_changes(
+                    Args#changes_args{feed="normal"},
+                    Acc0,
+                    true),
+            end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
+        end
+    end.
+
+% Normalises the argument given to the fun returned by handle_changes/3 into
+% a {Callback3, UserAcc} pair. A {Fun, UserAcc} pair with a 3-arity fun is
+% returned as is; a bare 2-arity fun is wrapped in a 3-arity adapter that
+% ignores the accumulator, and paired with the accumulator `ok`.
+get_callback_acc({Fun3, UserAcc}) when is_function(Fun3, 3) ->
+    {Fun3, UserAcc};
+get_callback_acc(Fun2) when is_function(Fun2, 2) ->
+    Adapter = fun(Event, Data, _IgnoredAcc) -> Fun2(Event, Data) end,
+    {Adapter, ok}.
+
+%% @type Req -> #httpd{} | {json_req, JsonObj()}
+%
+% Returns {FilterFun, FilterArgs}. Names starting with an underscore select a
+% builtin filter; any other name is handled by os_filter_fun/4, which carries
+% no filter args.
+make_filter_fun(FilterName, Style, Req, Db) ->
+    case FilterName of
+    [$_ | _] ->
+        builtin_filter_fun(FilterName, Style, Req, Db);
+    _ ->
+        {os_filter_fun(FilterName, Style, Req, Db), []}
+    end.
+
+% Builds the filter fun for a filter named "designname/filtername".
+%
+% An empty name yields a fun that accepts every document. A name that does
+% not split into exactly two "/"-separated parts throws bad_request.
+% Otherwise the design document is opened once, up front, and the returned
+% fun runs the named filter through couch_query_servers:filter_docs/5.
+os_filter_fun(FilterName, Style, Req, Db) ->
+    Parts = [list_to_binary(couch_httpd:unquote(Part))
+                || Part <- string:tokens(FilterName, "/")],
+    case Parts of
+    [DName, FName] ->
+        DesignId = <<"_design/", DName/binary>>,
+        DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
+        % validate that the ddoc has the filter fun
+        #doc{body={Props}} = DDoc,
+        couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
+        fun(Db2, DocInfo) ->
+            % one doc info per revision when every leaf revision is wanted
+            PerRevInfos = case Style of
+                main_only ->
+                    [DocInfo];
+                all_docs ->
+                    [DocInfo#doc_info{revs=[Rev]}
+                        || Rev <- DocInfo#doc_info.revs]
+            end,
+            Opened = [couch_db:open_doc(Db2, DI, [deleted, conflicts])
+                        || DI <- PerRevInfos],
+            % revisions that could not be opened are dropped
+            Docs = [Doc || {ok, Doc} <- Opened],
+            {ok, Passes} = couch_query_servers:filter_docs(
+                Req, Db2, DDoc, FName, Docs),
+            [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
+                || {true, #doc{revs={RevPos,[RevId|_]}}}
+                <- lists:zip(Passes, Docs)]
+        end;
+    [] ->
+        fun(_Db2, #doc_info{revs=Revs}) ->
+            builtin_results(Style, Revs)
+        end;
+    _Else ->
+        throw({bad_request,
+            "filter parameter must be of the form `designname/filtername`"})
+    end.
+
+% Resolves a builtin filter (a name starting with "_") to
+% {FilterFun, FilterArgs}.
+%
+% "_doc_ids" takes its id list from the json_req properties, from the JSON
+% body of a POST, or from the `doc_ids` query parameter of a GET; the list
+% is also returned as FilterArgs. "_design" and "_view" carry no filter
+% args. Anything else throws bad_request.
+builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
+    Ids = couch_util:get_value(<<"doc_ids">>, Props),
+    {filter_docids(Ids, Style), Ids};
+builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
+    {BodyProps} = couch_httpd:json_body_obj(Req),
+    Ids = couch_util:get_value(<<"doc_ids">>, BodyProps, nil),
+    {filter_docids(Ids, Style), Ids};
+builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) ->
+    RawIds = couch_httpd:qs_value(Req, "doc_ids", "null"),
+    Ids = ?JSON_DECODE(RawIds),
+    {filter_docids(Ids, Style), Ids};
+builtin_filter_fun("_design", Style, _Req, _Db) ->
+    {filter_designdoc(Style), []};
+builtin_filter_fun("_view", Style, Req, Db) ->
+    View = couch_httpd:qs_value(Req, "view", ""),
+    {filter_view(View, Style, Db), []};
+builtin_filter_fun(_FilterName, _Style, _Req, _Db) ->
+    throw({bad_request, "unknown builtin filter name"}).
+
+% Returns a filter fun that accepts only documents whose id is a member of
+% DocIds. Throws bad_request when DocIds is not a list.
+filter_docids(DocIds, Style) when is_list(DocIds)->
+    fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
+        case lists:member(DocId, DocIds) of
+            false ->
+                [];
+            true ->
+                builtin_results(Style, Revs)
+        end
+    end;
+filter_docids(_NotAList, _Style) ->
+    throw({bad_request, "`doc_ids` filter parameter is not a list."}).
+
+% Returns a filter fun that accepts only documents whose id starts with
+% "_design".
+filter_designdoc(Style) ->
+    fun(_Db, #doc_info{id = <<"_design", _/binary>>, revs = Revs}) ->
+            builtin_results(Style, Revs);
+       (_Db, #doc_info{}) ->
+            []
+    end.
+
+% Builds the filter fun for the builtin "_view" filter.
+%
+% ViewName must have the form "designname/viewname". An empty name, or one
+% that does not split into exactly two "/"-separated parts, throws
+% bad_request (previously a one-part or three-part name crashed with a
+% case_clause error instead). The design document is opened once, up front,
+% and the returned fun runs candidate documents through
+% couch_query_servers:filter_view/3.
+filter_view("", _Style, _Db) ->
+    throw({bad_request, "`view` filter parameter is not provided."});
+filter_view(ViewName, Style, Db) ->
+    case [list_to_binary(couch_httpd:unquote(Part))
+            || Part <- string:tokens(ViewName, "/")] of
+        [DName, VName] ->
+            DesignId = <<"_design/", DName/binary>>,
+            DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
+            % validate that the ddoc has the view
+            #doc{body={Props}} = DDoc,
+            couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
+            fun(Db2, DocInfo) ->
+                % one doc info per revision when every leaf revision is wanted
+                DocInfos =
+                case Style of
+                main_only ->
+                    [DocInfo];
+                all_docs ->
+                    [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
+                end,
+                % revisions that could not be opened are dropped
+                Docs = [Doc || {ok, Doc} <- [
+                        couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
+                            || DocInfo2 <- DocInfos]],
+                {ok, Passes} = couch_query_servers:filter_view(
+                    DDoc, VName, Docs
+                ),
+                [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
+                    || {Pass, #doc{revs={RevPos,[RevId|_]}}}
+                    <- lists:zip(Passes, Docs), Pass == true]
+            end;
+        _Else ->
+            % covers [] as well as names with one or more than two parts
+            throw({bad_request, "Invalid `view` parameter."})
+    end.
+
+% Builds the `changes` entries for an accepted document: only the first
+% revision for main_only, every revision for all_docs.
+builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) ->
+    ToEntry = fun(R) -> {[{<<"rev">>, couch_doc:rev_to_str(R)}]} end,
+    case Style of
+        main_only ->
+            [ToEntry(Rev)];
+        all_docs ->
+            [ToEntry(R) || #rev_info{rev=R} <- Revs]
+    end.
+
+% Returns {Timeout, TimeoutFun} for a feed.
+%
+% Without a heartbeat the fun stops the feed on timeout; the timeout is the
+% configured httpd/changes_timeout (default "60000"), capped by an explicit
+% `timeout` argument, or infinity when that was requested. With a heartbeat
+% the fun invokes Callback(timeout, ResponseType, UserAcc) and keeps going;
+% the interval is the configured default for `true`, otherwise the smaller
+% of the default and the requested heartbeat.
+get_changes_timeout(Args, Callback) ->
+    #changes_args{
+        heartbeat = Heartbeat,
+        timeout = Timeout,
+        feed = ResponseType
+    } = Args,
+    DefaultTimeout = list_to_integer(
+        couch_config:get("httpd", "changes_timeout", "60000")),
+    StopOnTimeout = fun(UserAcc) -> {stop, UserAcc} end,
+    BeatOnTimeout = fun(UserAcc) ->
+        {ok, Callback(timeout, ResponseType, UserAcc)}
+    end,
+    case {Heartbeat, Timeout} of
+    {undefined, undefined} ->
+        {DefaultTimeout, StopOnTimeout};
+    {undefined, infinity} ->
+        {infinity, StopOnTimeout};
+    {undefined, _} ->
+        {erlang:min(DefaultTimeout, Timeout), StopOnTimeout};
+    {true, _} ->
+        {DefaultTimeout, BeatOnTimeout};
+    {_, _} ->
+        {erlang:min(DefaultTimeout, Heartbeat), BeatOnTimeout}
+    end.
+
+% Announces the start of a feed through Callback(start, ResponseType,
+% UserAcc) and returns the new user accumulator. "continuous" and
+% "eventsource" feeds get no start event; their accumulator is returned
+% unchanged.
+start_sending_changes(Callback, UserAcc, ResponseType) ->
+    case ResponseType of
+    "continuous" ->
+        UserAcc;
+    "eventsource" ->
+        UserAcc;
+    _ ->
+        Callback(start, ResponseType, UserAcc)
+    end.
+
+% Assembles the initial #changes_acc{} from the request arguments and the
+% per-feed runtime values (callback, user accumulator, database, start
+% sequence, row prefix and timeout handling).
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
+    #changes_args{
+        feed = Feed,
+        filter_fun = Filter,
+        limit = MaxRows,
+        include_docs = WithDocs,
+        doc_options = DocOptions,
+        conflicts = WithConflicts
+    } = Args,
+    #changes_acc{
+        % runtime values supplied by the caller
+        db = Db,
+        seq = StartSeq,
+        prepend = Prepend,
+        callback = Callback,
+        user_acc = UserAcc,
+        timeout = Timeout,
+        timeout_fun = TimeoutFun,
+        % values copied from the request arguments
+        resp_type = Feed,
+        filter = Filter,
+        limit = MaxRows,
+        include_docs = WithDocs,
+        doc_options = DocOptions,
+        conflicts = WithConflicts
+    }.
+
+% Enumerates changes from the accumulator's sequence onwards.
+%
+% On the first round the "_doc_ids" filter (with at most ?MAX_DOC_IDS ids)
+% and the "_design" filter are served by direct lookups in the by-id btree.
+% Every other case, and every later round, folds over
+% couch_db:changes_since/5.
+send_changes(Args, Acc0, FirstRound) ->
+    #changes_args{
+        dir = Dir,
+        filter = FilterName,
+        filter_args = FilterArgs
+    } = Args,
+    #changes_acc{db = Db, seq = StartSeq} = Acc0,
+    Enumerate = fun changes_enumerator/2,
+    ScanBySeq = fun() ->
+        couch_db:changes_since(Db, StartSeq, Enumerate, [{dir, Dir}], Acc0)
+    end,
+    case FirstRound of
+    false ->
+        ScanBySeq();
+    true ->
+        case FilterName of
+        "_doc_ids" when length(FilterArgs) =< ?MAX_DOC_IDS ->
+            send_changes_doc_ids(
+                FilterArgs, Db, StartSeq, Dir, Enumerate, Acc0);
+        "_design" ->
+            send_changes_design_docs(Db, StartSeq, Dir, Enumerate, Acc0);
+        _ ->
+            ScanBySeq()
+        end
+    end.
+
+
+% Optimised path for the "_doc_ids" filter: looks the ids up directly in the
+% by-id btree, drops those that were not found, and sends the changes of the
+% remaining documents.
+send_changes_doc_ids(DocIds, Db, StartSeq, Dir, Fun, Acc0) ->
+    KeepFound = fun({ok, FDI}, Found) -> [FDI | Found];
+                   (not_found, Found) -> Found
+                end,
+    FullDocInfos = lists:foldl(KeepFound, [],
+        couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, DocIds)),
+    send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+% Optimised path for the "_design" filter: collects the entries of the by-id
+% btree between the keys "_design/" and "_design0" and sends their changes.
+send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) ->
+    Collect = fun(FullDocInfo, _Reds, Collected) ->
+        {ok, [FullDocInfo | Collected]}
+    end,
+    Range = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
+    {ok, _, FullDocInfos} = couch_btree:fold(
+        Db#db.fulldocinfo_by_id_btree, Collect, [], Range),
+    send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+% Feeds already looked-up documents to Fun in update-sequence order.
+%
+% Only documents beyond StartSeq in direction Dir are kept (fwd: high_seq
+% greater than StartSeq, rev: high_seq not greater than StartSeq). They are
+% sorted by high_seq and folded from the left (fwd) or from the right (rev).
+% Fun may end the fold early by returning {stop, Acc}. For fwd the returned
+% accumulator's seq is set to the database's current update seq.
+send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
+    {Fold, IsBeyondStart} = case Dir of
+        fwd ->
+            {fun lists:foldl/3, fun(Seq, Start) -> Seq > Start end};
+        rev ->
+            {fun lists:foldr/3, fun(Seq, Start) -> Seq =< Start end}
+    end,
+    Candidates = lists:foldl(
+        fun(FDI, Kept) ->
+            DI = couch_doc:to_doc_info(FDI),
+            case IsBeyondStart(DI#doc_info.high_seq, StartSeq) of
+            true -> [DI | Kept];
+            false -> Kept
+            end
+        end,
+        [], FullDocInfos),
+    Ordered = lists:keysort(#doc_info.high_seq, Candidates),
+    % a throw is used to leave the list fold as soon as Fun asks to stop
+    Step = fun(DocInfo, Acc) ->
+        case Fun(DocInfo, Acc) of
+        {ok, NewAcc} ->
+            NewAcc;
+        {stop, NewAcc} ->
+            throw({stop, NewAcc})
+        end
+    end,
+    FinalAcc = try
+        Fold(Step, Acc0, Ordered)
+    catch
+    throw:{stop, StoppedAcc} ->
+        StoppedAcc
+    end,
+    case Dir of
+    fwd ->
+        {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}};
+    rev ->
+        {ok, FinalAcc}
+    end.
+
+
+% Main loop of the continuous, eventsource and longpoll feeds.
+%
+% Sends the pending changes, closes the database, then waits for either a
+% db_updated message or a timeout. On an update the database is reopened
+% with the original user context and the loop continues from the last sent
+% sequence. The feed ends when
+%   - a longpoll feed has emitted at least one row in this round,
+%   - the timeout fun asks to stop, or
+%   - the database can no longer be opened.
+keep_sending_changes(Args, Acc0, FirstRound) ->
+    #changes_args{
+        feed = ResponseType,
+        limit = Limit,
+        db_open_options = DbOptions
+    } = Args,
+
+    {ok, ChangesAcc} = send_changes(
+        Args#changes_args{dir=fwd},
+        Acc0,
+        FirstRound),
+    #changes_acc{
+        db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun,
+        seq = EndSeq, user_acc = UserAcc2, limit = NewLimit
+    } = ChangesAcc,
+
+    couch_db:close(Db),
+    % the limit only shrinks when a row was emitted
+    if Limit > NewLimit, ResponseType == "longpoll" ->
+        end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+    true ->
+        case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
+        {updated, UserAcc4} ->
+            DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
+            case couch_db:open(Db#db.name, DbOptions1) of
+            {ok, Db2} ->
+                % seq, prepend, timeout and timeout_fun are already carried
+                % by ChangesAcc; only the handle and user acc change
+                keep_sending_changes(
+                  Args#changes_args{limit=NewLimit},
+                  ChangesAcc#changes_acc{
+                    db = Db2,
+                    user_acc = UserAcc4},
+                  false);
+            _Else ->
+                % UserAcc4, not UserAcc2: heartbeat callbacks run while
+                % waiting may have advanced the user accumulator
+                end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+            end;
+        {stop, UserAcc4} ->
+            end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+        end
+    end.
+
+% Signals the end of the feed to the callback, passing the last sequence,
+% and returns whatever the callback returns.
+end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
+    StopEvent = {stop, EndSeq},
+    Callback(StopEvent, ResponseType, UserAcc).
+
+% Fold callback invoked for every #doc_info{} of the changes enumeration.
+%
+% Runs the filter, drops `null` results and, when something is left, emits
+% a row through the callback and decrements the limit. Documents rejected
+% by the filter emit nothing but may trigger a heartbeat (maybe_heartbeat/3).
+% Returns {ok, Acc} to continue or {stop, Acc} once the limit is used up or
+% the timeout fun asks to stop.
+%
+% The first clause serves "continuous" and "eventsource" feeds, whose rows
+% are sent with an empty prefix. The second clause serves every other feed
+% and passes the accumulator's `prepend` value, which becomes <<",\n">>
+% after the first row.
+changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
+        when ResponseType =:= "continuous"
+        orelse ResponseType =:= "eventsource" ->
+    #changes_acc{
+        filter = FilterFun, callback = Callback,
+        user_acc = UserAcc, limit = Limit, db = Db,
+        timeout = Timeout, timeout_fun = TimeoutFun
+    } = Acc,
+    #doc_info{high_seq = Seq} = DocInfo,
+    Results0 = FilterFun(Db, DocInfo),
+    Results = [Result || Result <- Results0, Result /= null],
+    % Only a row that is actually emitted may use up the limit. Stopping on
+    % a document the filter rejected would end a filtered feed before it
+    % delivered its last row. Same rule as in the clause below.
+    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+    case Results of
+    [] ->
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        case Done of
+        stop ->
+            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+        ok ->
+            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+        end;
+    _ ->
+        ChangesRow = changes_row(Results, DocInfo, Acc),
+        UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+        % a row was sent, so the heartbeat timer starts over
+        reset_heartbeat(),
+        {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
+    end;
+changes_enumerator(DocInfo, Acc) ->
+    #changes_acc{
+        filter = FilterFun, callback = Callback, prepend = Prepend,
+        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+        timeout = Timeout, timeout_fun = TimeoutFun
+    } = Acc,
+    #doc_info{high_seq = Seq} = DocInfo,
+    Results0 = FilterFun(Db, DocInfo),
+    Results = [Result || Result <- Results0, Result /= null],
+    % stop only when this document emits the last allowed row
+    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+    case Results of
+    [] ->
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        case Done of
+        stop ->
+            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+        ok ->
+            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+        end;
+    _ ->
+        ChangesRow = changes_row(Results, DocInfo, Acc),
+        UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+        % a row was sent, so the heartbeat timer starts over
+        reset_heartbeat(),
+        {Go, Acc#changes_acc{
+            seq = Seq, prepend = <<",\n">>,
+            user_acc = UserAcc2, limit = Limit - 1}}
+    end.
+
+
+% Builds the EJSON object for one changes row: `seq`, `id` and `changes`,
+% followed by `deleted` when the first revision is deleted, and by `doc`
+% when documents were requested (null when the document could not be
+% loaded).
+changes_row(Results, DocInfo, Acc) ->
+    #doc_info{
+        id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
+    } = DocInfo,
+    #changes_acc{
+        db = Db,
+        include_docs = IncDoc,
+        doc_options = DocOpts,
+        conflicts = Conflicts
+    } = Acc,
+    DocMember = case IncDoc of
+        false ->
+            [];
+        true ->
+            Opts = case Conflicts of
+                true -> [deleted, conflicts];
+                false -> [deleted]
+            end,
+            case couch_index_util:load_doc(Db, DocInfo, Opts) of
+                null ->
+                    [{doc, null}];
+                Doc ->
+                    [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
+            end
+    end,
+    Base = [{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}],
+    {Base ++ deleted_item(Del) ++ DocMember}.
+
+% Row members flagging a deleted document; empty for anything but `true`.
+deleted_item(Deleted) ->
+    case Deleted of
+    true ->
+        [{<<"deleted">>, true}];
+    _ ->
+        []
+    end.
+
+% Blocks until a db_updated message arrives, swallowing any further queued
+% db_updated messages, and returns {updated, UserAcc}. Each time Timeout
+% elapses first, TimeoutFun decides whether to keep waiting ({ok, UserAcc2})
+% or to give up, in which case {stop, UserAcc2} is returned.
+wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
+    receive
+    db_updated ->
+        get_rest_db_updated(UserAcc)
+    after Timeout ->
+        case TimeoutFun(UserAcc) of
+        {ok, NextAcc} ->
+            wait_db_updated(Timeout, TimeoutFun, NextAcc);
+        {stop, NextAcc} ->
+            {stop, NextAcc}
+        end
+    end.
+
+% Drains every db_updated message currently in the mailbox without blocking,
+% then returns {updated, UserAcc}.
+get_rest_db_updated(UserAcc) ->
+    receive
+    db_updated ->
+        % keep draining
+        get_rest_db_updated(UserAcc)
+    after 0 ->
+        % mailbox holds no further db_updated message
+        {updated, UserAcc}
+    end.
+
+% Restarts the heartbeat timer kept in the process dictionary. Does nothing
+% (and returns ok) when no timer is set there.
+reset_heartbeat() ->
+    LastBeat = get(last_changes_heartbeat),
+    case LastBeat of
+    undefined ->
+        ok;
+    _ ->
+        put(last_changes_heartbeat, now())
+    end.
+
+% Invokes TimeoutFun when at least Timeout milliseconds have passed since the
+% heartbeat timestamp stored in the process dictionary, and restarts the
+% timer. Returns TimeoutFun's result in that case, {ok, Acc} otherwise
+% (including when no timer is set).
+maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
+    case get(last_changes_heartbeat) of
+    undefined ->
+        {ok, Acc};
+    LastBeat ->
+        Now = now(),
+        % now_diff/2 yields microseconds
+        ElapsedMs = timer:now_diff(Now, LastBeat) div 1000,
+        if ElapsedMs >= Timeout ->
+            Result = TimeoutFun(Acc),
+            put(last_changes_heartbeat, Now),
+            Result;
+        true ->
+            {ok, Acc}
+        end
+    end.


Mime
View raw message