couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1171890 - in /couchdb/branches/1.2.x: share/www/ src/couchdb/ test/etap/
Date Sat, 17 Sep 2011 03:32:39 GMT
Author: fdmanana
Date: Sat Sep 17 03:32:38 2011
New Revision: 1171890

URL: http://svn.apache.org/viewvc?rev=1171890&view=rev
Log:
Improved _active_tasks API

Tasks are now free to set any properties they wish (as an
Erlang proplist). Different tasks can have different properties
and the status string doesn't exist anymore - instead client
applications can build it using more granular properties from
_active_tasks. Some of these properties are:

1) "progress" (an integer percentage, for all tasks)
2) "database" (for compactions and indexer tasks)
3) "design_document" (for indexer and view compaction tasks)
4) "source" and "target" (for replications)
5) "docs_read", "docs_written", "doc_write_failures",
   "missing_revisions_found", "revisions_checked", "source_seq",
   "checkpointed_source_seq" and "continuous" for replications

COUCHDB-1266

This is a backport of revision 1171888 from trunk.

Modified:
    couchdb/branches/1.2.x/share/www/status.html
    couchdb/branches/1.2.x/src/couchdb/couch_db_updater.erl
    couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl
    couchdb/branches/1.2.x/src/couchdb/couch_task_status.erl
    couchdb/branches/1.2.x/src/couchdb/couch_view_compactor.erl
    couchdb/branches/1.2.x/src/couchdb/couch_view_updater.erl
    couchdb/branches/1.2.x/test/etap/090-task-status.t

Modified: couchdb/branches/1.2.x/share/www/status.html
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/share/www/status.html?rev=1171890&r1=1171889&r2=1171890&view=diff
==============================================================================
--- couchdb/branches/1.2.x/share/www/status.html (original)
+++ couchdb/branches/1.2.x/share/www/status.html Sat Sep 17 03:32:38 2011
@@ -76,15 +76,43 @@ specific language governing permissions 
               .appendTo("#status tbody.content");
           } else {
             $.each(tasks, function(idx, task) {
+              var status, type, object;
+
+              switch (task.type) {
+              case "database_compaction":
+                type = "Database compaction";
+                object = task.database + (task.retry ? " retry" : "");
+                status = "Copied " + task.changes_done + " of " +
+                  task.total_changes + " changes (" + task.progress + "%)";
+                break;
+              case "view_compaction":
+                type = "View compaction";
+                object = task.database + ", " + task.design_document;
+                status = "Progress " + task.progress + "%";
+                break;
+              case "indexer":
+                type = "Indexer";
+                object = task.database + ", " + task.design_document;
+                status = "Processed " + task.changes_done + " of " +
+                  task.total_changes + " changes (" + task.progress + "%)";
+                break;
+              case "replication":
+                type = "Replication";
+                object = task.source + " to " + task.target;
+                status = "Checkpointed source sequence " +
+                  task.checkpointed_source_seq + ", current source sequence " +
+                  task.source_seq + ", progress " + task.progress + "%";
+              }
+
               $("<tr><th></th><td class='object'></td><td class='started'>" +
                 "</td><td class='updated'></td><td class='pid'></td>" +
                 "<td class='status'></td></tr>")
-                .find("th").text(task.type).end()
-                .find("td.object").text(task.task).end()
+                .find("th").text(type).end()
+                .find("td.object").text(object).end()
                 .find("td.started").text(toTaskDate(task.started_on)).end()
                 .find("td.updated").text(toTaskDate(task.updated_on)).end()
                 .find("td.pid").text(task.pid).end()
-                .find("td.status").text(task.status).end()
+                .find("td.status").text(status).end()
                 .appendTo("#status tbody.content");
             });
           }

