couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rnew...@apache.org
Subject [01/10] git commit: Add the initial version of the global_changes app.
Date Thu, 07 Aug 2014 16:01:03 GMT
Repository: couchdb-global-changes
Updated Branches:
  refs/heads/windsor-merge 9035f4da4 -> 0324bf43e (forced update)


Add the initial version of the global_changes app.

BugzID: 17681


Project: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/commit/f429cd0e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/tree/f429cd0e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/diff/f429cd0e

Branch: refs/heads/windsor-merge
Commit: f429cd0e0d10939778c56ca4acd4652cdb5990d2
Parents: fed1844
Author: Benjamin Bastian <benjamin.bastian@gmail.com>
Authored: Thu Sep 5 18:33:38 2013 -0700
Committer: Robert Newson <rnewson@apache.org>
Committed: Thu Aug 7 17:00:32 2014 +0100

----------------------------------------------------------------------
 .gitignore                             |   2 +
 README.md                              |  27 ++++
 src/global_changes.app.src             |  18 +++
 src/global_changes_app.erl             |  18 +++
 src/global_changes_config_listener.erl |  94 +++++++++++++
 src/global_changes_httpd.erl           | 203 +++++++++++++++++++++++++++
 src/global_changes_listener.erl        | 146 +++++++++++++++++++
 src/global_changes_server.erl          | 209 ++++++++++++++++++++++++++++
 src/global_changes_sup.erl             |  35 +++++
 src/global_changes_util.erl            |  17 +++
 10 files changed, 769 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e1b16d5
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.eunit/
+ebin/

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index e69de29..f22ee2c 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,27 @@
+### global\_changes
+
+This app supplies the functionality for the `/_db_updates` endpoint.
+
+When a database is created, deleted, or updated, a corresponding event will be persisted
to disk (Note: This was designed without the guarantee that a DB event will be persisted or
ever occur in the `_db_updates` feed. It probably will, but it isn't guaranteed). Users can
subscribe to a `_changes`-like feed of these database events by querying the `_db_updates`
endpoint.
+
+When an admin user queries the `/_db_updates` endpoint, they will see the account name associated
with the DB update as well as the update itself (database name, event type, and sequence).
+
+### Captured Metrics
+
+1: `global_changes`, `db_writes`: The number of doc updates caused by global\_changes.
+
+2: `global_changes`, `server_pending_updates`: The number of documents aggregated into the
pending write batch.
+
+3: `global_changes`, `listener_pending_updates`: The number of documents aggregated into
the pending event batch.
+
+4: `global_changes`, `event_doc_conflict`: The number of rev tree branches in event docs
encountered by global\_changes. Should never happen.
+
+5: `global_changes`, `rpcs`: The number of non-fabric RPCs caused by global\_changes.
+
+### Important Configs
+
+1: `global_changes`, `max_event_delay`: (integer, milliseconds) The total time added before
an event is forwarded to the writer.
+
+2: `global_changes`, `max_write_delay`: (integer, milliseconds) The time added before an
event is sent to disk.
+
+3: `global_changes`, `update_db`: (true/false) A flag setting whether to update the global\_changes
database. If false, changes will be lost and there will be no performance impact of global\_changes
on the cluster.

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes.app.src
----------------------------------------------------------------------
diff --git a/src/global_changes.app.src b/src/global_changes.app.src
new file mode 100644
index 0000000..8586d7c
--- /dev/null
+++ b/src/global_changes.app.src
@@ -0,0 +1,18 @@
+{application, global_changes, [
+    {description, "_changes-like feeds for multiple DBs"},
+    {vsn, git},
+    {registered, [global_changes_config_listener, global_changes_server]},
+    {applications, [
+        kernel,
+        stdlib,
+        config,
+        couch_log,
+        couch,
+        mem3,
+        fabric
+    ]},
+    {mod, {global_changes_app, []}},
+    {env, [
+        {dbname, <<"global_changes">>}
+    ]}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes_app.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_app.erl b/src/global_changes_app.erl
new file mode 100644
index 0000000..a722155
--- /dev/null
+++ b/src/global_changes_app.erl
@@ -0,0 +1,18 @@
+%% Copyright 2013 Cloudant
+
+-module(global_changes_app).
+-behavior(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+start(_StartType, _StartArgs) ->
+    global_changes_sup:start_link().
+
+
+stop(_State) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes_config_listener.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_config_listener.erl b/src/global_changes_config_listener.erl
new file mode 100644
index 0000000..58f69bb
--- /dev/null
+++ b/src/global_changes_config_listener.erl
@@ -0,0 +1,94 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_config_listener).
+-behavior(gen_server).
+-behavior(config_listener).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-export([
+    handle_config_change/5
+]).
+
+
+-define(LISTENER, global_changes_listener).
+-define(SERVER, global_changes_server).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {ok, nil}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(Msg, _From, St) ->
+    {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {invalid_cast, Msg}, St}.
+
+
+handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, St) ->
+    erlang:send_after(5000, self(), restart_config_listener),
+    {noreply, St};
+handle_info(restart_config_listener, St) ->
+    ok = config:listen_for_changes(?MODULE, St),
+    {noreply, St};
+handle_info(_Msg, St) ->
+    {noreply, St}.
+
+
+code_change(_, St, _) ->
+    {ok, St}.
+
+
+%% config_listener callback: forwards changes in the "global_changes"
+%% config section to the listener and server processes. Every clause
+%% returns {ok, nil}.
+%%
+%% The two delay settings are later used as timeout values in the callback
+%% return tuples of the listener and the server, so only non-negative
+%% integers are forwarded. Unparsable or negative values are ignored and
+%% the previously configured delay stays in effect.
+handle_config_change("global_changes", "max_event_delay", MaxDelayStr, _, _) ->
+    try list_to_integer(MaxDelayStr) of
+        MaxDelay when MaxDelay >= 0 ->
+            gen_server:cast(?LISTENER, {set_max_event_delay, MaxDelay});
+        _Negative ->
+            ok
+    catch error:badarg ->
+        ok
+    end,
+    {ok, nil};
+
+handle_config_change("global_changes", "max_write_delay", MaxDelayStr, _, _) ->
+    try list_to_integer(MaxDelayStr) of
+        MaxDelay when MaxDelay >= 0 ->
+            gen_server:cast(?SERVER, {set_max_write_delay, MaxDelay});
+        _Negative ->
+            ok
+    catch error:badarg ->
+        ok
+    end,
+    {ok, nil};
+
+% Only the exact string "false" disables updates; any other value for
+% update_db (see the next clause) enables them.
+handle_config_change("global_changes", "update_db", "false", _, _) ->
+    gen_server:cast(?LISTENER, {set_update_db, false}),
+    gen_server:cast(?SERVER, {set_update_db, false}),
+    {ok, nil};
+
+handle_config_change("global_changes", "update_db", _, _, _) ->
+    gen_server:cast(?LISTENER, {set_update_db, true}),
+    gen_server:cast(?SERVER, {set_update_db, true}),
+    {ok, nil};
+
+% Changes to any other section or key are not relevant to this app.
+handle_config_change(_, _, _, _, _) ->
+    {ok, nil}.

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes_httpd.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_httpd.erl b/src/global_changes_httpd.erl
new file mode 100644
index 0000000..f4d01a3
--- /dev/null
+++ b/src/global_changes_httpd.erl
@@ -0,0 +1,203 @@
+%% Copyright 2013 Cloudant
+
+-module(global_changes_httpd).
+
+-export([handle_global_changes_req/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(acc, {
+    heartbeat_interval,
+    last_data_sent_time,
+    feed,
+    prepend,
+    resp,
+    etag,
+    username
+}).
+
+%% HTTP handler for the global changes feed (the README names the endpoint
+%% `/_db_updates`). Only GET is supported; any other method is answered
+%% with chttpd:send_method_not_allowed/2.
+%%
+%% The `feed` query parameter selects "normal" (default), "continuous" or
+%% "longpoll"; any other value is rejected with a bad_request throw. The
+%% feed itself is produced by fabric:changes/4 driving changes_callback/2.
+handle_global_changes_req(#httpd{method='GET'}=Req) ->
+    Db = global_changes_util:get_dbname(),
+    Feed = couch_httpd:qs_value(Req, "feed", "normal"),
+    Options = parse_global_changes_query(Req),
+    Heartbeat = case lists:keyfind(heartbeat, 1, Options) of
+        % `true` has to be matched before the catch-all clause below;
+        % otherwise the atom itself would be stored as the interval and
+        % the numeric comparison in maybe_send_heartbeat/1 would never
+        % trigger a heartbeat.
+        {heartbeat, true} -> 60000;
+        {heartbeat, Other} -> Other;
+        false -> false
+    end,
+    % NOTE(review): presumably throws for non-admin callers, which is why
+    % the accumulator's username is hard-wired to `admin` -- confirm.
+    chttpd:verify_is_server_admin(Req),
+    Acc = #acc{
+        username=admin,
+        feed=Feed,
+        resp=Req,
+        heartbeat_interval=Heartbeat
+    },
+    case Feed of
+        "normal" ->
+            % Normal feeds get an ETag derived from the db info, passed
+            % to chttpd:etag_respond/3 together with the feed fun.
+            {ok, Info} = fabric:get_db_info(Db),
+            Etag = chttpd:make_etag(Info),
+            chttpd:etag_respond(Req, Etag, fun() ->
+                fabric:changes(Db, fun changes_callback/2, Acc#acc{etag=Etag}, Options)
+            end);
+        Feed when Feed =:= "continuous"; Feed =:= "longpoll" ->
+            fabric:changes(Db, fun changes_callback/2, Acc, Options);
+        _ ->
+            Msg = <<"Supported `feed` types: normal, continuous, longpoll">>,
+            throw({bad_request, Msg})
+    end;
+handle_global_changes_req(Req) ->
+    chttpd:send_method_not_allowed(Req, "GET").
+
+
+%% Converts a raw change row into the EJSON object sent to the client, or
+%% `skip` when the row should not be shown.
+%%
+%% A non-admin Username only sees rows whose account part equals Username
+%% (without the account field); the atom `admin` sees every well-formed
+%% row, including the account. The second argument is unused.
+transform_change(Username, _Resp, {Props}) ->
+    {id, Id} = lists:keyfind(id, 1, Props),
+    {seq, Seq} = lists:keyfind(seq, 1, Props),
+    case split_event_id(Id) of
+        % Matches the client's username
+        {Type, Username, Db} when Username /= admin ->
+            {[{dbname, Db}, {type, Type}, {seq, Seq}]};
+        % Client is an admin, show them everything.
+        {Type, Account, Db} when Username == admin ->
+            {[{dbname, Db}, {type, Type}, {account, Account}, {seq, Seq}]};
+        _ ->
+            skip
+    end.
+
+
+%% Splits an event doc id of the form <<"Event:Account/DbName">> or
+%% <<"Event:DbName">> into {Event, Account, DbName}. Ids without an
+%% account part are attributed to '_admin'; ids without a ":" yield `skip`.
+split_event_id(Id) ->
+    case binary:split(Id, <<":">>) of
+        [Type, Rest] ->
+            case binary:split(Rest, <<"/">>) of
+                [Account, Db] -> {Type, Account, Db};
+                [Db] -> {Type, '_admin', Db}
+            end;
+        _ ->
+            skip
+    end.
+
+
+% callbacks for continuous feed (newline-delimited JSON Objects)
+changes_callback(start, #acc{feed="continuous"}=Acc) ->
+    #acc{resp=Req} = Acc,
+    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200),
+    {ok, Acc#acc{resp=Resp, last_data_sent_time=os:timestamp()}};
+changes_callback({change, Change0}, #acc{feed="continuous"}=Acc) ->
+    #acc{resp=Resp, username=Username} = Acc,
+    case transform_change(Username, Resp, Change0) of
+        skip ->
+            {ok, maybe_send_heartbeat(Acc)};
+        Change ->
+            Line = [?JSON_ENCODE(Change) | "\n"],
+            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
+            {ok, Acc#acc{resp=Resp1, last_data_sent_time=os:timestamp()}}
+    end;
+changes_callback({stop, EndSeq}, #acc{feed="continuous"}=Acc) ->
+    #acc{resp=Resp} = Acc,
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp,
+        [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
+    chttpd:end_delayed_json_response(Resp1);
+
+% callbacks for longpoll and normal (single JSON Object)
+changes_callback(start, #acc{feed="normal", etag=Etag}=Acc)
+        when Etag =/= undefined ->
+    #acc{resp=Req} = Acc,
+    FirstChunk = "{\"results\":[\n",
+    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
+       [{"Etag",Etag}], FirstChunk),
+    {ok, Acc#acc{resp=Resp, prepend="", last_data_sent_time=os:timestamp()}};
+changes_callback(start, Acc) ->
+    #acc{resp=Req} = Acc,
+    FirstChunk = "{\"results\":[\n",
+    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
+    {ok, Acc#acc{resp=Resp, prepend="", last_data_sent_time=os:timestamp()}};
+changes_callback({change, Change0}, Acc) ->
+    #acc{resp=Resp, prepend=Prepend, username=Username} = Acc,
+    case transform_change(Username, Resp, Change0) of
+        skip ->
+            {ok, maybe_send_heartbeat(Acc)};
+        Change ->
+            #acc{resp=Resp, prepend=Prepend} = Acc,
+            Line = [Prepend, ?JSON_ENCODE(Change)],
+            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
+            Acc1 = Acc#acc{
+                prepend=",\r\n",
+                resp=Resp1,
+                last_data_sent_time=os:timestamp()
+            },
+            {ok, Acc1}
+    end;
+changes_callback({stop, EndSeq}, Acc) ->
+    #acc{resp=Resp} = Acc,
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp,
+        ["\n],\n\"last_seq\":", ?JSON_ENCODE(EndSeq), "}\n"]),
+    chttpd:end_delayed_json_response(Resp1);
+
+changes_callback(timeout, Acc) ->
+    {ok, maybe_send_heartbeat(Acc)};
+
+changes_callback({error, Reason}, #acc{resp=Req=#httpd{}}) ->
+    chttpd:send_error(Req, Reason);
+changes_callback({error, Reason}, Acc) ->
+    #acc{etag=Etag, feed=Feed, resp=Resp} = Acc,
+    case {Feed, Etag} of
+        {"normal", Etag} when Etag =/= undefined ->
+            chttpd:send_error(Resp, Reason);
+        _ ->
+            chttpd:send_delayed_error(Resp, Reason)
+    end.
+
+
+maybe_send_heartbeat(#acc{heartbeat_interval=false}=Acc) ->
+    Acc;
+maybe_send_heartbeat(Acc) ->
+    #acc{last_data_sent_time=LastSentTime, heartbeat_interval=Interval, resp=Resp} = Acc,
+    Now = os:timestamp(),
+    case timer:now_diff(Now, LastSentTime) div 1000 > Interval of
+        true ->
+            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
+            Acc#acc{last_data_sent_time=Now, resp=Resp1};
+        false ->
+            Acc
+    end.
+
+
+%% Builds the option list for the changes feed from the request's query
+%% string. Options are prepended as they are encountered, so the resulting
+%% list is in reverse query-string order.
+parse_global_changes_query(Req) ->
+    lists:foldl(fun add_query_arg/2, [], couch_httpd:qs(Req)).
+
+
+%% Folds one {Key, Value} query-string pair into the option list.
+%% Numeric values go through to_non_neg_int/1, which throws bad_request
+%% for anything that is not a non-negative integer.
+add_query_arg({"feed", Feed}, Args) ->
+    [{feed, Feed} | Args];
+add_query_arg({"descending", "true"}, Args) ->
+    [{dir, rev} | Args];
+add_query_arg({"since", Since}, Args) ->
+    [{since, Since} | Args];
+add_query_arg({"limit", Limit}, Args) ->
+    [{limit, to_non_neg_int(Limit)} | Args];
+add_query_arg({"heartbeat", "true"}, Args) ->
+    [{heartbeat, true} | Args];
+add_query_arg({"heartbeat", Millis}, Args) ->
+    [{heartbeat, to_non_neg_int(Millis)} | Args];
+add_query_arg({"timeout", Millis}, Args) ->
+    [{timeout, to_non_neg_int(Millis)} | Args];
+add_query_arg({_Key, _Value}, Args) ->
+    % unknown key value pair, ignore.
+    Args.
+
+
+%% Parses a string as a non-negative integer. Throws
+%% {bad_request, invalid_integer} when the string is not an integer or
+%% the integer is negative.
+to_non_neg_int(Value) ->
+    Parsed = try
+        list_to_integer(Value)
+    catch error:badarg ->
+        invalid
+    end,
+    case Parsed of
+        N when is_integer(N), N >= 0 ->
+            N;
+        _ ->
+            throw({bad_request, invalid_integer})
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes_listener.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl
new file mode 100644
index 0000000..ae38245
--- /dev/null
+++ b/src/global_changes_listener.erl
@@ -0,0 +1,146 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_listener).
+-behavior(couch_event_listener).
+
+
+-export([
+    start/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_event/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+-record(state, {
+    update_db,
+    pending_update_count,
+    pending_updates,
+    last_update_time,
+    max_event_delay,
+    dbname
+}).
+
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+start() ->
+    couch_event_listener:start(?MODULE, nil, [all_dbs]).
+
+
+init(_) ->
+    % get configs as strings
+    UpdateDb0 = config:get("global_changes", "update_db", "true"),
+    MaxEventDelay0 = config:get("global_changes", "max_event_delay", "25"),
+
+    % make config strings into other data types
+    UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
+    MaxEventDelay = list_to_integer(MaxEventDelay0),
+
+    State = #state{
+        update_db=UpdateDb,
+        pending_update_count=0,
+        pending_updates=sets:new(),
+        max_event_delay=MaxEventDelay,
+        dbname=global_changes_util:get_dbname()
+    },
+    {ok, State}.
+
+
+terminate(_Reason, _State) ->
+    ok.
+
+
+%% couch_event_listener callback.
+%%
+%% While update_db is false every event is dropped. Otherwise created,
+%% updated and deleted events are queued under the key <<"Event:DbName">>,
+%% except for events on the global changes database itself. Every path
+%% that is not dropped ends in maybe_send_updates/1.
+handle_event(_ShardName, _Event, #state{update_db=false}=State) ->
+    {ok, State};
+handle_event(ShardName, Event, State)
+        when Event =:= updated; Event =:= deleted; Event =:= created ->
+    #state{
+        dbname=ChangesDbName,
+        pending_update_count=Count,
+        pending_updates=Pending
+    } = State,
+    case mem3:dbname(ShardName) of
+        ChangesDbName ->
+            % Writes to the changes db itself are not recorded.
+            maybe_send_updates(State);
+        DbName ->
+            Type = erlang:atom_to_binary(Event, latin1),
+            Key = <<Type/binary, ":", DbName/binary>>,
+            maybe_send_updates(State#state{
+                pending_updates=sets:add_element(Key, Pending),
+                pending_update_count=Count + 1
+            })
+    end;
+handle_event(_DbName, _Event, State) ->
+    maybe_send_updates(State).
+
+
+handle_cast({set_max_event_delay, MaxEventDelay}, State) ->
+    maybe_send_updates(State#state{max_event_delay=MaxEventDelay});
+handle_cast({set_update_db, Boolean}, State0) ->
+    % If turning update_db off, clear out server state
+    State = case {Boolean, State0#state.update_db} of
+        {false, true} ->
+            State0#state{
+                update_db=Boolean,
+                pending_updates=sets:new(),
+                pending_update_count=0,
+                last_update_time=undefined
+            };
+        _ ->
+            State0#state{update_db=Boolean}
+    end,
+    maybe_send_updates(State);
+handle_cast(_Msg, State) ->
+    maybe_send_updates(State).
+
+
+%% Sends the batch of pending events once max_event_delay milliseconds
+%% have passed since the batch was started.
+%%
+%% Returns {ok, State} when there is nothing to wait for, or
+%% {ok, State, Millis} while a batch is still waiting (the third element
+%% is presumably a timeout after which the listener is called again --
+%% confirm against couch_event_listener).
+maybe_send_updates(#state{pending_update_count=0}=State) ->
+    {ok, State};
+maybe_send_updates(#state{update_db=true}=State) ->
+    #state{max_event_delay=MaxEventDelay, last_update_time=LastUpdateTime} = State,
+    Now = os:timestamp(),
+    case LastUpdateTime of
+    undefined ->
+        % First pending event of a batch: start the clock.
+        {ok, State#state{last_update_time=Now}, MaxEventDelay};
+    _ ->
+        Delta = timer:now_diff(Now, LastUpdateTime) div 1000,
+        if Delta >= MaxEventDelay ->
+            Updates = sets:to_list(State#state.pending_updates),
+            try group_updates_by_node(State#state.dbname, Updates) of
+                Grouped ->
+                    % dict:fold/3 rather than dict:map/2: the casts are
+                    % run only for their side effect, so no result dict
+                    % needs to be built.
+                    dict:fold(fun(Node, Docs, ok) ->
+                        MFA = {global_changes_server, update_docs, [Docs]},
+                        rexi:cast(Node, MFA),
+                        ok
+                    end, ok, Grouped)
+            catch error:database_does_not_exist ->
+                % Without a global changes database the batch is dropped.
+                ok
+            end,
+            State1 = State#state{
+                pending_updates=sets:new(),
+                pending_update_count=0,
+                last_update_time=undefined
+            },
+            {ok, State1};
+        true ->
+            % Still inside the delay window: wait for the remainder.
+            {ok, State, MaxEventDelay-Delta}
+        end
+    end;
+maybe_send_updates(State) ->
+    {ok, State}.
+
+
+handle_info(_Msg, State) ->
+    maybe_send_updates(State).
+
+
+-spec group_updates_by_node(binary(), [binary()]) -> dict().
+group_updates_by_node(DbName, Updates) ->
+    lists:foldl(fun(Key, OuterAcc) ->
+        Shards = mem3:shards(DbName, Key),
+        lists:foldl(fun(#shard{node=Node}, InnerAcc) ->
+            dict:append(Node, Key, InnerAcc)
+        end, OuterAcc, Shards)
+    end, dict:new(), Updates).

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes_server.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_server.erl b/src/global_changes_server.erl
new file mode 100644
index 0000000..53bb15c
--- /dev/null
+++ b/src/global_changes_server.erl
@@ -0,0 +1,209 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_server).
+-behavior(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-export([
+    update_docs/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-record(state, {
+    update_db,
+    pending_update_count,
+    last_update_time,
+    pending_updates,
+    max_write_delay,
+    dbname,
+    handler_ref
+}).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+    {ok, Handler} = global_changes_listener:start(),
+    % get configs as strings
+    UpdateDb0 = config:get("global_changes", "update_db", "true"),
+    MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "25"),
+
+    % make config strings into other data types
+    UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
+    MaxWriteDelay = list_to_integer(MaxWriteDelay0),
+
+    State = #state{
+        update_db=UpdateDb,
+        pending_update_count=0,
+        pending_updates=sets:new(),
+        max_write_delay=MaxWriteDelay,
+        dbname=global_changes_util:get_dbname(),
+        handler_ref=erlang:monitor(process, Handler)
+    },
+    {ok, State}.
+
+
+terminate(_Reason, _Srv) ->
+    ok.
+
+
+handle_call(_Msg, _From, #state{update_db=false}=State) ->
+    {reply, ok, State};
+handle_call({update_docs, DocIds}, _From, State) ->
+    Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates),
+    NewState = State#state{
+        pending_updates=Pending,
+        pending_update_count=sets:size(Pending)
+    },
+    format_reply(reply, maybe_update_docs(NewState)).
+
+
+handle_cast({set_max_write_delay, MaxWriteDelay}, State) ->
+    NewState = State#state{max_write_delay=MaxWriteDelay},
+    format_reply(noreply, maybe_update_docs(NewState));
+handle_cast({set_update_db, Boolean}, State0) ->
+    % If turning update_db off, clear out server state
+    State = case {Boolean, State0#state.update_db} of
+        {false, true} ->
+            State0#state{
+                update_db=Boolean,
+                pending_updates=sets:new(),
+                pending_update_count=0,
+                last_update_time=undefined
+            };
+        _ ->
+            State0#state{update_db=Boolean}
+    end,
+    format_reply(noreply, maybe_update_docs(State));
+handle_cast(_Msg, State) ->
+    format_reply(noreply, maybe_update_docs(State)).
+
+
+handle_info(start_listener, State) ->
+    {ok, Handler} = global_changes_listener:start(),
+    NewState = State#state{
+        handler_ref=erlang:monitor(process, Handler)
+    },
+    format_reply(noreply, maybe_update_docs(NewState));
+handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) ->
+    couch_log:error("global_changes_listener terminated: ~w", [Reason]),
+    erlang:send_after(5000, self(), start_listener),
+    format_reply(noreply, maybe_update_docs(State));
+handle_info(_, State) ->
+    format_reply(noreply, maybe_update_docs(State)).
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+maybe_update_docs(#state{pending_update_count=0}=State) ->
+    State;
+maybe_update_docs(#state{update_db=true}=State) ->
+    #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State,
+    Now = os:timestamp(),
+    case LastUpdateTime of
+    undefined ->
+        {State#state{last_update_time=Now}, MaxWriteDelay};
+    _ ->
+        Delta = round(timer:now_diff(Now, LastUpdateTime)/1000),
+        if Delta >= MaxWriteDelay ->
+            DocIds = sets:to_list(State#state.pending_updates),
+            try group_ids_by_shard(State#state.dbname, DocIds) of
+            GroupedIds ->
+                Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) ->
+                    {ok, Shard} = couch_db:open(ShardName, []),
+                    try
+                        GroupedDocs = get_docs_locally(Shard, Ids),
+                        GroupedDocs ++ DocInfoAcc
+                    after
+                        couch_db:close(Shard)
+                    end
+                end, [], GroupedIds),
+
+                spawn(fun() ->
+                    fabric:update_docs(State#state.dbname, Docs, [])
+                end)
+            catch error:database_does_not_exist ->
+                ok
+            end,
+            State#state{
+                pending_updates=sets:new(),
+                pending_update_count=0,
+                last_update_time=undefined
+            };
+        true ->
+            {State, MaxWriteDelay-Delta}
+        end
+    end;
+maybe_update_docs(State) ->
+    State.
+
+
+update_docs(Updates) ->
+    gen_server:call(?MODULE, {update_docs, Updates}).
+
+
+group_ids_by_shard(DbName, DocIds) ->
+    LocalNode = node(),
+    lists:foldl(fun(DocId, Acc) ->
+        Shards = mem3:shards(DbName, DocId),
+        lists:foldl(fun
+            (#shard{node=Node, name=Name}, Acc1) when Node == LocalNode ->
+                dict:append(Name, DocId, Acc1);
+            (_, Acc1) ->
+                Acc1
+        end, Acc, Shards)
+    end, dict:new(), DocIds).
+
+
+format_reply(reply, #state{}=State) ->
+    {reply, ok, State};
+format_reply(reply, {State, Timeout}) ->
+    {reply, ok, State, Timeout};
+format_reply(noreply, #state{}=State) ->
+    {noreply, State};
+format_reply(noreply, {State, Timeout}) ->
+    {noreply, State, Timeout}.
+
+
+get_docs_locally(Shard, Ids) ->
+    lists:map(fun(Id) ->
+        DocInfo = couch_db:get_doc_info(Shard, Id),
+        #doc{id=Id, revs=get_rev(DocInfo)}
+    end, Ids).
+
+
+%% Maps the result of couch_db:get_doc_info/2 to the {Pos, RevIds} value
+%% stored in #doc.revs: {0, []} for a missing doc, otherwise the position
+%% and id of the first #rev_info in the list.
+get_rev(not_found) ->
+    {0, []};
+get_rev({ok, #doc_info{revs=[Winner|_]}}) ->
+    % Per the original note, couch_doc:to_doc_info/1 sorts revisions so
+    % that the first #rev_info is the "winning" one, which is the one to
+    % base an edit on. A list with more than one entry means the event doc
+    % has conflicts, which global_changes should never see by design; the
+    % single-revision and conflicted cases are handled identically here
+    % and nothing is recorded for the latter.
+    {Pos, Rev} = Winner#rev_info.rev,
+    {Pos, [Rev]}.

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes_sup.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_sup.erl b/src/global_changes_sup.erl
new file mode 100644
index 0000000..471c8ab
--- /dev/null
+++ b/src/global_changes_sup.erl
@@ -0,0 +1,35 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_sup).
+-behavior(supervisor).
+
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+    {ok, {
+        {one_for_one, 5, 10}, [
+            {
+                global_changes_server,
+                {global_changes_server, start_link, []},
+                permanent,
+                5000,
+                worker,
+                [global_changes_server]
+            },
+            {
+                global_changes_config_listener,
+                {global_changes_config_listener, start_link, []},
+                permanent,
+                5000,
+                worker,
+                [global_changes_config_listener]
+            }
+    ]}}.

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/f429cd0e/src/global_changes_util.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_util.erl b/src/global_changes_util.erl
new file mode 100644
index 0000000..46abf48
--- /dev/null
+++ b/src/global_changes_util.erl
@@ -0,0 +1,17 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_util).
+
+
+-export([get_dbname/0]).
+
+
+%% Returns the name of the global changes database as a binary, read from
+%% the `dbname` key of the global_changes application environment.
+get_dbname() ->
+    normalize_dbname(application:get_env(global_changes, dbname)).
+
+
+%% Binaries are used as-is and lists are converted to a binary. A missing
+%% key or any other kind of value yields the default <<"global_changes">>.
+normalize_dbname({ok, Name}) when is_binary(Name) ->
+    Name;
+normalize_dbname({ok, Name}) when is_list(Name) ->
+    iolist_to_binary(Name);
+normalize_dbname(_Other) ->
+    <<"global_changes">>.


Mime
View raw message