couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rand...@apache.org
Subject git commit: Asynchronous file writes
Date Fri, 18 Nov 2011 08:56:57 GMT
Updated Branches:
  refs/heads/COUCHDB-1342 c2fa181d3 -> 8d698f571


Asynchronous file writes

This change updates the file module so that it can do
asynchronous writes. Basically, it replies immediately
to the process asking to write something to the file, with
the position where the chunks will be written in the
file, while a dedicated child process keeps collecting
chunks and writing them to the file (batching them
when possible). After issuing a series of write requests
to the file module, the caller can call its 'flush'
function, which will block the caller until all the
chunks it requested to write have actually been written
to the file.

This maximizes utilization of the IO subsystem: for
example, while the updater is traversing and modifying
the btrees and doing CPU-bound tasks, the writes are
happening in parallel.

Originally described at http://s.apache.org/TVu


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/8d698f57
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/8d698f57
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/8d698f57

Branch: refs/heads/COUCHDB-1342
Commit: 8d698f5713b4883686a71d830a896b0a14ef40e5
Parents: c2fa181
Author: Filipe David Borba Manana <fdmanana@apache.org>
Authored: Sat Nov 5 13:14:13 2011 +0000
Committer: Randall Leeds <randall@apache.org>
Committed: Fri Nov 18 00:56:39 2011 -0800

