couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dam...@apache.org
Subject svn commit: r645661 - in /incubator/couchdb/trunk: share/server/ share/www/script/ src/couchdb/
Date Mon, 07 Apr 2008 19:51:27 GMT
Author: damien
Date: Mon Apr  7 12:51:17 2008
New Revision: 645661

URL: http://svn.apache.org/viewvc?rev=645661&view=rev
Log:
Compaction. Works, but still needs queueing and better handling for long reads/writes overlapping
the compaction switchover.

Modified:
    incubator/couchdb/trunk/share/server/main.js
    incubator/couchdb/trunk/share/www/script/couch.js
    incubator/couchdb/trunk/share/www/script/couch_tests.js
    incubator/couchdb/trunk/src/couchdb/couch_db.erl
    incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl
    incubator/couchdb/trunk/src/couchdb/couch_rep.erl
    incubator/couchdb/trunk/src/couchdb/couch_stream.erl
    incubator/couchdb/trunk/src/couchdb/mod_couch.erl

Modified: incubator/couchdb/trunk/share/server/main.js
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/server/main.js?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/share/server/main.js [utf-8] (original)
+++ incubator/couchdb/trunk/share/server/main.js [utf-8] Mon Apr  7 12:51:17 2008
@@ -68,7 +68,7 @@
       // [
       //  [["Key","Value"]],                    <- fun 1 returned 1 key value
       //  [],                                   <- fun 2 returned 0 key values
-      //  [["Key1","Value1"],["Key2","Value2"]],<- fun 3 returned 2 key values
+      //  [["Key1","Value1"],["Key2","Value2"]] <- fun 3 returned 2 key values
       // ]
       //
       var doc = cmd[1];

Modified: incubator/couchdb/trunk/share/www/script/couch.js
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch.js?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/share/www/script/couch.js [utf-8] (original)
+++ incubator/couchdb/trunk/share/www/script/couch.js [utf-8] Mon Apr  7 12:51:17 2008
@@ -87,6 +87,9 @@
     var result = JSON.parse(req.responseText);
     if (req.status != 201)
       throw result;
+    for(i in docs) {
+        docs[i]._rev = result.new_revs[i].rev;
+    }
     return result;
   }
 
@@ -127,6 +130,14 @@
     var req = request("GET", this.uri + "_all_docs" + encodeOptions(options));
     var result = JSON.parse(req.responseText);
     if (req.status != 200)
+      throw result;
+    return result;
+  }
+  
+  this.compact = function() {
+    var req = request("POST", this.uri + "_compact");
+    var result = JSON.parse(req.responseText);
+    if (req.status != 202)
       throw result;
     return result;
   }

Modified: incubator/couchdb/trunk/share/www/script/couch_tests.js
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/share/www/script/couch_tests.js?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8] (original)
+++ incubator/couchdb/trunk/share/www/script/couch_tests.js [utf-8] Mon Apr  7 12:51:17 2008
@@ -839,8 +839,33 @@
       headers: {"if-match": etag}
     });
     T(xhr.status == 202)
-  }
+  },
 
+  compact: function(debug) {
+    var db = new CouchDB("test_suite_db");
+    db.deleteDb();
+    db.createDb();
+    if (debug) debugger;
+    var docs = makeDocs(0, 10);
+    var saveResult = db.bulkSave(docs);
+    T(saveResult.ok);
+    var originalsize = db.info().disk_size;
+    
+    for(var i in docs) {
+        db.deleteDoc(docs[i]);
+    }
+    var deletesize = db.info().disk_size;
+    T(deletesize > originalsize);
+    
+    var xhr = CouchDB.request("POST", "/test_suite_db/_compact");
+    T(xhr.status == 202);
+    //compaction isn't instantaneous, loop until done
+    while(db.info().compact_running) {};
+    
+    var compactedsize = db.info().disk_size;
+    
+    T(compactedsize < deletesize);
+    }
 };
 
 function makeDocs(start, end, templateDoc) {

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Mon Apr  7 12:51:17 2008
@@ -105,6 +105,9 @@
 open(DbName, Filepath) ->
     start_link(DbName, Filepath, []).
 
+
+% Compaction still needs work. Right now readers and writers can get an error
+% during the file compaction changeover. This doesn't need to be the case.
 start_compact(MainPid) ->
     gen_server:cast(MainPid, start_compact).
 
@@ -179,8 +182,8 @@
         {doc_count, Count},
         {doc_del_count, DelCount},
         {update_seq, SeqNum},
-        {compacting, Compactor/=nil},
-        {size, Size}
+        {compact_running, Compactor/=nil},
+        {disk_size, Size}
         ],
     {ok, InfoList}.
 
