couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fdman...@apache.org
Subject svn commit: r1171888 - in /couchdb/trunk: share/www/ src/couch_index/src/ src/couch_mrview/src/ src/couchdb/ test/etap/
Date Sat, 17 Sep 2011 03:31:13 GMT
Author: fdmanana
Date: Sat Sep 17 03:31:12 2011
New Revision: 1171888

URL: http://svn.apache.org/viewvc?rev=1171888&view=rev
Log:
Improved _active_tasks API

Tasks are now free to set any properties they wish (as an
Erlang proplist). Different tasks can have different properties
and the status string doesn't exist anymore - instead client
applications can build it using more granular properties from
_active_tasks. Some of these properties are:

1) "progress" (an integer percentage, for all tasks)
2) "database" (for compactions and indexer tasks)
3) "design_document" (for indexer and view compaction tasks)
4) "source" and "target" (for replications)
5) "docs_read", "docs_written", "doc_write_failures",
   "missing_revisions_found", "revisions_checked", "source_seq",
   "checkpointed_source_seq" and "continuous" for replications


Modified:
    couchdb/trunk/share/www/status.html
    couchdb/trunk/src/couch_index/src/couch_index_api.erl
    couchdb/trunk/src/couch_index/src/couch_index_updater.erl
    couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl
    couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl
    couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl
    couchdb/trunk/src/couchdb/couch_db_updater.erl
    couchdb/trunk/src/couchdb/couch_replicator.erl
    couchdb/trunk/src/couchdb/couch_task_status.erl
    couchdb/trunk/test/etap/090-task-status.t

Modified: couchdb/trunk/share/www/status.html
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/status.html?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/share/www/status.html (original)
+++ couchdb/trunk/share/www/status.html Sat Sep 17 03:31:12 2011
@@ -76,15 +76,43 @@ specific language governing permissions 
               .appendTo("#status tbody.content");
           } else {
             $.each(tasks, function(idx, task) {
+              var status, type, object;
+
+              switch (task.type) {
+              case "database_compaction":
+                type = "Database compaction";
+                object = task.database + (task.retry ? " retry" : "");
+                status = "Copied " + task.changes_done + " of " +
+                  task.total_changes + " changes (" + task.progress + "%)";
+                break;
+              case "view_compaction":
+                type = "View compaction";
+                object = task.database + ", " + task.design_document;
+                status = "Progress " + task.progress + "%";
+                break;
+              case "indexer":
+                type = "Indexer";
+                object = task.database + ", " + task.design_document;
+                status = "Processed " + task.changes_done + " of " +
+                  task.total_changes + " changes (" + task.progress + "%)";
+                break;
+              case "replication":
+                type = "Replication";
+                object = task.source + " to " + task.target;
+                status = "Checkpointed source sequence " +
+                  task.checkpointed_source_seq + ", current source sequence " +
+                  task.source_seq + ", progress " + task.progress + "%";
+              }
+
               $("<tr><th></th><td class='object'></td><td class='started'>" +
                 "</td><td class='updated'></td><td class='pid'></td>" +
                 "<td class='status'></td></tr>")
-                .find("th").text(task.type).end()
-                .find("td.object").text(task.task).end()
+                .find("th").text(type).end()
+                .find("td.object").text(object).end()
                 .find("td.started").text(toTaskDate(task.started_on)).end()
                 .find("td.updated").text(toTaskDate(task.updated_on)).end()
                 .find("td.pid").text(task.pid).end()
-                .find("td.status").text(task.status).end()
+                .find("td.status").text(status).end()
                 .appendTo("#status tbody.content");
             });
           }

Modified: couchdb/trunk/src/couch_index/src/couch_index_api.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_api.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index_api.erl (original)
+++ couchdb/trunk/src/couch_index/src/couch_index_api.erl Sat Sep 17 03:31:12 2011
@@ -29,7 +29,7 @@ reset(State) ->
     ok.
 
 
-start_update(State) ->
+start_update(State, PurgedState, NumChanges) ->
     {ok, State}.
 
 purge(PurgedIdRevs, State) ->

Modified: couchdb/trunk/src/couch_index/src/couch_index_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_index/src/couch_index_updater.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couch_index/src/couch_index_updater.erl (original)
+++ couchdb/trunk/src/couch_index/src/couch_index_updater.erl Sat Sep 17 03:31:12 2011
@@ -121,10 +121,6 @@ update(Idx, Mod, IdxState) ->
         _ -> [conflicts, deleted_conflicts]
     end,
 