----------------------------------------------------------------------
 src/couch_mrview/src/couch_mrview_index.erl   |    3 +-
 src/couch_mrview/src/couch_mrview_updater.erl |    8 +-
 src/couch_mrview/src/couch_mrview_util.erl    |    1 +
 src/couchdb/couch_db.erl                      |   36 +-
 src/couchdb/couch_db.hrl                      |    1 -
 src/couchdb/couch_db_updater.erl              |   54 +--
 src/couchdb/couch_file.erl                    |  425 ++++++++++++++------
 test/etap/010-file-basics.t                   |    6 +
 test/etap/011-file-headers.t                  |    7 +
 test/etap/020-btree-basics.t                  |   21 +-
 test/etap/021-btree-reductions.t              |    1 +
 test/etap/050-stream.t                        |    1 +
 test/etap/200-view-group-no-db-leaks.t        |    1 -
 test/etap/201-view-group-shutdown.t           |    1 -
 test/etap/240-replication-compact.t           |    1 -
 15 files changed, 385 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couch_mrview/src/couch_mrview_index.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index a604651..21d38d1 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -128,7 +128,8 @@ finish_update(State) ->
 
 commit(State) ->
     Header = {State#mrst.sig, couch_mrview_util:make_header(State)},
-    couch_file:write_header(State#mrst.fd, Header).
+    ok = couch_file:write_header(State#mrst.fd, Header),
+    ok = couch_file:flush(State#mrst.fd).
 
 
 compact(Db, State, Opts) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couch_mrview/src/couch_mrview_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 3014664..ef6f2e7 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -55,7 +55,8 @@ start_update(Partial, State, NumChanges) ->
 purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
     #mrst{
         id_btree=IdBtree,
-        views=Views
+        views=Views,
+        fd=Fd
     } = State,
 
     Ids = [Id || {Id, _Revs} <- PurgedIdRevs],
@@ -87,6 +88,7 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
     end,
 
     Views2 = lists:map(RemKeysFun, Views),
+    ok = couch_file:flush(Fd),
     {ok, State#mrst{
         id_btree=IdBtree2,
         views=Views2,
@@ -229,7 +231,8 @@ insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
 write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
     #mrst{
         id_btree=IdBtree,
-        first_build=FirstBuild
+        first_build=FirstBuild,
+        fd=Fd
     } = State,
 
     {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
@@ -245,6 +248,7 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
         View#mrview{btree=VBtree2, update_seq=NewUpdateSeq}
     end,
 
+    ok = couch_file:flush(Fd),
     State#mrst{
         views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs),
         update_seq=UpdateSeq,

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couch_mrview/src/couch_mrview_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index a13b154..065a2cd 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -546,6 +546,7 @@ delete_file(FName) ->
 reset_index(Db, Fd, #mrst{sig=Sig}=State) ->
     ok = couch_file:truncate(Fd, 0),
     ok = couch_file:write_header(Fd, {Sig, nil}),
+    ok = couch_file:flush(Fd),
     init_state(Db, Fd, reset_state(State), nil).
 
 

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 0656b52..ca6424d 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -232,7 +232,7 @@ get_full_doc_info(Db, Id) ->
     Result.
 
 get_full_doc_infos(Db, Ids) ->
-    couch_btree:lookup(by_id_btree(Db), Ids).
+    couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
 
 increment_update_seq(#db{update_pid=UpdatePid}) ->
     gen_server:call(UpdatePid, increment_update_seq).
@@ -267,7 +267,7 @@ get_db_info(Db) ->
         local_docs_btree = LocalBtree
     } = Db,
     {ok, Size} = couch_file:bytes(Fd),
-    {ok, DbReduction} = couch_btree:full_reduce(by_id_btree(Db)),
+    {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
     InfoList = [
         {db_name, Name},
         {doc_count, element(1, DbReduction)},
@@ -301,7 +301,7 @@ sum_tree_sizes(Acc, [T | Rest]) ->
         sum_tree_sizes(Acc + Sz, Rest)
     end.
 
-get_design_docs(Db) ->
+get_design_docs(#db{fulldocinfo_by_id_btree = IdBtree} = Db) ->
     FoldFun = skip_deleted(fun
         (#full_doc_info{deleted = true}, _Reds, Acc) ->
             {ok, Acc};
@@ -311,7 +311,7 @@ get_design_docs(Db) ->
             {stop, Acc}
     end),
     KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
-    {ok, _, Docs} = couch_btree:fold(by_id_btree(Db), FoldFun, [], KeyOpts),
+    {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
     Docs.
 
 check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
@@ -702,7 +702,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
         DocErrors = [],
         DocBuckets3 = DocBuckets
     end,
-    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd)
+    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
             || Doc <- Bucket] || Bucket <- DocBuckets3],
     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
@@ -758,7 +758,7 @@ update_docs(Db, Docs, Options, interactive_edit) ->
                 true -> [] end ++ Options,
         DocBuckets3 = [[
                 doc_flush_atts(set_new_att_revpos(
-                        check_dup_atts(Doc)), Db#db.updater_fd)
+                        check_dup_atts(Doc)), Db#db.fd)
                 || Doc <- B] || B <- DocBuckets2],
         {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
         
@@ -832,7 +832,7 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
             % compaction. Retry by reopening the db and writing to the current file
             {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
             DocBuckets2 = [
-                [doc_flush_atts(Doc, Db2#db.updater_fd) || Doc <- Bucket] ||
+                [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
                 Bucket <- DocBuckets1
             ],
             % We only retry once
@@ -1037,12 +1037,12 @@ changes_since(Db, StartSeq, Fun, Acc) ->
     
 changes_since(Db, StartSeq, Fun, Options, Acc) ->
     Wrapper = fun(DocInfo, _Offset, Acc2) -> Fun(DocInfo, Acc2) end,
-    {ok, _LastReduction, AccOut} = couch_btree:fold(by_seq_btree(Db),
+    {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
         Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
     {ok, AccOut}.
 
 count_changes_since(Db, SinceSeq) ->
-    BTree = by_seq_btree(Db),
+    BTree = Db#db.docinfo_by_seq_btree,
     {ok, Changes} =
     couch_btree:fold_reduce(BTree,
         fun(_SeqStart, PartialReds, 0) ->
@@ -1053,13 +1053,14 @@ count_changes_since(Db, SinceSeq) ->
 
 enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
     {ok, LastReduction, AccOut} = couch_btree:fold(
-        by_seq_btree(Db), InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+        Db#db.docinfo_by_seq_btree, InFun, Acc,
+        [{start_key, SinceSeq + 1} | Options]),
     {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
 
 enum_docs(Db, InFun, InAcc, Options) ->
     FoldFun = skip_deleted(InFun),
     {ok, LastReduce, OutAcc} = couch_btree:fold(
-        by_id_btree(Db), FoldFun, InAcc, Options),
+        Db#db.fulldocinfo_by_id_btree, FoldFun, InAcc, Options),
     {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
 
 % server functions
@@ -1161,7 +1162,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
         IdRevs, LookupResults).
 
 open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
-    case couch_btree:lookup(local_btree(Db), [Id]) of
+    case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
     [{ok, {_, {Rev, BodyData}}}] ->
         Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
         apply_open_options({ok, Doc}, Options);
@@ -1232,7 +1233,7 @@ read_doc(#db{fd=Fd}, Pos) ->
     couch_file:pread_term(Fd, Pos).
 
 
-make_doc(#db{updater_fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
     {BodyData, Atts} =
     case Bp of
     nil ->
@@ -1303,15 +1304,6 @@ increment_stat(#db{options = Options}, Stat) ->
         couch_stats_collector:increment(Stat)
     end.
 
-local_btree(#db{local_docs_btree = BTree, fd = ReaderFd}) ->
-    BTree#btree{fd = ReaderFd}.
-
-by_seq_btree(#db{docinfo_by_seq_btree = BTree, fd = ReaderFd}) ->
-    BTree#btree{fd = ReaderFd}.
-
-by_id_btree(#db{fulldocinfo_by_id_btree = BTree, fd = ReaderFd}) ->
-    BTree#btree{fd = ReaderFd}.
-
 skip_deleted(FoldFun) ->
     fun
         (visit, KV, Reds, Acc) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_db.hrl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index cc97351..d14d5b3 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -170,7 +170,6 @@
     compactor_pid = nil,
     instance_start_time, % number of microsecs since jan 1 1970 as a binary string
     fd,
-    updater_fd,
     fd_ref_counter,
     header = #db_header{},
     committed_update_seq,

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index e15a944..03a04aa 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -42,14 +42,12 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
             file:delete(Filepath ++ ".compact")
         end
     end,
-    ReaderFd = open_reader_fd(Filepath, Options),
-    Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
+    Db = init_db(DbName, Filepath, Fd, Header, Options),
     Db2 = refresh_validate_doc_funs(Db),
     {ok, Db2#db{main_pid = MainPid}}.
 
 
 terminate(_Reason, Db) ->
-    ok = couch_file:close(Db#db.updater_fd),
     ok = couch_file:close(Db#db.fd),
     couch_util:shutdown_sync(Db#db.compactor_pid),
     couch_util:shutdown_sync(Db#db.fd_ref_counter),
@@ -69,7 +67,7 @@ handle_call(increment_update_seq, _From, Db) ->
 
 handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
     {ok, Ptr, _} = couch_file:append_term(
-        Db#db.updater_fd, NewSec, [{compression, Comp}]),
+        Db#db.fd, NewSec, [{compression, Comp}]),
     Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
             update_seq=Db#db.update_seq+1}),
     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -86,7 +84,7 @@ handle_call({purge_docs, _IdRevs}, _From,
     {reply, {error, purge_during_compaction}, Db};
 handle_call({purge_docs, IdRevs}, _From, Db) ->
     #db{
-        updater_fd = Fd,
+        fd = Fd,
         fulldocinfo_by_id_btree = DocInfoByIdBTree,
         docinfo_by_seq_btree = DocInfoBySeqBTree,
         update_seq = LastSeq,
@@ -175,10 +173,9 @@ handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
 
 handle_call({compact_done, CompactFilepath}, _From, #db{filepath=Filepath}=Db) ->
     {ok, NewFd} = couch_file:open(CompactFilepath),
-    ReaderFd = open_reader_fd(CompactFilepath, Db#db.options),
     {ok, NewHeader} = couch_file:read_header(NewFd),
     #db{update_seq=NewSeq} = NewDb =
-        init_db(Db#db.name, Filepath, NewFd, ReaderFd, NewHeader, Db#db.options),
+        init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
     unlink(NewFd),
     case Db#db.update_seq == NewSeq of
     true ->
@@ -414,7 +411,7 @@ simple_upgrade_record(Old, _New) ->
 -define(OLD_DISK_VERSION_ERROR,
     "Database files from versions smaller than 0.10.0 are no longer supported").
 
-init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
+init_db(DbName, Filepath, Fd, Header0, Options) ->
     Header1 = simple_upgrade_record(Header0, #db_header{}),
     Header =
     case element(2, Header1) of
@@ -461,11 +458,10 @@ init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
     {MegaSecs, Secs, MicroSecs} = now(),
     StartTime = ?l2b(io_lib:format("~p",
             [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
-    {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
+    {ok, RefCntr} = couch_ref_counter:start([Fd]),
     #db{
         update_pid=self(),
-        fd = ReaderFd,
-        updater_fd = Fd,
+        fd = Fd,
         fd_ref_counter = RefCntr,
         header=Header,
         fulldocinfo_by_id_btree = IdBtree,
@@ -484,15 +480,6 @@ init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
         compression = Compression
         }.
 
-open_reader_fd(Filepath, Options) ->
-    {ok, Fd} = case lists:member(sys_db, Options) of
-    true ->
-        couch_file:open(Filepath, [read_only, sys_db]);
-    false ->
-        couch_file:open(Filepath, [read_only])
-    end,
-    unlink(Fd),
-    Fd.
 
 close_db(#db{fd_ref_counter = RefCntr}) ->
     couch_ref_counter:drop(RefCntr).
@@ -515,7 +502,7 @@ refresh_validate_doc_funs(Db) ->
 
 flush_trees(_Db, [], AccFlushedTrees) ->
     {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{updater_fd = Fd} = Db,
+flush_trees(#db{fd = Fd} = Db,
         [InfoUnflushed | RestUnflushed], AccFlushed) ->
     #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
     {Flushed, LeafsSize} = couch_key_tree:mapfold(
@@ -672,7 +659,8 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
         fulldocinfo_by_id_btree = DocInfoByIdBTree,
         docinfo_by_seq_btree = DocInfoBySeqBTree,
         update_seq = LastSeq,
-        revs_limit = RevsLimit
+        revs_limit = RevsLimit,
+        fd = Fd
         } = Db,
     Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
     % lookup up the old documents, if they exist.
@@ -707,6 +695,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
         fulldocinfo_by_id_btree = DocInfoByIdBTree2,
         docinfo_by_seq_btree = DocInfoBySeqBTree2,
         update_seq = NewSeq},
+    couch_file:flush(Fd),
 
     % Check if we just updated any design documents, and update the validation
     % funs if we did.
@@ -782,8 +771,7 @@ commit_data(Db, true) ->
     Db;
 commit_data(Db, _) ->
     #db{
-        updater_fd = Fd,
-        filepath = Filepath,
+        fd = Fd,
         header = OldHeader,
         fsync_options = FsyncOptions,
         waiting_delayed_commit = Timer
@@ -794,14 +782,15 @@ commit_data(Db, _) ->
         Db#db{waiting_delayed_commit=nil};
     Header ->
         case lists:member(before_header, FsyncOptions) of
-        true -> ok = couch_file:sync(Filepath);
+        true -> ok = couch_file:sync(Fd);
         _    -> ok
         end,
 
         ok = couch_file:write_header(Fd, Header),
+        ok = couch_file:flush(Fd),
 
         case lists:member(after_header, FsyncOptions) of
-        true -> ok = couch_file:sync(Filepath);
+        true -> ok = couch_file:sync(Fd);
         _    -> ok
         end,
 
@@ -811,7 +800,7 @@ commit_data(Db, _) ->
     end.
 
 
-copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
+copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
     {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
     BinInfos = case BinInfos0 of
     _ when is_binary(BinInfos0) ->
@@ -844,11 +833,12 @@ copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
         end, BinInfos),
     {BodyData, NewBinInfos}.
 
-copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
+copy_docs(Db, #db{fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
     % COUCHDB-968, make sure we prune duplicates during compaction
     InfoBySeq = lists:usort(fun(#doc_info{id=A}, #doc_info{id=B}) -> A =< B end,
         InfoBySeq0),
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+    ok = couch_file:flush(DestFd),
     LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
 
     NewFullDocInfos1 = lists:map(
@@ -898,6 +888,8 @@ copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
 copy_compact(Db, NewDb0, Retry) ->
     FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
     NewDb = NewDb0#db{fsync_options=FsyncOptions},
+    ok = couch_file:flush(Db#db.fd),
+    ok = couch_file:flush(NewDb#db.fd),
     TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
     BufferSize = list_to_integer(
         couch_config:get("database_compaction", "doc_buffer_size", "524288")),
@@ -956,13 +948,14 @@ copy_compact(Db, NewDb0, Retry) ->
     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
         {ok, Ptr, _} = couch_file:append_term(
-            NewDb3#db.updater_fd, Db#db.security,
+            NewDb3#db.fd, Db#db.security,
             [{compression, NewDb3#db.compression}]),
         NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
     true ->
         NewDb4 = NewDb3
     end,
 
+    ok = couch_file:flush(NewDb4#db.fd),
     commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
 
 start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
@@ -982,8 +975,7 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
         Retry = false,
         ok = couch_file:write_header(Fd, Header=#db_header{})
     end,
-    ReaderFd = open_reader_fd(CompactFile, Db#db.options),
-    NewDb = init_db(Name, CompactFile, Fd, ReaderFd, Header, Db#db.options),
+    NewDb = init_db(Name, CompactFile, Fd, Header, Db#db.options),
     NewDb2 = if PurgeSeq > 0 ->
         {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
         {ok, Pointer, _} = couch_file:append_term(

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/src/couchdb/couch_file.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 2c2f11a..253ee17 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -17,13 +17,20 @@
 
 -define(SIZE_BLOCK, 4096).
 
+% Queue a max of 1 meg of data before blocking write callers. Prevents memory
+% blowup when streaming in large binaries, etc with slow disk IO.
+-define(MAX_QUEUED_BYTES, 1048576).
+
 -record(file, {
-    fd,
-    eof = 0
+    reader = nil,
+    writer = nil,
+    eof = 0,
+    queued_write_bytes = 0,
+    blocked_writers = []
 }).
 
 % public API
--export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, flush/1, sync/1, truncate/2]).
 -export([pread_term/2, pread_iolist/2, pread_binary/2]).
 -export([append_binary/2, append_binary_md5/2]).
 -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
@@ -143,8 +150,8 @@ pread_binary(Fd, Pos) ->
 
 pread_iolist(Fd, Pos) ->
     case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of
-    {ok, IoList, <<>>} ->
-        {ok, IoList};
+    {ok, _IoList} = Ok ->
+        Ok;
     {ok, IoList, Md5} ->
         case couch_util:md5(IoList) of
         Md5 ->
@@ -181,13 +188,19 @@ truncate(Fd, Pos) ->
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
-sync(Filepath) when is_list(Filepath) ->
-    {ok, Fd} = file:open(Filepath, [append, raw]),
-    try ok = file:sync(Fd) after ok = file:close(Fd) end;
 sync(Fd) ->
     gen_server:call(Fd, sync, infinity).
 
 %%----------------------------------------------------------------------
+%% Purpose: Ensure that all the data the caller previously asked to write
+%% to the file were flushed to disk (not necessarily fsync'ed).
+%% Returns: ok
+%%----------------------------------------------------------------------
+
+flush(Fd) ->
+    gen_server:call(Fd, flush, infinity).
+
+%%----------------------------------------------------------------------
 %% Purpose: Close the file.
 %% Returns: ok
 %%----------------------------------------------------------------------
@@ -247,6 +260,14 @@ read_header(Fd) ->
     case gen_server:call(Fd, find_header, infinity) of
     {ok, Bin} ->
         {ok, binary_to_term(Bin)};
+    no_valid_header ->
+        flush(Fd),
+        case gen_server:call(Fd, find_header, infinity) of
+        {ok, Bin} ->
+            {ok, binary_to_term(Bin)};
+        Else ->
+            Else
+        end;
     Else ->
         Else
     end.
@@ -256,7 +277,7 @@ write_header(Fd, Data) ->
     Md5 = couch_util:md5(Bin),
     % now we assemble the final header binary and write to disk
     FinalBin = <<Md5/binary, Bin/binary>>,
-    gen_server:call(Fd, {write_header, FinalBin}, infinity).
+    ok = gen_server:call(Fd, {write_header, FinalBin}, infinity).
 
 
 
@@ -268,12 +289,23 @@ init_status_error(ReturnPid, Ref, Error) ->
 % server functions
 
 init({Filepath, Options, ReturnPid, Ref}) ->
-    process_flag(trap_exit, true),
-    OpenOptions = file_open_options(Options),
+   try
+       maybe_create_file(Filepath, Options),
+       process_flag(trap_exit, true),
+       Reader = spawn_reader(Filepath),
+       {Writer, Eof} = spawn_writer(Filepath),
+       maybe_track_open_os_files(Options),
+       {ok, #file{reader = Reader, writer = Writer, eof = Eof}}
+   catch
+   throw:{error, Err} ->
+       init_status_error(ReturnPid, Ref, Err)
+   end.
+
+maybe_create_file(Filepath, Options) ->
     case lists:member(create, Options) of
     true ->
         filelib:ensure_dir(Filepath),
-        case file:open(Filepath, OpenOptions) of
+        case file:open(Filepath, [read, write, binary]) of
         {ok, Fd} ->
             {ok, Length} = file:position(Fd, eof),
             case Length > 0 of
@@ -285,40 +317,19 @@ init({Filepath, Options, ReturnPid, Ref}) ->
                 true ->
                     {ok, 0} = file:position(Fd, 0),
                     ok = file:truncate(Fd),
-                    ok = file:sync(Fd),
-                    maybe_track_open_os_files(Options),
-                    {ok, #file{fd=Fd}};
+                    ok = file:sync(Fd);
                 false ->
                     ok = file:close(Fd),
-                    init_status_error(ReturnPid, Ref, file_exists)
+                    throw({error, file_exists})
                 end;
             false ->
-                maybe_track_open_os_files(Options),
-                {ok, #file{fd=Fd}}
+                ok
             end;
         Error ->
-            init_status_error(ReturnPid, Ref, Error)
+            throw({error, Error})
         end;
     false ->
-        % open in read mode first, so we don't create the file if it doesn't exist.
-        case file:open(Filepath, [read, raw]) of
-        {ok, Fd_Read} ->
-            {ok, Fd} = file:open(Filepath, OpenOptions),
-            ok = file:close(Fd_Read),
-            maybe_track_open_os_files(Options),
-            {ok, Eof} = file:position(Fd, eof),
-            {ok, #file{fd=Fd, eof=Eof}};
-        Error ->
-            init_status_error(ReturnPid, Ref, Error)
-        end
-    end.
-
-file_open_options(Options) ->
-    [read, raw, binary] ++ case lists:member(read_only, Options) of
-    true ->
-        [];
-    false ->
-        [append]
+        ok
     end.
 
 maybe_track_open_os_files(FileOptions) ->
@@ -329,86 +340,110 @@ maybe_track_open_os_files(FileOptions) ->
         couch_stats_collector:track_process_count({couchdb, open_os_files})
     end.
 
-terminate(_Reason, #file{fd = Fd}) ->
-    ok = file:close(Fd).
-
-
-handle_call({pread_iolist, Pos}, _From, File) ->
-    {RawData, NextPos} = try
-        % up to 8Kbs of read ahead
-        read_raw_iolist_int(File, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK))
-    catch
-    _:_ ->
-        read_raw_iolist_int(File, Pos, 4)
-    end,
-    <<Prefix:1/integer, Len:31/integer, RestRawData/binary>> =
-        iolist_to_binary(RawData),
-    case Prefix of
-    1 ->
-        {Md5, IoList} = extract_md5(
-            maybe_read_more_iolist(RestRawData, 16 + Len, NextPos, File)),
-        {reply, {ok, IoList, Md5}, File};
-    0 ->
-        IoList = maybe_read_more_iolist(RestRawData, Len, NextPos, File),
-        {reply, {ok, IoList, <<>>}, File}
-    end;
-
-handle_call(bytes, _From, #file{fd = Fd} = File) ->
-    {reply, file:position(Fd, eof), File};
-
-handle_call(sync, _From, #file{fd=Fd}=File) ->
-    {reply, file:sync(Fd), File};
-
-handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
-    {ok, Pos} = file:position(Fd, Pos),
-    case file:truncate(Fd) of
-    ok ->
-        {reply, ok, File#file{eof = Pos}};
-    Error ->
-        {reply, Error, File}
-    end;
-
-handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
-    Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
-    Size = iolist_size(Blocks),
-    case file:write(Fd, Blocks) of
-    ok ->
-        {reply, {ok, Pos, Size}, File#file{eof = Pos + Size}};
-    Error ->
-        {reply, Error, File}
+terminate(_Reason, #file{reader = Reader, writer = Writer}) ->
+    couch_util:shutdown_sync(Reader),
+    couch_util:shutdown_sync(Writer).
+
+handle_call({pread_iolist, Pos}, From, #file{reader = Reader} = File) ->
+    Reader ! {read, Pos, From},
+    {noreply, File};
+
+handle_call(bytes, _From, #file{eof = Eof} = File) ->
+    {reply, {ok, Eof}, File};
+
+handle_call(sync, From, #file{writer = W} = File) ->
+    W ! {sync, From},
+    {noreply, File};
+
+handle_call({truncate, Pos}, _From, #file{writer = W} = File) ->
+    W ! {truncate, Pos, self()},
+    receive {W, truncated, Pos} -> ok end,
+    {reply, ok, File#file{eof = Pos}};
+
+handle_call({append_bin, Bin}, From, #file{writer = W, eof = Pos} = File) ->
+    if File#file.queued_write_bytes > ?MAX_QUEUED_BYTES ->
+        % We have too much data queued already. Put the writer on the blocked
+        % list so it's blocked until we write already queued data and free
+        % some memory.
+        BlockedWriters = [{From, Bin} | File#file.blocked_writers],
+        {noreply, File#file{blocked_writers = BlockedWriters}};
+    true ->
+        Bytes = iolist_size(Bin),
+        Size = calculate_total_read_len(Pos rem ?SIZE_BLOCK, Bytes),
+        gen_server:reply(From, {ok, Pos, Size}),
+        W ! {chunk, Bin},
+        NewFile = File#file{
+            eof = Pos + Size,
+            queued_write_bytes = File#file.queued_write_bytes + Bytes
+        },
+        {noreply, NewFile}
     end;
 
-handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
-    BinSize = byte_size(Bin),
-    case Pos rem ?SIZE_BLOCK of
+handle_call({write_header, Bin}, From, #file{writer = W, eof = Pos} = File) ->
+    gen_server:reply(From, ok),
+    W ! {header, Bin},
+    Pos2 = case Pos rem ?SIZE_BLOCK of
     0 ->
-        Padding = <<>>;
+        Pos + 5;
     BlockOffset ->
-        Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
+        Pos + 5 + (?SIZE_BLOCK - BlockOffset)
     end,
-    FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
-    case file:write(Fd, FinalBin) of
-    ok ->
-        {reply, ok, File#file{eof = Pos + iolist_size(FinalBin)}};
-    Error ->
-        {reply, Error, File}
-    end;
+    File2 = File#file{
+        eof = Pos2 + calculate_total_read_len(5, byte_size(Bin))
+    },
+    {noreply, File2};
+
+handle_call(flush, From, #file{writer =  W} = File) ->
+    W ! {flush, From},
+    {noreply, File};
 
-handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
-    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+handle_call(find_header, From, #file{reader = Reader, eof = Eof} = File) ->
+    Reader ! {find_header, Eof, From},
+    {noreply, File}.
 
-handle_cast(close, Fd) ->
-    {stop,normal,Fd}.
+handle_cast(close, File) ->
+    {stop,normal,File}.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+handle_info({dequeued, Bytes}, File) ->
+    #file{
+        writer = Writer,
+        queued_write_bytes = QueuedBytes,
+        eof = Pos,
+        blocked_writers = Blocked
+    } = File,
+    {Blocked2, QueuedBytes2, Pos2} = enqueue_blocked_writes(
+        Writer, lists:reverse(Blocked), QueuedBytes - Bytes, Pos),
+    NewFile = File#file{
+        queued_write_bytes = QueuedBytes2,
+        eof = Pos2,
+        blocked_writers = Blocked2
+    },
+    {noreply, NewFile};
 handle_info({'EXIT', _, normal}, Fd) ->
     {noreply, Fd};
+handle_info({'EXIT', Pid, Reason}, #file{writer = Pid} = Fd) ->
+    {stop, {write_loop_died, Reason}, Fd};
+handle_info({'EXIT', Pid, Reason}, #file{reader = Pid} = Fd) ->
+    {stop, {read_loop_died, Reason}, Fd};
 handle_info({'EXIT', _, Reason}, Fd) ->
     {stop, Reason, Fd}.
 
 
+enqueue_blocked_writes(Writer, [{NextFrom, NextBin} | RestWriteReqs],
+        QueuedBytes, Pos) when QueuedBytes =< ?MAX_QUEUED_BYTES ->
+    Bytes = iolist_size(NextBin),
+    Size = calculate_total_read_len(Pos rem ?SIZE_BLOCK, Bytes),
+    gen_server:reply(NextFrom, {ok, Pos, Size}),
+    Writer ! {chunk, NextBin},
+    enqueue_blocked_writes(
+        Writer, RestWriteReqs, QueuedBytes + Bytes, Pos + Size);
+enqueue_blocked_writes(_Writer, RemainingWriteReqs, QueuedBytes, Pos) ->
+    {lists:reverse(RemainingWriteReqs), QueuedBytes, Pos}.
+
+
 find_header(_Fd, -1) ->
     no_valid_header;
 find_header(Fd, Block) ->
@@ -441,19 +476,17 @@ maybe_read_more_iolist(Buffer, DataSize, _, _)
     when DataSize =< byte_size(Buffer) ->
     <<Data:DataSize/binary, _/binary>> = Buffer,
     [Data];
-maybe_read_more_iolist(Buffer, DataSize, NextPos, File) ->
+maybe_read_more_iolist(Buffer, DataSize, NextPos, Fd) ->
     {Missing, _} =
-        read_raw_iolist_int(File, NextPos, DataSize - byte_size(Buffer)),
+        read_raw_iolist_int(Fd, NextPos, DataSize - byte_size(Buffer)),
     [Buffer, Missing].
 
--spec read_raw_iolist_int(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
-    {Data::iolist(), CurPos::non_neg_integer()}.
-read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
-    read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd}, Pos, Len) ->
+read_raw_iolist_int(ReadFd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
+    read_raw_iolist_int(ReadFd, Pos, Len);
+read_raw_iolist_int(ReadFd, Pos, Len) ->
     BlockOffset = Pos rem ?SIZE_BLOCK,
     TotalBytes = calculate_total_read_len(BlockOffset, Len),
-    {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
+    {ok, <<RawBin:TotalBytes/binary>>} = file:pread(ReadFd, Pos, TotalBytes),
     {remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}.
 
 -spec extract_md5(iolist()) -> {binary(), iolist()}.
@@ -487,16 +520,24 @@ remove_block_prefixes(BlockOffset, Bin) ->
         [Bin]
     end.
 
-make_blocks(_BlockOffset, []) ->
-    [];
 make_blocks(0, IoList) ->
-    [<<0>> | make_blocks(1, IoList)];
+    case iolist_size(IoList) of
+    0 ->
+        [];
+    _ ->
+        [<<0>> | make_blocks(1, IoList)]
+    end;
 make_blocks(BlockOffset, IoList) ->
-    case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
-    {Begin, End} ->
-        [Begin | make_blocks(0, End)];
-    _SplitRemaining ->
-        IoList
+    case iolist_size(IoList) of
+    0 ->
+        [];
+    _ ->
+        case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
+        {Begin, End} ->
+            [Begin | make_blocks(0, End)];
+        _SplitRemaining ->
+            IoList
+        end
     end.
 
 %% @doc Returns a tuple where the first element contains the leading SplitAt
@@ -522,3 +563,153 @@ split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
     end;
 split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
     split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).
+
+%% Spawns a linked writer process that opens Filepath in append mode; returns {Pid, Eof} or throws.
+spawn_writer(Filepath) ->
+    Parent = self(),
+    Pid = spawn_link(fun() ->
+        case file:open(Filepath, [binary, append, raw]) of
+        {ok, Fd} ->
+            {ok, Eof} = file:position(Fd, eof),  % current end-of-file offset
+            Parent ! {self(), {ok, Eof}},
+            writer_loop(Parent, Fd, Eof);
+        Error ->
+            Parent ! {self(), Error}  % report the open failure; the fun then returns and the process ends
+        end
+    end),
+    receive  % NOTE(review): no 'after' clause - waits until the child sends its result
+    {Pid, {ok, Eof}} ->
+         {Pid, Eof};
+    {Pid, Error} ->
+         throw({error, Error})  % NOTE(review): file:open/2 errors are already {error, Reason}, so this nests them
+    end.
+
+%% Spawns a linked reader process that opens Filepath for reading; returns its Pid or throws.
+spawn_reader(Filepath) ->
+    Parent = self(),
+    Pid = spawn_link(fun() ->
+        case file:open(Filepath, [binary, read, raw]) of
+        {ok, Fd} ->
+            Parent ! {self(), ok},  % tell the parent the file is open before serving requests
+            reader_loop(Fd);
+        Error ->
+            Parent ! {self(), Error}  % report the open failure; the fun then returns and the process ends
+        end
+    end),
+    receive  % NOTE(review): no 'after' clause - waits until the child sends its result
+    {Pid, ok} ->
+         Pid;
+    {Pid, Error} ->
+         throw({error, Error})  % NOTE(review): file:open/2 errors are already {error, Reason}, so this nests them
+    end.
+
+%% Idle loop of the writer process: handles one message at a time, carrying the end-of-file offset Eof.
+writer_loop(Parent, Fd, Eof) ->
+    receive
+    {chunk, Chunk} ->
+        writer_collect_chunks(Parent, Fd, Eof, [Chunk]);  % start a batch with this first chunk
+    {header, Header} ->
+        Eof2 = write_header_blocks(Fd, Eof, Header),
+        writer_loop(Parent, Fd, Eof2);
+    {truncate, Pos, From} ->
+        {ok, Pos} = file:position(Fd, Pos),  % move to Pos, then cut the file there
+        ok = file:truncate(Fd),
+        From ! {self(), truncated, Pos},  % plain message reply, unlike flush/sync which use gen_server:reply/2
+        writer_loop(Parent, Fd, Pos);
+    {flush, From} ->
+        gen_server:reply(From, ok),  % no chunks are held in this loop, so reply right away
+        writer_loop(Parent, Fd, Eof);
+    {sync, From} ->
+        ok = file:sync(Fd),
+        gen_server:reply(From, ok),
+        writer_loop(Parent, Fd, Eof);
+    stop ->
+        ok = file:close(Fd),
+        exit(done)  % NOTE(review): exit reason is 'done', not 'normal' - linked processes see a non-normal exit
+    end.
+
+writer_collect_chunks(Parent, Fd, Eof, Acc) ->  % batching loop; Acc holds the chunks received so far, newest first
+    receive
+    {chunk, Chunk} ->
+        writer_collect_chunks(Parent, Fd, Eof, [Chunk | Acc]);  % keep collecting without touching the file
+    {header, Header} ->
+        Eof2 = write_blocks(Parent, Fd, Eof, Acc),  % pending chunks go out before the header
+        Eof3 = write_header_blocks(Fd, Eof2, Header),
+        writer_loop(Parent, Fd, Eof3);
+    {truncate, Pos, From} ->
+        _ = write_blocks(Parent, Fd, Eof, Acc),  % new Eof is discarded: the truncate below resets it to Pos
+        {ok, Pos} = file:position(Fd, Pos),
+        ok = file:truncate(Fd),
+        From ! {self(), truncated, Pos},
+        writer_loop(Parent, Fd, Pos);
+    {flush, From} ->
+        Eof2 = write_blocks(Parent, Fd, Eof, Acc),  % reply only after the collected chunks were written
+        gen_server:reply(From, ok),
+        writer_loop(Parent, Fd, Eof2);
+    {sync, From} ->
+        Eof2 = write_blocks(Parent, Fd, Eof, Acc),
+        ok = file:sync(Fd),
+        gen_server:reply(From, ok),
+        writer_loop(Parent, Fd, Eof2)
+    after 0 ->  % no matching message waiting: write the batch and return to writer_loop (which also handles 'stop')
+        Eof2 = write_blocks(Parent, Fd, Eof, Acc),
+        writer_loop(Parent, Fd, Eof2)
+    end.
+
+%% Runs the collected chunks (newest first in Data) through make_blocks/2, writes them, returns the new Eof.
+write_blocks(Parent, Fd, Eof, Data) ->
+    Blocks = make_blocks(Eof rem ?SIZE_BLOCK, lists:reverse(Data)),  % reverse restores arrival order
+    ok = file:write(Fd, Blocks),
+    Parent ! {dequeued, iolist_size(Data)},  % size of the chunks as received, not of Blocks
+    Eof + iolist_size(Blocks).  % Eof advances by what was actually written
+
+write_header_blocks(Fd, Eof, Header) ->  % writes Header starting at a block boundary; returns the new Eof
+    case Eof rem ?SIZE_BLOCK of
+    0 ->
+        Padding = <<>>;  % already on a block boundary
+    BlockOffset ->
+        Padding = <<0:(8 * (?SIZE_BLOCK - BlockOffset))>>  % zero bytes up to the next block boundary
+    end,
+    FinalHeader = [
+        Padding,
+        <<1, (byte_size(Header)):32/integer>> | make_blocks(5, [Header])  % 5 = the 1 byte + 32-bit size just emitted
+    ],
+    ok = file:write(Fd, FinalHeader),
+    Eof + iolist_size(FinalHeader).
+
+%% Loop of the reader process: serves read and find_header requests against Fd until told to stop.
+reader_loop(Fd) ->
+    receive
+    {read, Pos, From} ->
+        read_iolist(Fd, Pos, From),  % read_iolist/3 sends the reply to From itself
+        reader_loop(Fd);
+    {find_header, Eof, From} ->
+        gen_server:reply(From, find_header(Fd, Eof div ?SIZE_BLOCK)),  % search starts at the block containing Eof
+        reader_loop(Fd);
+    stop ->
+        ok = file:close(Fd),
+        exit(done)  % NOTE(review): exit reason is 'done', not 'normal' - linked processes see a non-normal exit
+    end.
+
+
+-compile({inline, [read_iolist/3]}).
+%% Reads the chunk at Pos and replies to From with {ok, IoList}, or {ok, IoList, Md5} when the flag bit is set.
+read_iolist(Fd, Pos, From) ->
+    {RawData, NextPos} = try
+        % up to 8Kbs of read ahead
+        read_raw_iolist_int(Fd, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK))
+    catch
+    _:_ ->  % NOTE(review): catches every exception class; presumably meant for reads past end of file - confirm
+        read_raw_iolist_int(Fd, Pos, 4)  % fall back to just the 4 bytes matched as flag + length below
+    end,
+    <<Prefix:1/integer, Len:31/integer, RestRawData/binary>> =  % 1-bit flag followed by a 31-bit length
+        iolist_to_binary(RawData),
+    case Prefix of
+    1 ->  % flag set: 16 extra bytes are read (presumably the MD5 digest split off by extract_md5/1)
+        {Md5, IoList} = extract_md5(
+            maybe_read_more_iolist(RestRawData, 16 + Len, NextPos, Fd)),
+        gen_server:reply(From, {ok, IoList, Md5});
+    0 ->  % flag clear: Len bytes of data, no digest
+        IoList = maybe_read_more_iolist(RestRawData, Len, NextPos, Fd),
+        gen_server:reply(From, {ok, IoList})
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/010-file-basics.t
----------------------------------------------------------------------
diff --git a/test/etap/010-file-basics.t b/test/etap/010-file-basics.t
index fb1b29e..a1b9a98 100755
--- a/test/etap/010-file-basics.t
+++ b/test/etap/010-file-basics.t
@@ -57,6 +57,7 @@ test() ->
 
     ?etap_match(couch_file:append_binary(Fd, <<"fancy!">>), {ok, Size, _},
         "Appending a binary returns the current file size."),
+    ok = couch_file:flush(Fd),
 
     etap:is({ok, foo}, couch_file:pread_term(Fd, 0),
         "Reading the first term returns what we wrote: foo"),
@@ -70,25 +71,30 @@ test() ->
     ),
 
     {ok, BinPos, _} = couch_file:append_binary(Fd, <<131,100,0,3,102,111,111>>),
+    ok = couch_file:flush(Fd),
     etap:is({ok, foo}, couch_file:pread_term(Fd, BinPos),
         "Reading a term from a written binary term representation succeeds."),
         
     BigBin = list_to_binary(lists:duplicate(100000, 0)),
     {ok, BigBinPos, _} = couch_file:append_binary(Fd, BigBin),
+    ok = couch_file:flush(Fd),
     etap:is({ok, BigBin}, couch_file:pread_binary(Fd, BigBinPos),
         "Reading a large term from a written representation succeeds."),
     
     ok = couch_file:write_header(Fd, hello),
+    ok = couch_file:flush(Fd),
     etap:is({ok, hello}, couch_file:read_header(Fd),
         "Reading a header succeeds."),
         
     {ok, BigBinPos2, _} = couch_file:append_binary(Fd, BigBin),
+    ok = couch_file:flush(Fd),
     etap:is({ok, BigBin}, couch_file:pread_binary(Fd, BigBinPos2),
         "Reading a large term from a written representation succeeds 2."),
 
     % append_binary == append_iolist?
     % Possible bug in pread_iolist or iolist() -> append_binary
     {ok, IOLPos, _} = couch_file:append_binary(Fd, ["foo", $m, <<"bam">>]),
+    ok = couch_file:flush(Fd),
     {ok, IoList} = couch_file:pread_iolist(Fd, IOLPos),
     etap:is(<<"foombam">>, iolist_to_binary(IoList),
         "Reading an results in a binary form of the written iolist()"),

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/011-file-headers.t
----------------------------------------------------------------------
diff --git a/test/etap/011-file-headers.t b/test/etap/011-file-headers.t
index a26b032..82c028f 100755
--- a/test/etap/011-file-headers.t
+++ b/test/etap/011-file-headers.t
@@ -45,6 +45,7 @@ test() ->
     etap:is_greater(Size1, 0,
         "Writing a header allocates space in the file."),
 
+    ok = couch_file:flush(Fd),
     etap:is({ok, {<<"some_data">>, 32}}, couch_file:read_header(Fd),
         "Reading the header returns what we wrote."),
 
@@ -55,6 +56,7 @@ test() ->
     etap:is_greater(Size2, Size1,
         "Writing a second header allocates more space."),
 
+    ok = couch_file:flush(Fd),
     etap:is({ok, [foo, <<"more">>]}, couch_file:read_header(Fd),
         "Reading the second header does not return the first header."),
 
@@ -69,6 +71,7 @@ test() ->
         "Rewriting the same second header returns the same second size."),
 
     couch_file:write_header(Fd, erlang:make_tuple(5000, <<"CouchDB">>)),
+    ok = couch_file:flush(Fd),
     etap:is(
         couch_file:read_header(Fd),
         {ok, erlang:make_tuple(5000, <<"CouchDB">>)},
@@ -82,6 +85,7 @@ test() ->
 
     % Destroy the 0x1 byte that marks a header
     check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+        ok = couch_file:flush(CouchFd),
         etap:isnt(Expect, couch_file:read_header(CouchFd),
             "Should return a different header before corruption."),
         file:pwrite(RawFd, HeaderPos, <<0>>),
@@ -91,6 +95,7 @@ test() ->
 
     % Corrupt the size.
     check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+        ok = couch_file:flush(CouchFd),
         etap:isnt(Expect, couch_file:read_header(CouchFd),
             "Should return a different header before corruption."),
         % +1 for 0x1 byte marker
@@ -101,6 +106,7 @@ test() ->
 
     % Corrupt the MD5 signature
     check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+        ok = couch_file:flush(CouchFd),
         etap:isnt(Expect, couch_file:read_header(CouchFd),
             "Should return a different header before corruption."),
         % +5 = +1 for 0x1 byte and +4 for term size.
@@ -111,6 +117,7 @@ test() ->
 
     % Corrupt the data
     check_header_recovery(fun(CouchFd, RawFd, Expect, HeaderPos) ->
+        ok = couch_file:flush(CouchFd),
         etap:isnt(Expect, couch_file:read_header(CouchFd),
             "Should return a different header before corruption."),
         % +21 = +1 for 0x1 byte, +4 for term size and +16 for MD5 sig

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/020-btree-basics.t
----------------------------------------------------------------------
diff --git a/test/etap/020-btree-basics.t b/test/etap/020-btree-basics.t
index 6886ee1..372b494 100755
--- a/test/etap/020-btree-basics.t
+++ b/test/etap/020-btree-basics.t
@@ -97,6 +97,7 @@ test_kvs(KeyValues) ->
             "After removing all keys btree size is 0."),
 
     {Btree4, _} = lists:foldl(fun(KV, {BtAcc, PrevSize}) ->
+        ok = couch_file:flush(Fd),
         {ok, BtAcc2} = couch_btree:add_remove(BtAcc, [KV], []),
         case couch_btree:size(BtAcc2) > PrevSize of
         true ->
@@ -106,6 +107,7 @@ test_kvs(KeyValues) ->
         end,
         {BtAcc2, couch_btree:size(BtAcc2)}
     end, {Btree3, couch_btree:size(Btree3)}, KeyValues),
+    ok = couch_file:flush(Fd),
 
     etap:ok(test_btree(Btree4, KeyValues),
         "Adding all keys one at a time returns a complete btree."),
@@ -113,6 +115,7 @@ test_kvs(KeyValues) ->
             "Non empty btrees have a size > 0."),
 
     {Btree5, _} = lists:foldl(fun({K, _}, {BtAcc, PrevSize}) ->
+        ok = couch_file:flush(Fd),
         {ok, BtAcc2} = couch_btree:add_remove(BtAcc, [], [K]),
         case couch_btree:size(BtAcc2) < PrevSize of
         true ->
@@ -122,6 +125,8 @@ test_kvs(KeyValues) ->
         end,
         {BtAcc2, couch_btree:size(BtAcc2)}
     end, {Btree4, couch_btree:size(Btree4)}, KeyValues),
+    ok = couch_file:flush(Fd),
+
     etap:ok(test_btree(Btree5, []),
         "Removing all keys one at a time returns an empty btree."),
     etap:is(0, couch_btree:size(Btree5),
@@ -129,6 +134,7 @@ test_kvs(KeyValues) ->
 
     KeyValuesRev = lists:reverse(KeyValues),
     {Btree6, _} = lists:foldl(fun(KV, {BtAcc, PrevSize}) ->
+        ok = couch_file:flush(Fd),
         {ok, BtAcc2} = couch_btree:add_remove(BtAcc, [KV], []),
         case couch_btree:size(BtAcc2) > PrevSize of
         true ->
@@ -170,14 +176,16 @@ test_kvs(KeyValues) ->
     etap:is(couch_file:close(Fd), ok, "closing out"),
     true.
 
-test_btree(Btree, KeyValues) ->
+test_btree(#btree{fd = Fd} = Btree, KeyValues) ->
+    ok = couch_file:flush(Fd),
     ok = test_key_access(Btree, KeyValues),
     ok = test_lookup_access(Btree, KeyValues),
     ok = test_final_reductions(Btree, KeyValues),
     ok = test_traversal_callbacks(Btree, KeyValues),
     true.
 
-test_add_remove(Btree, OutKeyValues, RemainingKeyValues) ->
+test_add_remove(#btree{fd = Fd} = Btree, OutKeyValues, RemainingKeyValues) ->
+    ok = couch_file:flush(Fd),
     Btree2 = lists:foldl(fun({K, _}, BtAcc) ->
         {ok, BtAcc2} = couch_btree:add_remove(BtAcc, [], [K]),
         BtAcc2
@@ -190,7 +198,7 @@ test_add_remove(Btree, OutKeyValues, RemainingKeyValues) ->
     end, Btree2, OutKeyValues),
     true = test_btree(Btree3, OutKeyValues ++ RemainingKeyValues).
 
-test_key_access(Btree, List) ->
+test_key_access(#btree{fd = Fd} = Btree, List) ->
     FoldFun = fun(Element, {[HAcc|TAcc], Count}) ->
         case Element == HAcc of
             true -> {ok, {TAcc, Count + 1}};
@@ -199,18 +207,21 @@ test_key_access(Btree, List) ->
     end,
     Length = length(List),
     Sorted = lists:sort(List),
+    ok = couch_file:flush(Fd),
     {ok, _, {[], Length}} = couch_btree:foldl(Btree, FoldFun, {Sorted, 0}),
     {ok, _, {[], Length}} = couch_btree:fold(Btree, FoldFun, {Sorted, 0}, [{dir, rev}]),
     ok.
 
-test_lookup_access(Btree, KeyValues) ->
+test_lookup_access(#btree{fd = Fd} = Btree, KeyValues) ->
+    ok = couch_file:flush(Fd),
     FoldFun = fun({Key, Value}, {Key, Value}) -> {stop, true} end,
     lists:foreach(fun({Key, Value}) ->
         [{ok, {Key, Value}}] = couch_btree:lookup(Btree, [Key]),
         {ok, _, true} = couch_btree:foldl(Btree, FoldFun, {Key, Value}, [{start_key, Key}])
     end, KeyValues).
 
-test_final_reductions(Btree, KeyValues) ->
+test_final_reductions(#btree{fd = Fd} = Btree, KeyValues) ->
+    ok = couch_file:flush(Fd),
     KVLen = length(KeyValues),
     FoldLFun = fun(_X, LeadingReds, Acc) ->
         CountToStart = KVLen div 3 + Acc,

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/021-btree-reductions.t
----------------------------------------------------------------------
diff --git a/test/etap/021-btree-reductions.t b/test/etap/021-btree-reductions.t
index 331e49a..e597772 100755
--- a/test/etap/021-btree-reductions.t
+++ b/test/etap/021-btree-reductions.t
@@ -47,6 +47,7 @@ test()->
     end, {"odd", []}, lists:seq(1, rows())),
 
     {ok, Btree2} = couch_btree:add_remove(Btree, EvenOddKVs, []),
+    ok = couch_file:flush(Fd),
 
     GroupFun = fun({K1, _}, {K2, _}) -> K1 == K2 end,
     FoldFun = fun(GroupedKey, Unreduced, Acc) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/050-stream.t
----------------------------------------------------------------------
diff --git a/test/etap/050-stream.t b/test/etap/050-stream.t
index de0dfad..622079a 100755
--- a/test/etap/050-stream.t
+++ b/test/etap/050-stream.t
@@ -26,6 +26,7 @@ main(_) ->
     ok.
 
 read_all(Fd, PosList) ->
+    ok = couch_file:flush(Fd),
     Data = couch_stream:foldl(Fd, PosList, fun(Bin, Acc) -> [Bin, Acc] end, []),
     iolist_to_binary(Data).
 

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/200-view-group-no-db-leaks.t
----------------------------------------------------------------------
diff --git a/test/etap/200-view-group-no-db-leaks.t b/test/etap/200-view-group-no-db-leaks.t
index 4586ff4..9b74e01 100755
--- a/test/etap/200-view-group-no-db-leaks.t
+++ b/test/etap/200-view-group-no-db-leaks.t
@@ -25,7 +25,6 @@
     compactor_pid = nil,
     instance_start_time, % number of microsecs since jan 1 1970 as a binary string
     fd,
-    updater_fd,
     fd_ref_counter,
     header = nil,
     committed_update_seq,

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/201-view-group-shutdown.t
----------------------------------------------------------------------
diff --git a/test/etap/201-view-group-shutdown.t b/test/etap/201-view-group-shutdown.t
index 2fabef6..dfd218e 100755
--- a/test/etap/201-view-group-shutdown.t
+++ b/test/etap/201-view-group-shutdown.t
@@ -25,7 +25,6 @@
     compactor_pid = nil,
     instance_start_time, % number of microsecs since jan 1 1970 as a binary string
     fd,
-    updater_fd,
     fd_ref_counter,
     header = nil,
     committed_update_seq,

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8d698f57/test/etap/240-replication-compact.t
----------------------------------------------------------------------
diff --git a/test/etap/240-replication-compact.t b/test/etap/240-replication-compact.t
index c8b265e..3e3d608 100755
--- a/test/etap/240-replication-compact.t
+++ b/test/etap/240-replication-compact.t
@@ -30,7 +30,6 @@
     compactor_pid = nil,
     instance_start_time, % number of microsecs since jan 1 1970 as a binary string
     fd,
-    updater_fd,
     fd_ref_counter,
     header = nil,
     committed_update_seq,


Mime
View raw message