couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject git commit: Reimplement global_change rate limiting
Date Wed, 13 Aug 2014 21:21:47 GMT
Repository: couchdb-global-changes
Updated Branches:
  refs/heads/windsor-merge a55e2d228 -> 7e63e9291 (forced update)


Reimplement global_change rate limiting

Rather than just checking when a change occurs to update we instead just
update after max_write_delay milliseconds.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/commit/7e63e929
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/tree/7e63e929
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/diff/7e63e929

Branch: refs/heads/windsor-merge
Commit: 7e63e9291815b93011e2c04718bbac9b23d6e322
Parents: 0324bf4
Author: Paul J. Davis <paul.joseph.davis@gmail.com>
Authored: Wed Aug 13 15:54:17 2014 -0500
Committer: Paul J. Davis <paul.joseph.davis@gmail.com>
Committed: Wed Aug 13 16:21:33 2014 -0500

----------------------------------------------------------------------
 src/global_changes_listener.erl |  2 +-
 src/global_changes_server.erl   | 99 +++++++++++++++++-------------------
 2 files changed, 48 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/7e63e929/src/global_changes_listener.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl
index c25a0d1..9befdfd 100644
--- a/src/global_changes_listener.erl
+++ b/src/global_changes_listener.erl
@@ -46,7 +46,7 @@ start() ->
 init(_) ->
     % get configs as strings
     UpdateDb0 = config:get("global_changes", "update_db", "true"),
-    MaxEventDelay0 = config:get("global_changes", "max_event_delay", "500"),
+    MaxEventDelay0 = config:get("global_changes", "max_event_delay", "25"),
 
     % make config strings into other data types
     UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,

http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/7e63e929/src/global_changes_server.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_server.erl b/src/global_changes_server.erl
index e6a8e54..1762b76 100644
--- a/src/global_changes_server.erl
+++ b/src/global_changes_server.erl
@@ -40,7 +40,6 @@
 -record(state, {
     update_db,
     pending_update_count,
-    last_update_time,
     pending_updates,
     max_write_delay,
     dbname,
@@ -56,12 +55,15 @@ init([]) ->
     {ok, Handler} = global_changes_listener:start(),
     % get configs as strings
     UpdateDb0 = config:get("global_changes", "update_db", "true"),
-    MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "500"),
+    MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "25"),
 
     % make config strings into other data types
     UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
     MaxWriteDelay = list_to_integer(MaxWriteDelay0),
 
+    % Start our write triggers
+    erlang:send_after(MaxWriteDelay, self(), flush_updates),
+
     State = #state{
         update_db=UpdateDb,
         pending_update_count=0,
@@ -89,11 +91,11 @@ handle_cast({update_docs, DocIds}, State) ->
         pending_updates=Pending,
         pending_update_count=sets:size(Pending)
     },
-    maybe_update_docs(NewState);
+    {noreply, NewState};
 
 handle_cast({set_max_write_delay, MaxWriteDelay}, State) ->
     NewState = State#state{max_write_delay=MaxWriteDelay},
-    maybe_update_docs(NewState);
+    {noreply, NewState};
 handle_cast({set_update_db, Boolean}, State0) ->
     % If turning update_db off, clear out server state
     State = case {Boolean, State0#state.update_db} of
@@ -101,76 +103,69 @@ handle_cast({set_update_db, Boolean}, State0) ->
             State0#state{
                 update_db=Boolean,
                 pending_updates=sets:new(),
-                pending_update_count=0,
-                last_update_time=undefined
+                pending_update_count=0
             };
         _ ->
             State0#state{update_db=Boolean}
     end,
-    maybe_update_docs(State);
+    {noreply, State};
 handle_cast(_Msg, State) ->
-    maybe_update_docs(State).
+    {noreply, State}.
 
 
+handle_info(flush_updates, #state{pending_update_count=0}=State) ->
+    erlang:send_after(State#state.max_write_delay, self(), flush_updates),
+    {noreply, State};
+handle_info(flush_updates, #state{update_db=false}=State) ->
+    erlang:send_after(State#state.max_write_delay, self(), flush_updates),
+    {noreply, State};
+handle_info(flush_updates, State) ->
+    erlang:send_after(State#state.max_write_delay, self(), flush_updates),
+    flush_updates(State);
 handle_info(start_listener, State) ->
     {ok, Handler} = global_changes_listener:start(),
     NewState = State#state{
         handler_ref=erlang:monitor(process, Handler)
     },
-    maybe_update_docs(NewState);
+    {noreply, NewState};
 handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) ->
     couch_log:error("global_changes_listener terminated: ~w", [Reason]),
     erlang:send_after(5000, self(), start_listener),
-    maybe_update_docs(State);
+    {noreply, State};
 handle_info(_, State) ->
-    maybe_update_docs(State).
+    {noreply, State}.
 
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-maybe_update_docs(#state{pending_update_count=0}=State) ->
-    {noreply, State};
-maybe_update_docs(#state{update_db=true}=State) ->
-    #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State,
-    Now = os:timestamp(),
-    case LastUpdateTime of
-    undefined ->
-        {noreply, State#state{last_update_time=Now}, MaxWriteDelay};
-    _ ->
-        Delta = round(timer:now_diff(Now, LastUpdateTime)/1000),
-        if Delta >= MaxWriteDelay ->
-            DocIds = sets:to_list(State#state.pending_updates),
-            try group_ids_by_shard(State#state.dbname, DocIds) of
-            GroupedIds ->
-                Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) ->
-                    {ok, Shard} = couch_db:open(ShardName, []),
-                    try
-                        GroupedDocs = get_docs_locally(Shard, Ids),
-                        GroupedDocs ++ DocInfoAcc
-                    after
-                        couch_db:close(Shard)
-                    end
-                end, [], GroupedIds),
-
-                spawn(fun() ->
-                    fabric:update_docs(State#state.dbname, Docs, [])
-                end)
-            catch error:database_does_not_exist ->
-                {noreply, State}
-            end,
-            {noreply, State#state{
-                pending_updates=sets:new(),
-                pending_update_count=0,
-                last_update_time=undefined
-            }};
-        true ->
-            {noreply, State, MaxWriteDelay-Delta}
-        end
-    end;
-maybe_update_docs(State) ->
-    {noreply, State}.
+
+flush_updates(State) ->
+    DocIds = sets:to_list(State#state.pending_updates),
+    try group_ids_by_shard(State#state.dbname, DocIds) of
+    GroupedIds ->
+        Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) ->
+            {ok, Shard} = couch_db:open(ShardName, []),
+            try
+                GroupedDocs = get_docs_locally(Shard, Ids),
+                GroupedDocs ++ DocInfoAcc
+            after
+                couch_db:close(Shard)
+            end
+        end, [], GroupedIds),
+
+        spawn(fun() ->
+            couch_log:error("DOCS ~p", [Docs]),
+            fabric:update_docs(State#state.dbname, Docs, [])
+        end)
+    catch error:database_does_not_exist ->
+        {noreply, State}
+    end,
+    {noreply, State#state{
+        pending_updates=sets:new(),
+        pending_update_count=0
+    }}.
 
 
 update_docs(Node, Updates) ->


Mime
View raw message