-    TaskType = <<"Indexer">>,
-    Starting = <<"Starting index update.">>,
-    couch_task_status:add_task(TaskType, Mod:get(idx_name, IdxState), Starting),
-
     couch_util:with_db(DbName, fun(Db) ->
         DbUpdateSeq = couch_db:get_update_seq(Db),
         DbCommittedSeq = couch_db:get_committed_update_seq(Db),
@@ -134,7 +130,6 @@ update(Idx, Mod, IdxState) ->
             reset -> exit(reset)
         end,
 
-        couch_task_status:set_update_frequency(500),
         NumChanges = couch_db:count_changes_since(Db, CurrSeq),
 
         LoadDoc = fun(DocInfo) ->
@@ -155,23 +150,22 @@ update(Idx, Mod, IdxState) ->
             end
         end,
 
-        Proc = fun(DocInfo, _, {IdxStateAcc, Count, _}) ->
+        Proc = fun(DocInfo, _, {IdxStateAcc, _}) ->
             HighSeq = DocInfo#doc_info.high_seq,
             case CommittedOnly and (HighSeq > DbCommittedSeq) of
                 true ->
-                    {stop, {IdxStateAcc, Count, false}};
+                    {stop, {IdxStateAcc, false}};
                 false ->
-                    update_task_status(NumChanges, Count),
                     {Doc, Seq} = LoadDoc(DocInfo),
                     {ok, NewSt} = Mod:process_doc(Doc, Seq, IdxStateAcc),
-                    {ok, {NewSt, Count+1, true}}
+                    {ok, {NewSt, true}}
             end
         end,
 
-        {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState),
-        Acc0 = {InitIdxState, 0, true},
+        {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
+        Acc0 = {InitIdxState, true},
         {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []),
-        {ProcIdxSt, _, SendLast} = Acc,
+        {ProcIdxSt, SendLast} = Acc,
 
         % If we didn't bail due to hitting the last committed seq we need
         % to send our last update_seq through.
@@ -182,9 +176,6 @@ update(Idx, Mod, IdxState) ->
                 {ok, ProcIdxSt}
         end,
 
-        couch_task_status:set_update_frequency(0),
-        couch_task_status:update("Waiting for index writer to finish."),
-
         {ok, FinalIdxState} = Mod:finish_update(LastIdxSt),
         exit({updated, FinalIdxState})
     end).
@@ -197,16 +188,8 @@ purge_index(Db, Mod, IdxState) ->
         DbPurgeSeq == IdxPurgeSeq ->
             {ok, IdxState};
         DbPurgeSeq == IdxPurgeSeq + 1 ->
-            couch_task_status:update(<<"Purging index entries.">>),
             {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
             Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
         true ->
-            couch_task_status:update(<<"Resetting index due to purge state.">>),
             reset
     end.
-
-
-update_task_status(Total, Count) ->
-    PercDone = (Count * 100) div Total,
-    Mesg = "Processed ~p of ~p changes (~p%)",
-    couch_task_status:update(Mesg, [Count, Total, PercDone]).

Modified: couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl (original)
+++ couchdb/trunk/src/couch_mrview/src/couch_mrview_compactor.erl Sat Sep 17 03:31:12 2011
@@ -17,6 +17,15 @@
 
 -export([compact/2, swap_compacted/2]).
 
+-record(acc, {
+   btree = nil,
+   last_id = nil,
+   kvs = [],
+   kvs_size = 0,
+   changes = 0,
+   total_changes
+}).
+
 
 compact(State, Opts) ->
     case lists:member(recompact, Opts) of
@@ -46,15 +55,26 @@ compact(State) ->
     } = EmptyState,
 
     {ok, Count} = couch_btree:full_reduce(IdBtree),
-    TaskName = <<DbName/binary, ":", IdxName/binary>>,
-    couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
+    TotalChanges = lists:foldl(
+        fun(View, Acc) ->
+            {ok, Kvs} = couch_mrview_util:get_row_count(View),
+            Acc + Kvs
+        end,
+        Count, Views),
+    couch_task_status:add_task([
+        {type, view_compaction},
+        {database, DbName},
+        {design_document, IdxName},
+        {progress, 0}
+    ]),
 
     BufferSize0 = couch_config:get(
         "view_compaction", "keyvalue_buffer_size", "2097152"
     ),
     BufferSize = list_to_integer(BufferSize0),
 
-    FoldFun = fun({DocId, _} = KV, {Bt, Acc, AccSize, Copied, LastId}) ->
+    FoldFun = fun({DocId, _} = KV, Acc) ->
+        #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc,
         if DocId =:= LastId ->
             % COUCHDB-999 regression test
             ?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`"
@@ -63,26 +83,28 @@ compact(State) ->
             ),
             exit({view_duplicate_id, DocId});
         true -> ok end,
-        AccSize2 = AccSize + ?term_size(KV),
-        case AccSize2 >= BufferSize of
+        KvsSize2 = KvsSize + ?term_size(KV),
+        case KvsSize2 >= BufferSize of
             true ->
-                {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
-                couch_task_status:update("Copied ~p of ~p Ids (~p%)",
-                    [Copied, Count, (Copied * 100) div Count]),
-                {ok, {Bt2, [], 0, Copied+1+length(Acc), DocId}};
+                {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
+                Acc2 = update_task(Acc, 1 + length(Kvs)),
+                {ok, Acc2#acc{
+                    btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}};
             _ ->
-                {ok, {Bt, [KV | Acc], AccSize2, Copied, DocId}}
+                {ok, Acc#acc{
+                    kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}}
         end
     end,
 
-    InitAcc = {EmptyIdBtree, [], 0, 0, nil},
+    InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree},
     {ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc),
-    {Bt3, Uncopied, _, _, _} = FinalAcc,
+    #acc{btree = Bt3, kvs = Uncopied} = FinalAcc,
     {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+    FinalAcc2 = update_task(FinalAcc, length(Uncopied)),
 
-    NewViews = lists:map(fun({View, EmptyView}) ->
-        compact_view(View, EmptyView, BufferSize)
-    end, lists:zip(Views, EmptyViews)),
+    {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
+        compact_view(View, EmptyView, BufferSize, Acc)
+    end, FinalAcc2, lists:zip(Views, EmptyViews)),
 
     unlink(EmptyState#mrst.fd),
     {ok, EmptyState#mrst{
@@ -109,27 +131,31 @@ recompact(State) ->
     end.
 
 
-%% @spec compact_view(View, EmptyView, Retry) -> CompactView
-compact_view(View, EmptyView, BufferSize) ->
-    {ok, Count} = couch_mrview_util:get_row_count(View),
-    Fun = fun(KV, {Bt, Acc, AccSize, Copied}) ->
-        AccSize2 = AccSize + ?term_size(KV),
-        if AccSize2 >= BufferSize ->
-            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
-            couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)",
-                [View#mrview.id_num, Copied, Count, (Copied*100) div Count]),
-            {ok, {Bt2, [], 0, Copied + 1 + length(Acc)}};
+%% @spec compact_view(View, EmptyView, BufferSize, Acc) -> {CompactView, NewAcc}
+compact_view(View, EmptyView, BufferSize, Acc0) ->
+    Fun = fun(KV, #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc) ->
+        KvsSize2 = KvsSize + ?term_size(KV),
+        if KvsSize2 >= BufferSize ->
+            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
+            Acc2 = update_task(Acc, 1 + length(Kvs)),
+            {ok, Acc2#acc{btree = Bt2, kvs = [], kvs_size = 0}};
         true ->
-            {ok, {Bt, [KV|Acc], AccSize2, Copied}}
+            {ok, Acc#acc{kvs = [KV | Kvs], kvs_size = KvsSize2}}
         end
     end,
 
-    InitAcc = {EmptyView#mrview.btree, [], 0, 0},
+    InitAcc = Acc0#acc{kvs = [], kvs_size = 0, btree = EmptyView#mrview.btree},
     {ok, _, FinalAcc} = couch_btree:foldl(View#mrview.btree, Fun, InitAcc),
-    {Bt3, Uncopied, _, _} = FinalAcc,
+    #acc{btree = Bt3, kvs = Uncopied} = FinalAcc,
     {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+    FinalAcc2 = update_task(FinalAcc, length(Uncopied)),
+    {EmptyView#mrview{btree=NewBt}, FinalAcc2}.
+
 
-    EmptyView#mrview{btree=NewBt}.
+update_task(#acc{changes = Changes, total_changes = Total} = Acc, ChangesInc) ->
+    Changes2 = Changes + ChangesInc,
+    couch_task_status:update([{progress, (Changes2 * 100) div Total}]),
+    Acc#acc{changes = Changes2}.
 
 
 swap_compacted(OldState, NewState) ->

Modified: couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl (original)
+++ couchdb/trunk/src/couch_mrview/src/couch_mrview_index.erl Sat Sep 17 03:31:12 2011
@@ -15,7 +15,7 @@
 
 -export([get/2]).
 -export([init/2, open/2, close/1, reset/1, delete/1]).
--export([start_update/2, purge/4, process_doc/3, finish_update/1, commit/1]).
+-export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
 -export([compact/2, swap_compacted/2]).
 
 
@@ -106,8 +106,8 @@ reset(State) ->
     end).
 
 
-start_update(PartialDest, State) ->
-    couch_mrview_updater:start_update(PartialDest, State).
+start_update(PartialDest, State, NumChanges) ->
+    couch_mrview_updater:start_update(PartialDest, State, NumChanges).
 
 
 purge(Db, PurgeSeq, PurgedIdRevs, State) ->

Modified: couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl (original)
+++ couchdb/trunk/src/couch_mrview/src/couch_mrview_updater.erl Sat Sep 17 03:31:12 2011
@@ -12,13 +12,13 @@
 
 -module(couch_mrview_updater).
 
--export([start_update/2, purge/4, process_doc/3, finish_update/1]).
+-export([start_update/3, purge/4, process_doc/3, finish_update/1]).
 
 -include("couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 
-start_update(Partial, State) ->
+start_update(Partial, State, NumChanges) ->
     QueueOpts = [{max_size, 100000}, {max_items, 500}],
     {ok, DocQueue} = couch_work_queue:new(QueueOpts),
     {ok, WriteQueue} = couch_work_queue:new(QueueOpts),
@@ -32,7 +32,18 @@ start_update(Partial, State) ->
     },
 
     Self = self(),
-    MapFun = fun() -> map_docs(Self, InitState) end,
+    MapFun = fun() ->
+        couch_task_status:add_task([
+            {type, indexer},
+            {database, State#mrst.db_name},
+            {design_document, State#mrst.idx_name},
+            {progress, 0},
+            {changes_done, 0},
+            {total_changes, NumChanges}
+        ]),
+        couch_task_status:set_update_frequency(500),
+        map_docs(Self, InitState)
+    end,
     WriteFun = fun() -> write_results(Self, InitState) end,
 
     spawn_link(MapFun),
@@ -136,6 +147,7 @@ map_docs(Parent, State0) ->
                     {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
             end,
             FoldFun = fun(Docs, Acc) ->
+                update_task(length(Docs)),
                 lists:foldl(DocFun, Acc, Docs)
             end,
             Results = lists:foldl(FoldFun, {0, []}, Dequeued),
@@ -255,3 +267,16 @@ send_partial(Pid, State) when is_pid(Pid
     gen_server:cast(Pid, {new_state, State});
 send_partial(_, _) ->
     ok.
+
+
+update_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress = case Total of
+        0 ->
+            % updater restart after compaction finishes
+            0;
+        _ ->
+            (Changes2 * 100) div Total
+    end,
+    couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).

Modified: couchdb/trunk/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db_updater.erl Sat Sep 17 03:31:12 2011
@@ -879,6 +879,7 @@ copy_docs(Db, #db{updater_fd = DestFd} =
             NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
     {ok, FullDocInfoBTree} = couch_btree:add_remove(
             NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+    update_compact_task(length(NewFullDocInfos)),
     NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
               docinfo_by_seq_btree=DocInfoBTree}.
 
@@ -896,40 +897,46 @@ copy_compact(Db, NewDb0, Retry) ->
 
     EnumBySeqFun =
     fun(#doc_info{high_seq=Seq}=DocInfo, _Offset,
-        {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize, TotalCopied}) ->
+        {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
 
         AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
         if AccUncopiedSize2 >= BufferSize ->
             NewDb2 = copy_docs(
                 Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
-            TotalCopied2 = TotalCopied + 1 + length(AccUncopied),
-            couch_task_status:update("Copied ~p of ~p changes (~p%)",
-                [TotalCopied2, TotalChanges, (TotalCopied2 * 100) div TotalChanges]),
             AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
             if AccCopiedSize2 >= CheckpointAfter ->
-                {ok, {commit_data(NewDb2#db{update_seq = Seq}), [],
-                    0, 0, TotalCopied2}};
+                {ok, {commit_data(NewDb2#db{update_seq = Seq}), [], 0, 0}};
             true ->
-                {ok, {NewDb2#db{update_seq = Seq}, [],
-                    0, AccCopiedSize2, TotalCopied2}}
+                {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
             end;
         true ->
             {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
-                AccCopiedSize, TotalCopied}}
+                AccCopiedSize}}
         end
     end,
 
-    couch_task_status:set_update_frequency(500),
+    TaskProps0 = [
+        {type, database_compaction},
+        {database, Db#db.name},
+        {progress, 0},
+        {changes_done, 0},
+        {total_changes, TotalChanges}
+    ],
+    case Retry of
+    true ->
+        couch_task_status:update([{retry, true}]);
+    false ->
+        couch_task_status:add_task(TaskProps0),
+        couch_task_status:set_update_frequency(500)
+    end,
 
-    {ok, _, {NewDb2, Uncopied, _, _, ChangesDone}} =
+    {ok, _, {NewDb2, Uncopied, _, _}} =
         couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
-            {NewDb, [], 0, 0, 0},
+            {NewDb, [], 0, 0},
             [{start_key, NewDb#db.update_seq + 1}]),
 
-    couch_task_status:update("Flushing"),
-
     NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-    TotalChanges = ChangesDone + length(Uncopied),
+    TotalChanges = (couch_task_status:get(changes_done) - NewDb#db.update_seq),
 
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
@@ -948,7 +955,6 @@ start_copy_compact(#db{name=Name,filepat
     ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
     case couch_file:open(CompactFile) of
     {ok, Fd} ->
-        couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
         Retry = true,
         case couch_file:read_header(Fd) of
         {ok, Header} ->
@@ -957,7 +963,6 @@ start_copy_compact(#db{name=Name,filepat
             ok = couch_file:write_header(Fd, Header=#db_header{})
         end;
     {error, enoent} ->
-        couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
         {ok, Fd} = couch_file:open(CompactFile, [create]),
         Retry = false,
         ok = couch_file:write_header(Fd, Header=#db_header{})
@@ -984,6 +989,17 @@ start_copy_compact(#db{name=Name,filepat
         start_copy_compact(CurrentDb)
     end.
 
+update_compact_task(NumChanges) ->
+    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+    Changes2 = Changes + NumChanges,
+    Progress = case Total of
+    0 ->
+        0;
+    _ ->
+        (Changes2 * 100) div Total
+    end,
+    couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
+
 make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
     Body = case couch_compress:is_compressed(Body0) of
     true ->

Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Sat Sep 17 03:31:12 2011
@@ -213,7 +213,8 @@ do_init(#rep{options = Options, id = {Ba
         source_name = SourceName,
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
-        source_seq = SourceCurSeq
+        source_seq = SourceCurSeq,
+        committed_seq = {_, CommittedSeq}
     } = State = init_state(Rep),
 
     NumWorkers = get_value(worker_processes, Options),
@@ -240,11 +241,21 @@ do_init(#rep{options = Options, id = {Ba
         end,
         lists:seq(1, NumWorkers)),
 
-    couch_task_status:add_task(
-        "Replication",
-         io_lib:format("`~s`: `~s` -> `~s`",
-            [BaseId ++ Ext, SourceName, TargetName]),
-         io_lib:format("Processed ~p / ~p changes", [StartSeq, SourceCurSeq])),
+    couch_task_status:add_task([
+        {type, replication},
+        {source, ?l2b(SourceName)},
+        {target, ?l2b(TargetName)},
+        {continuous, get_value(continuous, Options, false)},
+        {revisions_checked, 0},
+        {missing_revisions_found, 0},
+        {docs_read, 0},
+        {docs_written, 0},
+        {doc_write_failures, 0},
+        {source_seq, SourceCurSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        {progress, 0}
+    ]),
+    couch_task_status:set_update_frequency(1000),
 
     % Until OTP R14B03:
     %
@@ -337,9 +348,10 @@ handle_info({'EXIT', Pid, Reason}, #rep_
     end.
 
 
-handle_call({report_seq_done, Seq, StatsInc}, _From,
+handle_call({report_seq_done, Seq, StatsInc}, From,
     #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
         current_through_seq = ThroughSeq, stats = Stats} = State) ->
+    gen_server:reply(From, ok),
     {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
     [Seq | Rest] ->
         {Seq, Rest};
@@ -360,8 +372,6 @@ handle_call({report_seq_done, Seq, Stats
         [Seq, ThroughSeq, NewThroughSeq, HighestDone,
             NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
     SourceCurSeq = source_cur_seq(State),
-    couch_task_status:update(
-        "Processed ~p / ~p changes", [element(2, NewThroughSeq), SourceCurSeq]),
     NewState = State#rep_state{
         stats = sum_stats([Stats, StatsInc]),
         current_through_seq = NewThroughSeq,
@@ -369,7 +379,8 @@ handle_call({report_seq_done, Seq, Stats
         highest_seq_done = NewHighestDone,
         source_seq = SourceCurSeq
     },
-    {reply, ok, NewState}.
+    update_task(NewState),
+    {noreply, NewState}.
 
 
 handle_cast({db_compacted, DbName},
@@ -425,7 +436,7 @@ terminate(Reason, State) ->
 
 
 terminate_cleanup(State) ->
-    couch_task_status:update("Finishing"),
+    update_task(State),
     stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
     stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
     couch_api_wrap:db_close(State#rep_state.source),
@@ -684,6 +695,7 @@ do_checkpoint(State) ->
                 source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
                 target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
             },
+            update_task(NewState),
             {ok, NewState}
         catch throw:{checkpoint_commit_failure, _} = Failure ->
             Failure
@@ -858,3 +870,32 @@ source_cur_seq(#rep_state{source = #http
 source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
     {ok, Info} = couch_api_wrap:get_db_info(Db),
     get_value(<<"update_seq">>, Info, Seq).
+
+
+update_task(State) ->
+    #rep_state{
+        current_through_seq = {_, CurSeq},
+        committed_seq = {_, CommittedSeq},
+        source_seq = SourceCurSeq,
+        stats = Stats
+    } = State,
+    couch_task_status:update([
+        {revisions_checked, Stats#rep_stats.missing_checked},
+        {missing_revisions_found, Stats#rep_stats.missing_found},
+        {docs_read, Stats#rep_stats.docs_read},
+        {docs_written, Stats#rep_stats.docs_written},
+        {doc_write_failures, Stats#rep_stats.doc_write_failures},
+        {source_seq, SourceCurSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        case is_number(CurSeq) andalso is_number(SourceCurSeq) of
+        true ->
+            case SourceCurSeq of
+            0 ->
+                {progress, 0};
+            _ ->
+                {progress, (CurSeq * 100) div SourceCurSeq}
+            end;
+        false ->
+            {progress, null}
+        end
+    ]).

Modified: couchdb/trunk/src/couchdb/couch_task_status.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_task_status.erl?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_task_status.erl (original)
+++ couchdb/trunk/src/couchdb/couch_task_status.erl Sat Sep 17 03:31:12 2011
@@ -13,29 +13,26 @@
 -module(couch_task_status).
 -behaviour(gen_server).
 
-% This module allows is used to track the status of long running tasks.
-% Long running tasks register (add_task/3) then update their status (update/1)
-% and the task and status is added to tasks list. When the tracked task dies
-% it will be automatically removed the tracking. To get the tasks list, use the
-% all/0 function
+% This module is used to track the status of long running tasks.
+% Long running tasks register themselves, via a call to add_task/1, and then
+% update their status properties via update/1. The status of a task is a
+% list of properties. Each property is a tuple, with the first element being
+% either an atom or a binary and the second element must be an EJSON value. When
+% a task updates its status, it can override some or all of its properties.
+% The properties {started_on, UnixTimestamp}, {updated_on, UnixTimestamp} and
+% {pid, ErlangPid} are automatically added by this module.
+% When a tracked task dies, its status will be automatically removed from
+% memory. To get the tasks list, call the all/0 function.
 
 -export([start_link/0, stop/0]).
--export([all/0, add_task/3, update/1, update/2, set_update_frequency/1]).
+-export([all/0, add_task/1, update/1, get/1, set_update_frequency/1]).
 
 -export([init/1, terminate/2, code_change/3]).
 -export([handle_call/3, handle_cast/2, handle_info/2]).
 
--import(couch_util, [to_binary/1]).
-
 -include("couch_db.hrl").
 
--record(task_status, {
-    type,
-    name,
-    start_ts,
-    update_ts,
-    status
-}).
+-define(set(L, K, V), lists:keystore(K, 1, L, {K, V})).
 
 
 start_link() ->
@@ -50,36 +47,41 @@ all() ->
     gen_server:call(?MODULE, all).
 
 
-add_task(Type, TaskName, StatusText) ->
+add_task(Props) ->
     put(task_status_update, {{0, 0, 0}, 0}),
-    Ts = now_ts(),
-    Msg = {
-        add_task,
-        #task_status{
-            type = to_binary(Type),
-            name = to_binary(TaskName),
-            status = to_binary(StatusText),
-            start_ts = Ts,
-            update_ts = Ts
-        }
-    },
-    gen_server:call(?MODULE, Msg).
+    Ts = timestamp(),
+    TaskProps = lists:ukeysort(
+        1, [{started_on, Ts}, {updated_on, Ts} | Props]),
+    put(task_status_props, TaskProps),
+    gen_server:call(?MODULE, {add_task, TaskProps}).
 
 
 set_update_frequency(Msecs) ->
     put(task_status_update, {{0, 0, 0}, Msecs * 1000}).
 
 
-update(StatusText) ->
-    update("~s", [StatusText]).
+update(Props) ->
+    MergeProps = lists:ukeysort(1, Props),
+    TaskProps = lists:ukeymerge(1, MergeProps, erlang:get(task_status_props)),
+    put(task_status_props, TaskProps),
+    maybe_persist(TaskProps).
+
+
+get(Props) when is_list(Props) ->
+    TaskProps = erlang:get(task_status_props),
+    [couch_util:get_value(P, TaskProps) || P <- Props];
+get(Prop) ->
+    TaskProps = erlang:get(task_status_props),
+    couch_util:get_value(Prop, TaskProps).
+
 
-update(Format, Data) ->
-    {LastUpdateTime, Frequency} = get(task_status_update),
+maybe_persist(TaskProps0) ->
+    {LastUpdateTime, Frequency} = erlang:get(task_status_update),
     case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of
     true ->
         put(task_status_update, {Now, Frequency}),
-        Msg = ?l2b(io_lib:format(Format, Data)),
-        gen_server:cast(?MODULE, {update_status, self(), Msg});
+        TaskProps = ?set(TaskProps0, updated_on, timestamp(Now)),
+        gen_server:cast(?MODULE, {update_status, self(), TaskProps});
     false ->
         ok
     end.
@@ -95,10 +97,10 @@ terminate(_Reason,_State) ->
     ok.
 
 
-handle_call({add_task, TaskStatus}, {From, _}, Server) ->
+handle_call({add_task, TaskProps}, {From, _}, Server) ->
     case ets:lookup(?MODULE, From) of
     [] ->
-        true = ets:insert(?MODULE, {From, TaskStatus}),
+        true = ets:insert(?MODULE, {From, TaskProps}),
         erlang:monitor(process, From),
         {reply, ok, Server};
     [_] ->
@@ -106,25 +108,23 @@ handle_call({add_task, TaskStatus}, {Fro
     end;
 handle_call(all, _, Server) ->
     All = [
-        [
-            {type, Task#task_status.type},
-            {task, Task#task_status.name},
-            {started_on, Task#task_status.start_ts},
-            {updated_on, Task#task_status.update_ts},
-            {status, Task#task_status.status},
-            {pid, ?l2b(pid_to_list(Pid))}
-        ]
+        [{pid, ?l2b(pid_to_list(Pid))} | TaskProps]
         ||
-        {Pid, Task} <- ets:tab2list(?MODULE)
+        {Pid, TaskProps} <- ets:tab2list(?MODULE)
     ],
     {reply, All, Server}.
 
 
-handle_cast({update_status, Pid, StatusText}, Server) ->
-    [{Pid, #task_status{name = TaskName} = Task}] = ets:lookup(?MODULE, Pid),
-    ?LOG_DEBUG("New task status for ~s: ~s",[TaskName, StatusText]),
-    NewTaskStatus = Task#task_status{status = StatusText, update_ts = now_ts()},
-    true = ets:insert(?MODULE, {Pid, NewTaskStatus}),
+handle_cast({update_status, Pid, NewProps}, Server) ->
+    case ets:lookup(?MODULE, Pid) of
+    [{Pid, _CurProps}] ->
+        ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]),
+        true = ets:insert(?MODULE, {Pid, NewProps});
+    _ ->
+        % Task finished/died in the meanwhile and we must have received
+        % a monitor message before this call - ignore.
+        ok
+    end,
     {noreply, Server};
 handle_cast(stop, State) ->
     {stop, normal, State}.
@@ -139,6 +139,8 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-now_ts() ->
-    {Mega, Secs, _} = erlang:now(),
+timestamp() ->
+    timestamp(now()).
+
+timestamp({Mega, Secs, _}) ->
     Mega * 1000000 + Secs.

Modified: couchdb/trunk/test/etap/090-task-status.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/090-task-status.t?rev=1171888&r1=1171887&r2=1171888&view=diff
==============================================================================
--- couchdb/trunk/test/etap/090-task-status.t (original)
+++ couchdb/trunk/test/etap/090-task-status.t Sat Sep 17 03:31:12 2011
@@ -15,7 +15,7 @@
 
 main(_) ->
     test_util:init_code_path(),
-    etap:plan(16),
+    etap:plan(28),
     case (catch test()) of
         ok ->
             etap:end_tests();
@@ -25,7 +25,7 @@ main(_) ->
     end,
     ok.
 
-check_status(Pid,ListPropLists) ->
+get_task_prop(Pid, Prop) ->
     From = list_to_binary(pid_to_list(Pid)),
     Element = lists:foldl(
         fun(PropList,Acc) ->
@@ -36,18 +36,28 @@ check_status(Pid,ListPropLists) ->
                     []
             end
         end,
-        [], ListPropLists
+        [], couch_task_status:all()
     ),
-    couch_util:get_value(status,hd(Element)).
+    case couch_util:get_value(Prop, hd(Element), nil) of
+    nil ->
+        etap:bail("Could not get property '" ++ couch_util:to_list(Prop) ++
+            "' for task " ++ pid_to_list(Pid));
+    Value ->
+        Value
+    end.
+
+now_ts() ->
+    {Mega, Secs, _} = erlang:now(),
+    Mega * 1000000 + Secs.
 
 loop() ->
     receive
-    {add, From} ->
-        Resp = couch_task_status:add_task("type", "task", "init"),
+    {add, Props, From} ->
+        Resp = couch_task_status:add_task(Props),
         From ! {ok, self(), Resp},
         loop();
-    {update, Status, From} ->
-        Resp = couch_task_status:update(Status),
+    {update, Props, From} ->
+        Resp = couch_task_status:update(Props),
         From ! {ok, self(), Resp},
         loop();
     {update_frequency, Msecs, From} ->
@@ -82,96 +92,159 @@ test() ->
     Pid2 = spawn(TaskUpdater),
     Pid3 = spawn(TaskUpdater),
 
-    ok = call(Pid1, add),
+    ok = call(Pid1, add, [{type, replication}, {progress, 0}]),
     etap:is(
         length(couch_task_status:all()),
         1,
         "Started a task"
     ),
+    Task1StartTime = get_task_prop(Pid1, started_on),
+    etap:is(
+        is_integer(Task1StartTime),
+        true,
+        "Task start time is defined."
+    ),
+    etap:is(
+        get_task_prop(Pid1, updated_on),
+        Task1StartTime,
+        "Task's start time is the same as the update time before an update."
+    ),
 
     etap:is(
-        call(Pid1, add),
+        call(Pid1, add, [{type, compaction}, {progress, 0}]),
         {add_task_error, already_registered},
         "Unable to register multiple tasks for a single Pid."
     ),
 
     etap:is(
-        check_status(Pid1, couch_task_status:all()),
-        <<"init">>,
-        "Task status was set to 'init'."
+        get_task_prop(Pid1, type),
+        replication,
+        "Task type is 'replication'."
     ),
-
-    call(Pid1,update,"running"),
     etap:is(
-        check_status(Pid1,couch_task_status:all()),
-        <<"running">>,
-        "Status updated to 'running'."
+        get_task_prop(Pid1, progress),
+        0,
+        "Task progress is 0."
     ),
 
+    ok = timer:sleep(1000),
+    call(Pid1, update, [{progress, 25}]),
+    etap:is(
+        get_task_prop(Pid1, progress),
+        25,
+        "Task progress is 25."
+    ),
+    etap:is(
+        get_task_prop(Pid1, updated_on) > Task1StartTime,
+        true,
+        "Task's last update time has increased after an update."
+    ),
 
-    call(Pid2,add),
+    call(Pid2, add, [{type, compaction}, {progress, 0}]),
     etap:is(
         length(couch_task_status:all()),
         2,
         "Started a second task."
     ),
-
+    Task2StartTime = get_task_prop(Pid2, started_on),
+    etap:is(
+        is_integer(Task2StartTime),
+        true,
+        "Second task's start time is defined."
+    ),
     etap:is(
-        check_status(Pid2, couch_task_status:all()),
-        <<"init">>,
-        "Second tasks's status was set to 'init'."
+        get_task_prop(Pid2, updated_on),
+        Task2StartTime,
+        "Second task's start time is the same as the update time before an update."
     ),
 
-    call(Pid2, update, "running"),
     etap:is(
-        check_status(Pid2, couch_task_status:all()),
-        <<"running">>,
-        "Second task's status updated to 'running'."
+        get_task_prop(Pid2, type),
+        compaction,
+        "Second task's type is 'compaction'."
+    ),
+    etap:is(
+        get_task_prop(Pid2, progress),
+        0,
+        "Second task's progress is 0."
     ),
 
+    ok = timer:sleep(1000),
+    call(Pid2, update, [{progress, 33}]),
+    etap:is(
+        get_task_prop(Pid2, progress),
+        33,
+        "Second task's progress updated to 33."
+    ),
+    etap:is(
+        get_task_prop(Pid2, updated_on) > Task2StartTime,
+        true,
+        "Second task's last update time has increased after an update."
+    ),
 
-    call(Pid3, add),
+    call(Pid3, add, [{type, indexer}, {progress, 0}]),
     etap:is(
         length(couch_task_status:all()),
         3,
         "Registered a third task."
     ),
-
+    Task3StartTime = get_task_prop(Pid3, started_on),
+    etap:is(
+        is_integer(Task3StartTime),
+        true,
+        "Third task's start time is defined."
+    ),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"init">>,
-        "Third tasks's status was set to 'init'."
+        get_task_prop(Pid3, updated_on),
+        Task3StartTime,
+        "Third task's start time is the same as the update time before an update."
     ),
 
-    call(Pid3, update, "running"),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"running">>,
-        "Third task's status updated to 'running'."
+        get_task_prop(Pid3, type),
+        indexer,
+        "Third task's type is 'indexer'."
+    ),
+    etap:is(
+        get_task_prop(Pid3, progress),
+        0,
+        "Third task's progress is 0."
     ),
 
+    ok = timer:sleep(1000),
+    call(Pid3, update, [{progress, 50}]),
+    etap:is(
+        get_task_prop(Pid3, progress),
+        50,
+        "Third task's progress updated to 50."
+    ),
+    etap:is(
+        get_task_prop(Pid3, updated_on) > Task3StartTime,
+        true,
+        "Third task's last update time has increased after an update."
+    ),
 
     call(Pid3, update_frequency, 500),
-    call(Pid3, update, "still running"),
+    call(Pid3, update, [{progress, 66}]),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"still running">>,
-        "Third task's status updated to 'still running'."
+        get_task_prop(Pid3, progress),
+        66,
+        "Third task's progress updated to 66."
     ),
 
-    call(Pid3, update, "skip this update"),
+    call(Pid3, update, [{progress, 67}]),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"still running">>,
-        "Status update dropped because of frequency limit."
+        get_task_prop(Pid3, progress),
+        66,
+        "Task update dropped because of frequency limit."
     ),
 
     call(Pid3, update_frequency, 0),
-    call(Pid3, update, "don't skip"),
+    call(Pid3, update, [{progress, 77}]),
     etap:is(
-        check_status(Pid3, couch_task_status:all()),
-        <<"don't skip">>,
-        "Status updated after reseting frequency limit."
+        get_task_prop(Pid3, progress),
+        77,
+        "Task updated after reseting frequency limit."
     ),
 
 



Mime
View raw message