@@ -253,6 +256,7 @@
                 Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
             end
         end, Docs),
+    NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
     DocBuckets = group_alike_docs(Docs2),
     Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
     Db = get_db(MainPid),
@@ -275,11 +279,17 @@
     
     % flush unwritten binaries to disk.
     DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <-
DocBuckets2],
-    
+
     case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
-    ok ->
-        % return back the new rev ids, in the same order input.
-        {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+    ok -> {ok, NewRevs};
+    retry ->
+        Db2 = get_db(MainPid),
+        DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket
<- DocBuckets3],
+        % We only retry once
+        case gen_server:call(MainPid, {update_docs, DocBuckets4, Options}) of
+        ok -> {ok, NewRevs};
+        Else -> throw(Else)
+        end;
     Else->
         throw(Else)
     end.
@@ -477,7 +487,7 @@
     MainPid ! {initialized, Db2},
     update_loop(Db2).
     
-update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
+update_loop(#db{fd=Fd,name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
     receive
     {OrigFrom, update_docs, DocActions, Options} ->
         case (catch update_docs_int(Db, DocActions, Options)) of
@@ -486,6 +496,9 @@
             gen_server:reply(OrigFrom, ok),
             couch_db_update_notifier:notify({updated, Name}),
             update_loop(Db2);
+        retry ->
+            gen_server:reply(OrigFrom, retry),
+            update_loop(Db);
         conflict ->
             gen_server:reply(OrigFrom, conflict),
             update_loop(Db);
@@ -519,7 +532,17 @@
                 doc_count = Db#db.doc_count,
                 doc_del_count = Db#db.doc_del_count,
                 filepath = Filepath},