Modified: couchdb/branches/1.2.x/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_db_updater.erl?rev=1171890&r1=1171889&r2=1171890&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_db_updater.erl Sat Sep 17 03:32:38 2011
@@ -879,6 +879,7 @@ copy_docs(Db, #db{updater_fd = DestFd} =
             NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
     {ok, FullDocInfoBTree} = couch_btree:add_remove(
             NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+    update_compact_task(length(NewFullDocInfos)),
     NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
               docinfo_by_seq_btree=DocInfoBTree}.
 
@@ -896,40 +897,46 @@ copy_compact(Db, NewDb0, Retry) ->
 
     EnumBySeqFun =
     fun(#doc_info{high_seq=Seq}=DocInfo, _Offset,
-        {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize, TotalCopied}) ->
+        {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
 
         AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
         if AccUncopiedSize2 >= BufferSize ->
             NewDb2 = copy_docs(
                 Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
-            TotalCopied2 = TotalCopied + 1 + length(AccUncopied),
-            couch_task_status:update("Copied ~p of ~p changes (~p%)",
-                [TotalCopied2, TotalChanges, (TotalCopied2 * 100) div TotalChanges]),
             AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
             if AccCopiedSize2 >= CheckpointAfter ->
-                {ok, {commit_data(NewDb2#db{update_seq = Seq}), [],
-                    0, 0, TotalCopied2}};
+                {ok, {commit_data(NewDb2#db{update_seq = Seq}), [], 0, 0}};
             true ->
-                {ok, {NewDb2#db{update_seq = Seq}, [],
-                    0, AccCopiedSize2, TotalCopied2}}
+                {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
             end;
         true ->
             {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
-                AccCopiedSize, TotalCopied}}
+                AccCopiedSize}}
         end
     end,
 
-    couch_task_status:set_update_frequency(500),
+    TaskProps0 = [
+        {type, database_compaction},
+        {database, Db#db.name},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, TotalChanges}
+    ],
+    case Retry of
+    true ->
+        couch_task_status:update([{retry, true}]);
+    false ->
+        couch_task_status:add_task(TaskProps0),
+        couch_task_status:set_update_frequency(500)
+    end,
 
-    {ok, _, {NewDb2, Uncopied, _, _, ChangesDone}} =
+    {ok, _, {NewDb2, Uncopied, _, _}} =
         couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
-            {NewDb, [], 0, 0, 0},
+            {NewDb, [], 0, 0},
             [{start_key, NewDb#db.update_seq + 1}]),
 
-    couch_task_status:update("Flushing"),
-
     NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-    TotalChanges = ChangesDone + length(Uncopied),
+    TotalChanges = (couch_task_status:get(changes_done) - NewDb#db.update_seq),
 
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
@@ -948,7 +955,6 @@ start_copy_compact(#db{name=Name,filepat
     ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
     case couch_file:open(CompactFile) of
     {ok, Fd} ->
-        couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
         Retry = true,
         case couch_file:read_header(Fd) of
         {ok, Header} ->
@@ -957,7 +963,6 @@ start_copy_compact(#db{name=Name,filepat
             ok = couch_file:write_header(Fd, Header=#db_header{})
         end;
     {error, enoent} ->
-        couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
         {ok, Fd} = couch_file:open(CompactFile, [create]),
         Retry = false,
         ok = couch_file:write_header(Fd, Header=#db_header{})
@@ -984,6 +989,17 @@ start_copy_compact(#db{name=Name,filepat
         start_copy_compact(CurrentDb)
     end.
 
+update_compact_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress = case Total of
+    0 ->
+        0;
+    _ ->
+        (Changes2 * 100) div Total
+    end,
+    couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+
 make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
     Body = case couch_compress:is_compressed(Body0) of
     true ->

Modified: couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl?rev=1171890&r1=1171889&r2=1171890&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_replicator.erl Sat Sep 17 03:32:38 2011
@@ -213,7 +213,8 @@ do_init(#rep{options = Options, id = {Ba
         source_name = SourceName,
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
-        source_seq = SourceCurSeq
+        source_seq = SourceCurSeq,
+        committed_seq = {_, CommittedSeq}
     } = State = init_state(Rep),
 
     NumWorkers = get_value(worker_processes, Options),
@@ -240,11 +241,21 @@ do_init(#rep{options = Options, id = {Ba
         end,
         lists:seq(1, NumWorkers)),
 
-    couch_task_status:add_task(
-        "Replication",
-         io_lib:format("`~s`: `~s` -> `~s`",
-            [BaseId ++ Ext, SourceName, TargetName]),
-         io_lib:format("Processed ~p / ~p changes", [StartSeq, SourceCurSeq])),
+    couch_task_status:add_task([
+        {type, replication},
+        {source, ?l2b(SourceName)},
+        {target, ?l2b(TargetName)},
+        {continuous, get_value(continuous, Options, false)},
+        {revisions_checked, 0},
+        {missing_revisions_found, 0},
+        {docs_read, 0},
+        {docs_written, 0},
+        {doc_write_failures, 0},
+        {source_seq, SourceCurSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        {progress, 0}
+    ]),
+    couch_task_status:set_update_frequency(1000),
 
     % Until OTP R14B03:
     %
@@ -337,9 +348,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_
     end.
 
 
-handle_call({report_seq_done, Seq, StatsInc}, _From,
+handle_call({report_seq_done, Seq, StatsInc}, From,
     #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
         current_through_seq = ThroughSeq, stats = Stats} = State) ->
+    gen_server:reply(From, ok),
     {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
     [Seq | Rest] ->
         {Seq, Rest};
@@ -360,8 +372,6 @@ handle_call({report_seq_done, Seq, Stats
         [Seq, ThroughSeq, NewThroughSeq, HighestDone,
             NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
     SourceCurSeq = source_cur_seq(State),
-    couch_task_status:update(
-        "Processed ~p / ~p changes", [element(2, NewThroughSeq), SourceCurSeq]),
     NewState = State#rep_state{
         stats = sum_stats([Stats, StatsInc]),
         current_through_seq = NewThroughSeq,
@@ -369,7 +379,8 @@ handle_call({report_seq_done, Seq, Stats
         highest_seq_done = NewHighestDone,
         source_seq = SourceCurSeq
     },
-    {reply, ok, NewState}.
+    update_task(NewState),
+    {noreply, NewState}.
 
 
 handle_cast({db_compacted, DbName},
@@ -425,7 +436,7 @@ terminate(Reason, State) ->
 
 
 terminate_cleanup(State) ->
-    couch_task_status:update("Finishing"),
+    update_task(State),
     stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
     stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
     couch_api_wrap:db_close(State#rep_state.source),
@@ -684,6 +695,7 @@ do_checkpoint(State) ->
                 source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
                 target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
             },
+            update_task(NewState),
             {ok, NewState}
         catch throw:{checkpoint_commit_failure, _} = Failure ->
             Failure
@@ -858,3 +870,32 @@ source_cur_seq(#rep_state{source = #http
 source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
     {ok, Info} = couch_api_wrap:get_db_info(Db),
     get_value(<<"update_seq">>, Info, Seq).
+
+
+update_task(State) ->
+    #rep_state{
+        current_through_seq = {_, CurSeq},
+        committed_seq = {_, CommittedSeq},
+        source_seq = SourceCurSeq,
+        stats = Stats
+    } = State,
+    couch_task_status:update([
+        {revisions_checked, Stats#rep_stats.missing_checked},
+        {missing_revisions_found, Stats#rep_stats.missing_found},
+        {docs_read, Stats#rep_stats.docs_read},
+        {docs_written, Stats#rep_stats.docs_written},
+        {doc_write_failures, Stats#rep_stats.doc_write_failures},
+        {source_seq, SourceCurSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        case is_number(CurSeq) andalso is_number(SourceCurSeq) of
+        true ->
+            case SourceCurSeq of
+            0 ->
+                {progress, 0};
+            _ ->
+                {progress, (CurSeq * 100) div SourceCurSeq}
+            end;
+        false ->
+            {progress, null}
+        end
+    ]).

Modified: couchdb/branches/1.2.x/src/couchdb/couch_task_status.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_task_status.erl?rev=1171890&r1=1171889&r2=1171890&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_task_status.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_task_status.erl Sat Sep 17 03:32:38 2011
@@ -13,29 +13,26 @@
 -module(couch_task_status).
 -behaviour(gen_server).
 
-% This module allows is used to track the status of long running tasks.
-% Long running tasks register (add_task/3) then update their status (update/1)
-% and the task and status is added to tasks list. When the tracked task dies
-% it will be automatically removed the tracking. To get the tasks list, use the
-% all/0 function
+% This module is used to track the status of long running tasks.
+% Long running tasks register themselves, via a call to add_task/1, and then
+% update their status properties via update/1. The status of a task is a
+% list of properties. Each property is a tuple, with the first element being
+% either an atom or a binary and the second element must be an EJSON value. When
+% a task updates its status, it can override some or all of its properties.
+% The properties {started_on, UnixTimestamp}, {updated_on, UnixTimestamp} and
+% {pid, ErlangPid} are automatically added by this module.
+% When a tracked task dies, its status will be automatically removed from
+% memory. To get the tasks list, call the all/0 function.
 
 -export([start_link/0, stop/0]).
--export([all/0, add_task/3, update/1, update/2, set_update_frequency/1]).
+-export([all/0, add_task/1, update/1, get/1, set_update_frequency/1]).
 
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
--import(couch_util, [to_binary/1]).
-
 -include("couch_db.hrl").
 
--record(task_status, {
-    type,
-    name,
-    start_ts,
-    update_ts,
-    status
-}).
+-define(set(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
 
 start_link() ->
@@ -50,36 +47,41 @@ all() ->
     gen_server:call(?MODULE, all).
 
 
-add_task(Type, TaskName, StatusText) ->
+add_task(Props) ->
     put(task_status_update, {{0, 0, 0}, 0}),
-    Ts = now_ts(),
-    Msg = {
-        add_task,
-        #task_status{
-            type = to_binary(Type),
-            name = to_binary(TaskName),
-            status = to_binary(StatusText),
-            start_ts = Ts,
-            update_ts = Ts
-        }
-    },
-    gen_server:call(?MODULE, Msg).
+    Ts = timestamp(),
+    TaskProps = lists:ukeysort(
+        1, [{started_on, Ts}, {updated_on, Ts} | Props]),
+    put(task_status_props, TaskProps),
+    gen_server:call(?MODULE, {add_task, TaskProps}).
 
 
 set_update_frequency(Msecs) ->
     put(task_status_update, {{0, 0, 0}, Msecs * 1000}).
 
 
-update(StatusText) ->
-    update("~s", [StatusText]).
+update(Props) ->
+    MergeProps = lists:ukeysort(1, Props),
+    TaskProps = lists:ukeymerge(1, MergeProps, erlang:get(task_status_props)),
+    put(task_status_props, TaskProps),
+    maybe_persist(TaskProps).
+
+
+get(Props) when is_list(Props) ->
+    TaskProps = erlang:get(task_status_props),
+    [couch_util:get_value(P, TaskProps) || P <- Props];
+get(Prop) ->
+    TaskProps = erlang:get(task_status_props),
+    couch_util:get_value(Prop, TaskProps).
+
 
-update(Format, Data) ->
-    {LastUpdateTime, Frequency} = get(task_status_update),
+maybe_persist(TaskProps0) ->
+    {LastUpdateTime, Frequency} = erlang:get(task_status_update),
     case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of
     true ->
         put(task_status_update, {Now, Frequency}),
-        Msg = ?l2b(io_lib:format(Format, Data)),
-        gen_server:cast(?MODULE, {update_status, self(), Msg});
+        TaskProps = ?set(TaskProps0, updated_on, timestamp(Now)),
+        gen_server:cast(?MODULE, {update_status, self(), TaskProps});
     false ->
         ok
     end.
@@ -95,10 +97,10 @@ terminate(_Reason,_State) ->
     ok.
 
 
-handle_call({add_task, TaskStatus}, {From, _}, Server) ->
+handle_call({add_task, TaskProps}, {From, _}, Server) ->
     case ets:lookup(?MODULE, From) of
     [] ->
-        true = ets:insert(?MODULE, {From, TaskStatus}),
+        true = ets:insert(?MODULE, {From, TaskProps}),
         erlang:monitor(process, From),
         {reply, ok, Server};
     [_] ->
@@ -106,25 +108,23 @@ handle_call({add_task, TaskStatus}, {Fro
     end;
 handle_call(all, _, Server) ->
     All = [
-        [
-            {type, Task#task_status.type},
-            {task, Task#task_status.name},
-            {started_on, Task#task_status.start_ts},
-            {updated_on, Task#task_status.update_ts},
-            {status, Task#task_status.status},
-            {pid, ?l2b(pid_to_list(Pid))}
-        ]
+        [{pid, ?l2b(pid_to_list(Pid))} | TaskProps]
         ||
-        {Pid, Task} <- ets:tab2list(?MODULE)
+        {Pid, TaskProps} <- ets:tab2list(?MODULE)
     ],
     {reply, All, Server}.
 
 
-handle_cast({update_status, Pid, StatusText}, Server) ->
-    [{Pid, #task_status{name = TaskName} = Task}] = ets:lookup(?MODULE, Pid),
-    ?LOG_DEBUG("New task status for ~s: ~s",[TaskName, StatusText]),
-    NewTaskStatus = Task#task_status{status = StatusText, update_ts = now_ts()},
-    true = ets:insert(?MODULE, {Pid, NewTaskStatus}),
+handle_cast({update_status, Pid, NewProps}, Server) ->
+    case ets:lookup(?MODULE, Pid) of
+    [{Pid, _CurProps}] ->
+        ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]),
+        true = ets:insert(?MODULE, {Pid, NewProps});
+    _ ->
+        % Task finished/died in the meanwhile and we must have received
+        % a monitor message before this call - ignore.
+        ok
+    end,
     {noreply, Server};
 handle_cast(stop, State) ->
     {stop, normal, State}.
@@ -139,6 +139,8 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-now_ts() ->
-    {Mega, Secs, _} = erlang:now(),
+timestamp() ->
+    timestamp(now()).
+
+timestamp({Mega, Secs, _}) ->
     Mega * 1000000 + Secs.

Modified: couchdb/branches/1.2.x/src/couchdb/couch_view_compactor.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_view_compactor.erl?rev=1171890&r1=1171889&r2=1171890&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_view_compactor.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_view_compactor.erl Sat Sep 17 03:32:38 2011
@@ -16,6 +16,15 @@
 
 -export([start_compact/2, cancel_compact/2]).
 
+-record(acc, {
+   btree = nil,
+   last_id = nil,
+   kvs = [],
+   kvs_size = 0,
+   changes = 0,
+   total_changes
+}).
+
 %% @spec start_compact(DbName::binary(), GroupId:binary()) -> ok
 %% @doc Compacts the views.  GroupId must not include the _design/ prefix
 start_compact(DbName, GroupId) ->
@@ -48,37 +57,48 @@ compact_group(Group, EmptyGroup, DbName)
     {ok, DbReduce} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
     Count = element(1, DbReduce),
 
-    <<"_design", ShortName/binary>> = GroupId,
-    TaskName = <<DbName/binary, ShortName/binary>>,
-    couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
+    TotalChanges = lists:foldl(
+        fun(View, Acc) ->
+            {ok, Kvs} = couch_view:get_row_count(View),
+            Acc + Kvs
+        end,
+        Count, Views),
+    Acc0 = #acc{total_changes = TotalChanges, btree = EmptyIdBtree},
+
+    couch_task_status:add_task([
+        {type, view_compaction},
+        {database, DbName},
+        {design_document, GroupId},
+        {progress, 0}
+    ]),
     BufferSize = list_to_integer(
         couch_config:get("view_compaction", "keyvalue_buffer_size", "2097152")),
 
-    Fun = fun({DocId, _ViewIdKeys} = KV,
-            {Bt, Acc, AccSize, TotalCopied, LastId}) ->
+    Fun = fun({DocId, _ViewIdKeys} = KV, Acc) ->
+        #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc,
         if DocId =:= LastId -> % COUCHDB-999
             ?LOG_ERROR("Duplicates of document `~s` detected in view group `~s`"
                 ", database `~s` - view rebuild, from scratch, is required",
                 [DocId, GroupId, DbName]),
             exit({view_duplicated_id, DocId});
         true -> ok end,
-        AccSize2 = AccSize + ?term_size(KV),
-        if AccSize2 >= BufferSize ->
-            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
-            couch_task_status:update("Copied ~p of ~p Ids (~p%)",
-                [TotalCopied, Count, (TotalCopied*100) div Count]),
-            {ok, {Bt2, [], 0, TotalCopied + 1 + length(Acc), DocId}};
+        KvsSize2 = KvsSize + ?term_size(KV),
+        if KvsSize2 >= BufferSize ->
+            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
+            Acc2 = update_task(Acc, 1 + length(Kvs)),
+            {ok, Acc2#acc{btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}};
         true ->
-            {ok, {Bt, [KV|Acc], AccSize2, TotalCopied, DocId}}
+            {ok, Acc#acc{kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}}
         end
     end,
-    {ok, _, {Bt3, Uncopied, _, _Total, _LastId}} = couch_btree:foldl(
-        IdBtree, Fun, {EmptyIdBtree, [], 0, 0, nil}),
+    {ok, _, #acc{btree = Bt3, kvs = Uncopied} = Acc1} = couch_btree:foldl(
+        IdBtree, Fun, Acc0),
     {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+    Acc2 = update_task(Acc1, length(Uncopied)),
 
-    NewViews = lists:map(fun({View, EmptyView}) ->
-        compact_view(View, EmptyView, BufferSize)
-    end, lists:zip(Views, EmptyViews)),
+    {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
+        compact_view(View, EmptyView, BufferSize, Acc)
+    end, Acc2, lists:zip(Views, EmptyViews)),
 
     NewGroup = EmptyGroup#group{
         id_btree=NewIdBtree,
@@ -103,26 +123,27 @@ maybe_retry_compact(#db{name = DbName} =
         end
     end.
 
-%% @spec compact_view(View, EmptyView, Retry) -> CompactView
-compact_view(View, EmptyView, BufferSize) ->
-    {ok, Count} = couch_view:get_row_count(View),
-
+%% @spec compact_view(View, EmptyView, BufferSize, Acc) -> {CompactView, NewAcc}
+compact_view(View, #view{btree = Bt0} = EmptyView, BufferSize, Acc0) ->
     %% Key is {Key,DocId}
-    Fun = fun(KV, {Bt, Acc, AccSize, TotalCopied}) ->
-        AccSize2 = AccSize + ?term_size(KV),
-        if AccSize2 >= BufferSize ->
-            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
-            couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)",
-                [View#view.id_num, TotalCopied, Count,
-                    (TotalCopied*100) div Count]),
-            {ok, {Bt2, [], 0, TotalCopied + 1 + length(Acc)}};
+    Fun = fun(KV, #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc) ->
+        KvsSize2 = KvsSize + ?term_size(KV),
+        if KvsSize2 >= BufferSize ->
+            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
+            Acc2 = update_task(Acc, 1 + length(Kvs)),
+            {ok, Acc2#acc{btree = Bt2, kvs = [], kvs_size = 0}};
         true ->
-            {ok, {Bt, [KV|Acc], AccSize2, TotalCopied}}
+            {ok, Acc#acc{kvs = [KV | Kvs], kvs_size = KvsSize2}}
         end
     end,
 
-    {ok, _, {Bt3, Uncopied, _, _Total}} = couch_btree:foldl(
-        View#view.btree, Fun, {EmptyView#view.btree, [], 0, 0}),
-    {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
-    EmptyView#view{btree = NewBt}.
-
+    {ok, _, #acc{btree = Bt, kvs = Uncopied} = Acc1} = couch_btree:foldl(
+        View#view.btree, Fun, Acc0#acc{kvs = [], kvs_size = 0, btree = Bt0}),
+    {ok, NewBt} = couch_btree:add(Bt, lists:reverse(Uncopied)),
+    Acc2 = update_task(Acc1, length(Uncopied)),
+    {EmptyView#view{btree = NewBt}, Acc2}.
+
+update_task(#acc{changes = Changes, total_changes = Total} = Acc, ChangesInc) ->
+    Changes2 = Changes + ChangesInc,
+    couch_task_status:update([{progress, (Changes2 * 100) div Total}]),
+    Acc#acc{changes = Changes2}.

Modified: couchdb/branches/1.2.x/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/src/couchdb/couch_view_updater.erl?rev=1171890&r1=1171889&r2=1171890&view=diff
==============================================================================
--- couchdb/branches/1.2.x/src/couchdb/couch_view_updater.erl (original)
+++ couchdb/branches/1.2.x/src/couchdb/couch_view_updater.erl Sat Sep 17 03:32:38 2011
@@ -51,23 +51,23 @@ update(Owner, Group, #db{name = DbName} 
     end),
     TotalChanges = couch_db:count_changes_since(Db, Seq),
     spawn_link(fun() ->
-        couch_task_status:add_task(
-            <<"View Group Indexer">>,
-            <<DbName/binary, " ", GroupName/binary>>,
-            <<"Starting index update">>),
+        couch_task_status:add_task([
+            {type, indexer},
+            {database, DbName},
+            {design_document, GroupName},
+            {progress, 0},
+            {changes_done, 0},
+            {total_changes, TotalChanges}
+        ]),
         couch_task_status:set_update_frequency(500),
         Group2 =
         if DbPurgeSeq == PurgeSeq + 1 ->
-            couch_task_status:update(<<"Removing purged entries from view index.">>),
             purge_index(Group, Db);
         true ->
             Group
         end,
         ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
-        do_writes(Self, Owner, Group2, WriteQueue,
-            Seq == 0, ViewEmptyKVs, 0, TotalChanges),
-        couch_task_status:set_update_frequency(0),
-        couch_task_status:update("Finishing.")
+        do_writes(Self, Owner, Group2, WriteQueue, Seq == 0, ViewEmptyKVs)
     end),
     % compute on all docs modified since we last computed.
     #group{ design_options = DesignOptions } = Group,
@@ -176,8 +176,7 @@ do_maps(#group{query_server = Qs} = Grou
         do_maps(Group, MapQueue, WriteQueue)
     end.
 
-do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs,
-        ChangesDone, TotalChanges) ->
+do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs) ->
     case couch_work_queue:dequeue(WriteQueue) of
     closed ->
         Parent ! {new_group, Group};
@@ -204,11 +203,8 @@ do_writes(Parent, Owner, Group, WriteQue
         _ ->
             ok = gen_server:cast(Owner, {partial_update, Parent, Group2})
         end,
-        ChangesDone2 = ChangesDone + length(Queue),
-        couch_task_status:update("Processed ~p of ~p changes (~p%)",
-              [ChangesDone2, TotalChanges, (ChangesDone2 * 100) div TotalChanges]),
-        do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild, ViewEmptyKVs,
-            ChangesDone2, TotalChanges)
+        update_task(length(Queue)),
+        do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild, ViewEmptyKVs)
     end.
 
 
@@ -275,4 +271,8 @@ write_changes(Group, ViewKeyValuesToAdd,
         end,    Group#group.views, ViewKeyValuesToAdd),
     Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}.
 
-
+update_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress = (Changes2 * 100) div Total,
+    couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).

Modified: couchdb/branches/1.2.x/test/etap/090-task-status.t
URL: http://svn.apache.org/viewvc/couchdb/branches/1.2.x/test/etap/090-task-status.t?rev=1171890&r1=1171889&r2=1171890&view=diff
==============================================================================
--- couchdb/branches/1.2.x/test/etap/090-task-status.t (original)
+++ couchdb/branches/1.2.x/test/etap/090-task-status.t Sat Sep 17 03:32:38 2011
@@ -15,7 +15,7 @@
 
 main(_) ->
     test_util:init_code_path(),
-    etap:plan(16),
+    etap:plan(28),
     case (catch test()) of
         ok ->
             etap:end_tests();
@@ -25,7 +25,7 @@ main(_) ->
     end,
     ok.
 
-check_status(Pid,ListPropLists) ->
+get_task_prop(Pid, Prop) ->
     From = list_to_binary(pid_to_list(Pid)),
     Element = lists:foldl(
         fun(PropList,Acc) ->
@@ -36,18 +36,28 @@ check_status(Pid,ListPropLists) ->
                     []
             end
         end,
-        [], ListPropLists
+        [], couch_task_status:all()
     ),
-    couch_util:get_value(status,hd(Element)).
+    case couch_util:get_value(Prop, hd(Element), nil) of
+    nil ->
+        etap:bail("Could not get property '" ++ couch_util:to_list(Prop) ++
+            "' for task " ++ pid_to_list(Pid));
+    Value ->
+        Value
+    end.
+
+now_ts() ->
+    {Mega, Secs, _} = erlang:now(),
+    Mega * 1000000 + Secs.
 
 loop() ->
     receive
-    {add, From} ->
-        Resp = couch_task_status:add_task("type", "task", "init"),
+    {add, Props, From} ->
+        Resp = couch_task_status:add_task(Props),
         From ! {ok, self(), Resp},
         loop();
-    {update, Status, From} ->
-        Resp = couch_task_status:update(Status),
+    {update, Props, From} ->
+        Resp = couch_task_status:update(Props),
         From ! {ok, self(), Resp},
         loop();
     {update_frequency, Msecs, From} ->
@@ -82,96 +92,159 @@ test() ->
     Pid2 = spawn(TaskUpdater),
     Pid3 = spawn(TaskUpdater),
 
-    ok = call(Pid1, add),
+    ok = call(Pid1, add, [{type, replication}, {progress, 0}]),
     etap:is(
         length(couch_task_status:all()),
         1,
         "Started a task"
     ),
+    Task1StartTime = get_task_prop(Pid1, started_on),
+    etap:is(
+        is_integer(Task1StartTime),
+        true,
+        "Task start time is defined."
+    ),
+    etap:is(
+        get_task_prop(Pid1, updated_on),
+        Task1StartTime,
+        "Task's start time is the same as the update time before an update."
+    ),
 
     etap:is(
-        call(Pid1, add),
+        call(Pid1, add, [{type, compaction}, {progress, 0}]),
         {add_task_error, already_registered},
         "Unable to register multiple tasks for a single Pid."
     ),
 
     etap:is(
-        check_status(Pid1, couch_task_status:all()),
-        <<"init">>,
-        "Task status was set to 'init'."
+        get_task_prop(Pid1, type),
+        replication,
+        "Task type is 'replication'."
     ),
-
-    call(Pid1,update,"running"),
     etap:is(
-        check_status(Pid1,couch_task_status:all()),
-        <<"running">>,
-        "Status updated to 'running'."
+        get_task_prop(Pid1, progress),
+        0,
+        "Task progress is 0."
     ),
 
+    ok = timer:sleep(1000),
+    call(Pid1, update, [{progress, 25}]),
+    etap:is(
+        get_task_prop(Pid1, progress),
+        25,
+        "Task progress is 25."
+    ),
+    etap:is(
+        get_task_prop(Pid1, updated_on) > Task1StartTime,
+        true,
+        "Task's last update time has increased after an update."
+    ),
 
-    call(Pid2,add),
+    call(Pid2, add, [{type, compaction}, {progress, 0}]),
     etap:is(
         length(couch_task_status:all()),
         2,
         "Started a second task."
     ),
-
+    Task2StartTime = get_task_prop(Pid2, started_on),
+    etap:is(
+        is_integer(Task2StartTime),
+        true,
+        "Second task's start time is defined."
+    ),
     etap:is(
-        check_status(Pid2, couch_task_status:all()),
-        <<"init">>,
-        "Second tasks's status was set to 'init'."
+        get_task_prop(Pid2, updated_on),
+        Task2StartTime,
+        "Second task's start time is the same as the update time before an update."
     ),
 
-    call(Pid2, update, "running"),
     etap:is(
-        check_status(Pid2, couch_task_status:all()),
-        <<"running">>,
-        "Second task's status updated to 'running'."
+        get_task_prop(Pid2, type),
+        compaction,
+        "Second task's type is 'compaction'."
+    ),
+    etap:is(
+        get_task_prop(Pid2, progress),
+        0,
+        "Second task's progress is 0."
     ),
 
+    ok = timer:sleep(1000),
+    call(Pid2, update, [{progress, 33}]),
+    etap:is(
+        get_task_prop(Pid2, progress),
+        33,
+        "Second task's progress updated to 33."
+    ),
+    etap:is(
+        get_task_prop(Pid2, updated_on) > Task2StartTime,
+        true,
+        "Second task's last update time has increased after an update."
+    ),
 
-    call(Pid3, add),
+    call(Pid3, add, [{type, indexer}, {progress, 0}]),
     etap:is(
         length(couch_task_status:all()),
         3,
         "Registered a third task."
     ),
-
+    Task3StartTime = get_task_prop(Pid3, started_on),
+    etap:is(
+        is_integer(Task3StartTime),
+        true,
+        "Third task's start time is defined."
+    ),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"init">>,
-        "Third tasks's status was set to 'init'."
+        get_task_prop(Pid3, updated_on),
+        Task3StartTime,
+        "Third task's start time is the same as the update time before an update."
     ),
 
-    call(Pid3, update, "running"),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"running">>,
-        "Third task's status updated to 'running'."
+        get_task_prop(Pid3, type),
+        indexer,
+        "Third task's type is 'indexer'."
+    ),
+    etap:is(
+        get_task_prop(Pid3, progress),
+        0,
+        "Third task's progress is 0."
     ),
 
+    ok = timer:sleep(1000),
+    call(Pid3, update, [{progress, 50}]),
+    etap:is(
+        get_task_prop(Pid3, progress),
+        50,
+        "Third task's progress updated to 50."
+    ),
+    etap:is(
+        get_task_prop(Pid3, updated_on) > Task3StartTime,
+        true,
+        "Third task's last update time has increased after an update."
+    ),
 
     call(Pid3, update_frequency, 500),
-    call(Pid3, update, "still running"),
+    call(Pid3, update, [{progress, 66}]),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"still running">>,
-        "Third task's status updated to 'still running'."
+        get_task_prop(Pid3, progress),
+        66,
+        "Third task's progress updated to 66."
     ),
 
-    call(Pid3, update, "skip this update"),
+    call(Pid3, update, [{progress, 67}]),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"still running">>,
-        "Status update dropped because of frequency limit."
+        get_task_prop(Pid3, progress),
+        66,
+        "Task update dropped because of frequency limit."
     ),
 
     call(Pid3, update_frequency, 0),
-    call(Pid3, update, "don't skip"),
+    call(Pid3, update, [{progress, 77}]),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"don't skip">>,
-        "Status updated after reseting frequency limit."
+        get_task_prop(Pid3, progress),
+        77,
+        "Task updated after reseting frequency limit."
     ),
 
 



Mime
View raw message