-            close_db(Db),
+            
+            couch_stream:close(Db#db.summary_stream),
+            % close file handle async.
+            % wait 5 secs before closing, allowing readers to finish
+            unlink(Fd),
+            spawn_link(fun() ->
+                receive after 5000 -> ok end,
+                couch_file:close(Fd),
+                file:delete(Filepath ++ ".old")
+                end),
+                
             ok = gen_server:call(MainPid, {db_updated, NewDb2}),
             couch_log:info("Compaction for db ~p completed.", [Name]),
             update_loop(NewDb2#db{compactor_pid=nil});
@@ -651,17 +674,29 @@
 
 flush_trees(_Db, [], AccFlushedTrees) ->
     {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd}=Db, [Unflushed | RestUnflushed], AccFlushed) ->
        Flushed = couch_key_tree:map(
         fun(_Rev, Value) ->
             case Value of
             #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
                 % this node value is actually an unwritten document summary,
                 % write to disk.
-                
-                % convert bins, removing the FD.
-                % All bins should have been flushed to disk already.
-                Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd,
BinSp, BinLen}}} <- Atts],
+                % make sure the Fd in the written bins is the same Fd we are.
+                Bins =
+                case Atts of
+                [] -> [];
+                [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
+                    % convert bins, removing the FD.
+                    % All bins should have been flushed to disk already.
+                    [{BinName, {BinType, BinSp, BinLen}}
+                        || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+                        <- Atts];
+                _ ->
+                    % BinFd must not equal our Fd. This can happen when a database
+                    % is being updated during a compaction
+                    couch_log:debug("File where the attachments are written has changed.
Possibly retrying."),
+                    throw(retry)
+                end,
                 {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body,
Bins}),
                 {IsDeleted, NewSummaryPointer};
             _ ->
@@ -880,7 +915,7 @@
     fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
         case couch_util:should_flush() of
         true ->
-            NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)),
+            NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
             {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
         false ->    
             {ok, {AccNewDb, [DocInfo | AccUncopied]}}

Modified: incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl Mon Apr  7 12:51:17 2008
@@ -36,8 +36,6 @@
     readline(Port, []).
 
 readline(Port, Acc) ->
-    Timer = erlang:send_after(timeout(), self(), timeout),
-    Result =
     receive
     {Port, {data, {noeol, Data}}} ->
         readline(Port, [Data|Acc]);
@@ -45,20 +43,11 @@
         lists:flatten(lists:reverse(Acc, Data));
     {Port, Err} ->
         catch port_close(Port),
-        erlang:cancel_timer(Timer),
-        throw({map_process_error, Err});
-    timeout ->
+        throw({map_process_error, Err})
+    after timeout() ->
         catch port_close(Port),
         throw({map_process_error, "map function timed out"})
-    end,
-    case erlang:cancel_timer(Timer) of
-    false ->
-        % message already sent. clear it
-        receive timeout -> ok end;
-    _ ->
-        ok
-    end,
-    Result.
+    end.
 
 read_json(Port) ->
     case cjson:decode(readline(Port)) of
@@ -108,8 +97,7 @@
 
 map_docs({_Lang, Port}, Docs) ->
     % send the documents
-    Results =
-    lists:map(
+    Results = lists:map(
         fun(Doc) ->
             Json = couch_doc:to_json_obj(Doc, []),
             case prompt(Port, {"map_doc", Json}) of

Modified: incubator/couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_rep.erl?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_rep.erl Mon Apr  7 12:51:17 2008
@@ -14,7 +14,7 @@
 
 -include("couch_db.hrl").
 
--export([replicate/2, replicate/3, test/0, test_write_docs/3]).
+-export([replicate/2, replicate/3]).
 
 -record(stats, {
     docs_read=0,
@@ -117,8 +117,7 @@
     end.
 
 pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
-    {ok, NewSeq} =
-    enum_docs_since(DbSource, SourceSeqNum,
+    {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
         fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
             Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
             {ok, {Seq, Stats2}}
@@ -136,23 +135,15 @@
     [] ->
         Stats;
     _Else ->
-        % the 'ok' below validates no unrecoverable errors (like network failure, etc).
         {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
+        % only save successful reads
+        Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+        ok = save_docs(DbTarget, Docs, []),
 
-        Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
-
-        Stats2 = Stats#stats{
+        Stats#stats{
             docs_read=Stats#stats.docs_read + length(Docs),
-            read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)},
-
-        case Docs of
-        [] ->
-            Stats2;
-        _ ->
-            % the 'ok' below validates no unrecoverable errors (like network failure, etc).
-            ok = save_docs(DbTarget, Docs, []),
-            Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)}
-        end
+            read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs),
+            docs_copied=Stats#stats.docs_copied + length(Docs)}
     end.
 
 
@@ -280,29 +271,3 @@
     couch_db:open_doc_revs(Db, DocId, Revs, Options).
 
 
-
-
-
-test() ->
-    couch_server:start(),
-    %{ok, LocalA} = couch_server:open("replica_a"),
-    {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
-    {ok, _} = couch_server:create("replica_b", [overwrite]),
-    %DbA = "replica_a",
-    DbA = "http://localhost:5984/replica_a/",
-    %DbB = "replica_b",
-    DbB = "http://localhost:5984/replica_b/",
-    _DocUnids = test_write_docs(10, LocalA, []),
-    replicate(DbA, DbB),
-    %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
-    % replicate(DbA, DbB),
-    ok.
-
-test_write_docs(0, _Db, Output) ->
-    lists:reverse(Output);
-test_write_docs(N, Db, Output) ->
-    Doc = #doc{
-        id=integer_to_list(N),
-        body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}},
-    couch_db:save_doc(Db, Doc, []),
-    test_write_docs(N-1, Db, [integer_to_list(N) | Output]).

Modified: incubator/couchdb/trunk/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_stream.erl?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_stream.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_stream.erl Mon Apr  7 12:51:17 2008
@@ -79,7 +79,7 @@
     {ok, Bin, Sp2}.
 
 copy_to_new_stream(Src, Sp, Len, DestFd) ->
-    Dest = open(DestFd),
+    {ok, Dest} = open(DestFd),
     {ok, NewSp} = copy(Src, Sp, Len, Dest),
     close(Dest),
     {ok, NewSp}.

Modified: incubator/couchdb/trunk/src/couchdb/mod_couch.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/mod_couch.erl?rev=645661&r1=645660&r2=645661&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/mod_couch.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/mod_couch.erl Mon Apr  7 12:51:17 2008
@@ -195,6 +195,8 @@
     send_ok(Mod, 201);
 do(#mod{method="POST"}=Mod, #uri_parts{doc="_missing_revs"}=Parts) ->
     handle_missing_revs_request(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{doc="_compact"}=Parts) ->
+    handle_compact(Mod, Parts);
 do(#mod{method="PUT"}=Mod, #uri_parts{doc=""}=Parts) ->
     handle_db_create(Mod, Parts);
 do(#mod{method="DELETE"}=Mod, #uri_parts{doc=""}=Parts) ->
@@ -486,6 +488,10 @@
     {ok, Results} = couch_db:get_missing_revs(Db, DocIdRevs),
     JsonResults = [{Id, list_to_tuple(Revs)} || {Id, Revs} <- Results],
     send_json(Mod, 200, {obj, [{missing_revs, {obj, JsonResults}}]}).
+
+handle_compact(Mod, Parts) ->
+    ok = couch_db:start_compact(open_db(Parts)),
+    send_ok(Mod, 202).
 
 handle_replication_request(#mod{entity_body=RawJson}=Mod) ->
     {obj, Props} = cjson:decode(RawJson),



Mime
View raw message