couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dam...@apache.org
Subject svn commit: r778485 - in /couchdb/trunk: ./ etc/couchdb/ etc/default/ share/www/script/test/ src/couchdb/
Date Mon, 25 May 2009 19:52:28 GMT
Author: damien
Date: Mon May 25 19:52:28 2009
New Revision: 778485

URL: http://svn.apache.org/viewvc?rev=778485&view=rev
Log:
Merging new tail append storage into trunk. Upgrades are automatic, once opened by this version  old versions of CouchDB will not be able to open the files. As a precaution, you should back-up your production databases before upgrading.

Modified:
    couchdb/trunk/   (props changed)
    couchdb/trunk/etc/couchdb/local_dev.ini
    couchdb/trunk/etc/default/couchdb   (props changed)
    couchdb/trunk/share/www/script/test/basics.js
    couchdb/trunk/share/www/script/test/compact.js
    couchdb/trunk/src/couchdb/couch_db.erl
    couchdb/trunk/src/couchdb/couch_db.hrl
    couchdb/trunk/src/couchdb/couch_db_updater.erl
    couchdb/trunk/src/couchdb/couch_doc.erl
    couchdb/trunk/src/couchdb/couch_file.erl
    couchdb/trunk/src/couchdb/couch_httpd_db.erl
    couchdb/trunk/src/couchdb/couch_stream.erl
    couchdb/trunk/src/couchdb/couch_view_group.erl

Propchange: couchdb/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 25 19:52:28 2009
@@ -1,3 +1,4 @@
 /couchdb/branches/0.9.x:775634
 /couchdb/branches/design_resources:751716-751803
 /couchdb/branches/form:729440-730015
+/couchdb/branches/tail_header:775760-778477

Modified: couchdb/trunk/etc/couchdb/local_dev.ini
URL: http://svn.apache.org/viewvc/couchdb/trunk/etc/couchdb/local_dev.ini?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/etc/couchdb/local_dev.ini (original)
+++ couchdb/trunk/etc/couchdb/local_dev.ini Mon May 25 19:52:28 2009
@@ -12,7 +12,7 @@
 ;bind_address = 127.0.0.1
 
 [log]
-level = info
+level = error
 
 [update_notification]
 ;unique notifier name=/full/path/to/exe -with "cmd line arg"

Propchange: couchdb/trunk/etc/default/couchdb
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 25 19:52:28 2009
@@ -1,4 +1,5 @@
 /couchdb/branches/0.9.x/etc/default/couchdb:775634
 /couchdb/branches/design_resources/etc/default/couchdb:751716-751803
 /couchdb/branches/form/etc/default/couchdb:729440-730015
+/couchdb/branches/tail_header/etc/default/couchdb:775760-778477
 /incubator/couchdb/trunk/etc/default/couchdb:642419-694440

Modified: couchdb/trunk/share/www/script/test/basics.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/basics.js?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/test/basics.js (original)
+++ couchdb/trunk/share/www/script/test/basics.js Mon May 25 19:52:28 2009
@@ -139,7 +139,7 @@
 
     // make sure we can still open the old rev of the deleted doc
     T(db.open(existingDoc._id, {rev: existingDoc._rev}) != null);
-
+    console.log("db.info: " + db.info.update_seq),
     // make sure restart works
     T(db.ensureFullCommit().ok);
     restartServer();

Modified: couchdb/trunk/share/www/script/test/compact.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/compact.js?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/test/compact.js (original)
+++ couchdb/trunk/share/www/script/test/compact.js Mon May 25 19:52:28 2009
@@ -15,7 +15,7 @@
   db.deleteDb();
   db.createDb();
   if (debug) debugger;
-  var docs = makeDocs(0, 10);
+  var docs = makeDocs(0, 20);
   db.bulkSave(docs);
 
   var binAttDoc = {
@@ -35,6 +35,7 @@
   for(var i in docs) {
       db.deleteDoc(docs[i]);
   }
+  T(db.ensureFullCommit().ok);
   var deletesize = db.info().disk_size;
   T(deletesize > originalsize);
 

Modified: couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.erl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db.erl Mon May 25 19:52:28 2009
@@ -24,7 +24,7 @@
 -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
 -export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5]).
+-export([changes_since/5,read_doc/2]).
 
 -include("couch_db.hrl").
 
@@ -50,6 +50,7 @@
         {ok, Fd} ->
             ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
             ok = file:rename(Filepath ++ ".compact", Filepath),
+            ok = couch_file:sync(Fd),
             {ok, Fd};
         {error, enoent} ->
             {not_found, no_db_file}
@@ -154,7 +155,7 @@
 purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
     gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
     
-get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
     Seq.
 
 get_update_seq(#db{update_seq=Seq})->
@@ -565,93 +566,55 @@
     % already written to our file, nothing to write
     {Fd, StreamPointer, Len};
   
+flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
+    {NewStreamData, Len} = 
+            couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
+    {Fd, NewStreamData, Len};
+
 flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
-    with_stream(Fd, fun(OutputStream) -> 
-        % written to a different file (or a closed file
-        % instance, which will cause an error)
-        ok = couch_stream:set_min_buffer(OutputStream, Len),
-        {ok, {NewStreamPointer, Len}, _EndSp} =
-        couch_stream:foldl(OtherFd, StreamPointer, Len,
-            fun(Bin, {BeginPointer, SizeAcc}) ->
-                {ok, Pointer} = couch_stream:write(OutputStream, Bin),
-                case SizeAcc of
-                0 -> % this was the first write, record the pointer
-                    {ok, {Pointer, size(Bin)}};
-                _ ->
-                    {ok, {BeginPointer, SizeAcc  + size(Bin)}}
-                end
-            end,
-            {{0,0}, 0}),
-        {Fd, NewStreamPointer, Len}
-    end);
+    {NewStreamData, Len} = 
+            couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+    {Fd, NewStreamData, Len};
                          
 flush_binary(Fd, Bin) when is_binary(Bin) ->
-    with_stream(Fd, fun(OutputStream) -> 
-        ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
-        {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
-        {Fd, StreamPointer, size(Bin)}
+    with_stream(Fd, fun(OutputStream) ->
+        couch_stream:write(OutputStream, Bin)
     end);
                  
 flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
-    % max_attachment_chunk_size control the max we buffer in memory
-    MaxChunkSize = list_to_integer(couch_config:get("couchdb", 
-        "max_attachment_chunk_size","4294967296")),
     with_stream(Fd, fun(OutputStream) -> 
         % StreamFun(MaxChunkSize, WriterFun) must call WriterFun
-        % once for each chunk of the attachment.
-        WriterFun = make_writer_fun(OutputStream),
-        {ok, {TotalLength, NewStreamPointer}} = 
-            StreamFun(MaxChunkSize, WriterFun, {0, nil}),
-        {Fd, NewStreamPointer, TotalLength}
+        % once for each chunk of the attachment,
+        StreamFun(4096,
+            % WriterFun({Length, Binary}, State)
+            % WriterFun({0, _Footers}, State)
+            % Called with Length == 0 on the last time.
+            % WriterFun returns NewState.
+            fun({0, _Footers}, _) ->
+                ok;
+            ({_Length, Bin}, _) ->
+                couch_stream:write(OutputStream, Bin)
+            end, ok)
     end);
              
 flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
     with_stream(Fd, fun(OutputStream) -> 
-        ok = couch_stream:set_min_buffer(OutputStream, Len),
-        {ok, StreamPointer} =
-            write_streamed_attachment(OutputStream, Fun, Len, nil),
-        {Fd, StreamPointer, Len}
+        write_streamed_attachment(OutputStream, Fun, Len)
     end).
             
 with_stream(Fd, Fun) ->
     {ok, OutputStream} = couch_stream:open(Fd),
-    Result = Fun(OutputStream),
-    couch_stream:close(OutputStream),
-    Result.
+    Fun(OutputStream),
+    {StreamInfo, Len} = couch_stream:close(OutputStream),
+    {Fd, StreamInfo, Len}.
 
-make_writer_fun(Stream) ->
-    % WriterFun({Length, Binary}, State)
-    % WriterFun({0, _Footers}, State)
-    % Called with Length == 0 on the last time.
-    % WriterFun returns NewState.
-    fun
-        ({0, _Footers}, {FinalLen, SpFin}) ->
-            % last block, return the final tuple
-            {ok, {FinalLen, SpFin}};
-        ({Length, Bin}, {Total, nil}) ->
-            % save StreamPointer 
-            ok = couch_stream:set_min_buffer(Stream, Length),
-            {ok, StreamPointer} = couch_stream:write(Stream, Bin),
-            {Total+Length, StreamPointer};
-        ({Length, Bin}, {Total, SpAcc}) ->
-            % write the Bin to disk 
-            ok = couch_stream:set_min_buffer(Stream, Length),
-            {ok, _Sp} = couch_stream:write(Stream, Bin),
-            {Total+Length, SpAcc}
-    end.
-    
-write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
-    {ok, SpAcc};
-write_streamed_attachment(Stream, F, LenLeft, nil) ->
-    Bin = F(),
-    TruncatedBin = check_bin_length(LenLeft, Bin),
-    {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin),
-    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc);
-write_streamed_attachment(Stream, F, LenLeft, SpAcc) ->
+    
+write_streamed_attachment(_Stream, _F, 0) ->
+    ok;
+write_streamed_attachment(Stream, F, LenLeft) ->
     Bin = F(),
-    TruncatedBin = check_bin_length(LenLeft, Bin),
-    {ok, _} = couch_stream:write(Stream, TruncatedBin),
-    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc).
+    ok = couch_stream:write(Stream, check_bin_length(LenLeft, Bin)),
+    write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
 
 %% on rare occasions ibrowse seems to process a chunked response incorrectly
 %% and include an extra "\r" in the last chunk.  This code ensures that we 
@@ -857,6 +820,12 @@
     true -> [{local_seq, Seq}]
     end.
 
+read_doc(Fd, Pos) when is_integer(Pos) ->
+    couch_file:pread_term(Fd, Pos);
+read_doc(Fd, OldStyleStreamPointer) ->
+    % 09 UPGRADE CODE
+    couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+
 
 doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
     [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
@@ -869,14 +838,13 @@
     [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
 
     
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
     {BodyData, BinValues} =
     case Bp of
     nil ->
         {[], []};
     _ ->
-        {ok, {BodyData0, BinValues0}} =
-            couch_stream:read_term( Db#db.summary_stream, Bp),
+        {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
         {BodyData0,
             [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
     end,

Modified: couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.hrl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_db.hrl Mon May 25 19:52:28 2009
@@ -109,13 +109,12 @@
 % if the disk revision is incremented, then new upgrade logic will need to be
 % added to couch_db_updater:init_db.
 
--define(DISK_VERSION_0_9, 1).
--define(LATEST_DISK_VERSION, 2).
+-define(LATEST_DISK_VERSION, 3).
 
 -record(db_header,
     {disk_version = ?LATEST_DISK_VERSION,  
      update_seq = 0,
-     summary_stream_state = nil,
+     unused = 0,
      fulldocinfo_by_id_btree_state = nil,
      docinfo_by_seq_btree_state = nil,
      local_docs_btree_state = nil,
@@ -133,7 +132,7 @@
     fd,
     fd_ref_counter,
     header = #db_header{},
-    summary_stream,
+    committed_update_seq,
     fulldocinfo_by_id_btree,
     docinfo_by_seq_btree,
     local_docs_btree,
@@ -145,7 +144,8 @@
     admins_ptr = nil,
     user_ctx = #user_ctx{},
     waiting_delayed_commit = nil,
-    revs_limit = 1000
+    revs_limit = 1000,
+    fsync_options = []
     }).
 
 

Modified: couchdb/trunk/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db_updater.erl Mon May 25 19:52:28 2009
@@ -19,18 +19,18 @@
 
 -include("couch_db.hrl").
 
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
 
 init({MainPid, DbName, Filepath, Fd, Options}) ->
     case lists:member(create, Options) of
     true ->
         % create a new header and writes it to the file
         Header =  #db_header{},
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+        ok = couch_file:write_header(Fd, Header),
         % delete any old compaction files that might be hanging around
         file:delete(Filepath ++ ".compact");
     false ->
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+        ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
+        {ok, Header} = couch_file:read_header(Fd)
     end,
     
     Db = init_db(DbName, Filepath, Fd, Header),
@@ -56,7 +56,7 @@
     end;
 handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
     {reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call(full_commit, _From,  Db) ->
     {reply, ok, commit_data(Db)}; % commit the data and return ok
 handle_call(increment_update_seq, _From, Db) ->
     Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
@@ -158,7 +158,7 @@
     end;
 handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
     {ok, NewFd} = couch_file:open(CompactFilepath),
-    {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+    {ok, NewHeader} = couch_file:read_header(NewFd),
     #db{update_seq=NewSeq} = NewDb =
             init_db(Db#db.name, Filepath, NewFd, NewHeader),
     unlink(NewFd),
@@ -191,7 +191,7 @@
     end.
 
 handle_info(delayed_commit, Db) ->
-    {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+    {noreply, commit_data(Db)}.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -214,6 +214,7 @@
             [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} || 
                 {Rev, Seq, Bp} <- DeletedRevInfos]};
 btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
+    % 09 UPGRADE CODE
     % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
     % and individual seq nums for conflicts that are currently in the index, 
     % meaning the filtered _changes api will not work except for on main docs.
@@ -244,6 +245,7 @@
         (_RevId, ?REV_MISSING) ->
             ?REV_MISSING;
         (_RevId, {IsDeleted, BodyPointer}) ->
+            % 09 UPGRADE CODE
             % this is the 0.9.0 and earlier rev info record. It's missing the seq
             % nums, which means couchdb will sometimes reexamine unchanged
             % documents with the _changes API.
@@ -280,17 +282,27 @@
 less_docid({}, _) -> false; % {} -> special key sorts after all
 less_docid(A, B) -> A < B.
 
+
 init_db(DbName, Filepath, Fd, Header0) ->
-    case element(2, Header0) of
-    ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly
-    ?LATEST_DISK_VERSION -> ok;
+    Header1 = simple_upgrade_record(Header0, #db_header{}),
+    Header =
+    case element(2, Header1) of
+    1 -> Header1#db_header{unused = 0}; % 0.9
+    2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+    ?LATEST_DISK_VERSION -> Header1;
     _ -> throw({database_disk_version_error, "Incorrect disk header version"})
     end,
-    Header = simple_upgrade_record(Header0, #db_header{}),
-    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
-    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
     Less = fun less_docid/2,
-            
+        
+    {ok, FsyncOptions} = couch_util:parse_term(
+            couch_config:get("couchdb", "fsync_options", 
+                    "[before_header, after_header, on_file_open]")),
+    
+    case lists:member(on_file_open, FsyncOptions) of
+    true -> ok = couch_file:sync(Fd);
+    _ -> ok
+    end,
+        
     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
         [{split, fun(X) -> btree_by_id_split(X) end},
         {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
@@ -308,7 +320,6 @@
     AdminsPtr ->
         {ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
     end,
-    
     % convert start time tuple to microsecs and store as a binary string
     {MegaSecs, Secs, MicroSecs} = now(),
     StartTime = ?l2b(io_lib:format("~p",
@@ -319,22 +330,22 @@
         fd=Fd,
         fd_ref_counter = RefCntr,
         header=Header,
-        summary_stream = SummaryStream,
         fulldocinfo_by_id_btree = IdBtree,
         docinfo_by_seq_btree = SeqBtree,
         local_docs_btree = LocalDocsBtree,
+        committed_update_seq = Header#db_header.update_seq,
         update_seq = Header#db_header.update_seq,
         name = DbName,
         filepath = Filepath,
         admins = Admins,
         admins_ptr = AdminsPtr,
         instance_start_time = StartTime,
-        revs_limit = Header#db_header.revs_limit
+        revs_limit = Header#db_header.revs_limit,
+        fsync_options = FsyncOptions
         }.
 
 
-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
-    couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
     couch_ref_counter:drop(RefCntr).
     
 
@@ -387,7 +398,7 @@
                     ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
                     throw(retry)
                 end,
-                {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+                {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
                 {IsDeleted, NewSummaryPointer, UpdateSeq};
             _ ->
                 Value
@@ -549,18 +560,18 @@
 commit_data(Db) ->
     commit_data(Db, false).
 
-
-commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
-    Header2 = Header#db_header{
+db_to_header(Db, Header) ->
+    Header#db_header{
         update_seq = Db#db.update_seq,
-        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
         docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
         fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
         local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
         admins_ptr = Db#db.admins_ptr,
-        revs_limit = Db#db.revs_limit
-        },
-    if Header == Header2 ->
+        revs_limit = Db#db.revs_limit}.
+
+commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
+    Header = db_to_header(Db, OldHeader),
+    if OldHeader == Header ->
         Db;
     Delay and (Db#db.waiting_delayed_commit == nil) ->
         Db#db{waiting_delayed_commit=
@@ -575,43 +586,75 @@
             end;
         true -> ok
         end,
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
-        Db#db{waiting_delayed_commit=nil,header=Header2}
+        case lists:member(before_header, FsyncOptions) of
+        true -> ok = couch_file:sync(Fd);
+        _    -> ok
+        end,
+        
+        ok = couch_file:write_header(Fd, Header),
+        
+        case lists:member(after_header, FsyncOptions) of
+        true -> ok = couch_file:sync(Fd);
+        _    -> ok
+        end,
+        
+        Db#db{waiting_delayed_commit=nil,
+            header=Header,
+            committed_update_seq=Db#db.update_seq}
     end.
 
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
-    {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+
+copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
+    {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
     % copy the bin values
-    NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
-        {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
-        {Name, {Type, NewBinSp, Len}}
+    NewBinInfos = lists:map(
+        fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+            % 09 UPGRADE CODE
+            {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+            {Name, {Type, NewBinSp, Len}};
+        ({Name, {Type, BinSp, Len}}) ->
+            {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            {Name, {Type, NewBinSp, Len}}
         end, BinInfos),
-    % now write the document summary
-    {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
-    Sp.
+    {BodyData, NewBinInfos}.
 
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
     [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
     % root nner node, only copy info/data from leaf nodes
-    [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
-    [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+    [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
+    [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
     % This is a leaf node, copy it over
-    NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
-    [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+    DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
+    [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
     % inner node, only copy info/data from leaf nodes
-    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+    [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
+
     
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
     LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+    
+    % write out the attachments
     NewFullDocInfos0 = lists:map(
         fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
-            Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+            Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
         end, LookupResults),
-    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
+    % write out the docs
+    % we do this in 2 stages so the docs are written out contiguously, making
+    % view indexing and replication faster.
+    NewFullDocInfos1 = lists:map(
+        fun(#full_doc_info{rev_tree=RevTree}=Info) ->
+            Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+                fun(_Key, {IsDel, DocBody, Seq}) ->
+                    {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+                    {IsDel, Pos, Seq}
+                end, RevTree)}
+        end, NewFullDocInfos0),
+
+    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
     NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
     RemoveSeqs =
     case Retry of
@@ -633,7 +676,9 @@
 
 
           
-copy_compact(Db, NewDb, Retry) ->
+copy_compact(Db, NewDb0, Retry) ->
+    FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+    NewDb = NewDb0#db{fsync_options=FsyncOptions},
     TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
     EnumBySeqFun =
     fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
@@ -677,15 +722,14 @@
     {ok, Fd} ->
         couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
         Retry = true,
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+        {ok, Header} = couch_file:read_header(Fd);
     {error, enoent} ->
         couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
         {ok, Fd} = couch_file:open(CompactFile, [create]),
         Retry = false,
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+        ok = couch_file:write_header(Fd, Header=#db_header{})
     end,
     NewDb = init_db(Name, CompactFile, Fd, Header),
-    unlink(Fd),
     NewDb2 = copy_compact(Db, NewDb, Retry),
     
     gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),

Modified: couchdb/trunk/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_doc.erl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_doc.erl (original)
+++ couchdb/trunk/src/couchdb/couch_doc.erl Mon May 25 19:52:28 2009
@@ -252,13 +252,12 @@
 
 
 bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
-    case Fun(Bin, Acc) of
-        {ok, Acc2} -> {ok, Acc2};
-        {done, Acc2} -> {ok, Acc2}
-    end;
-bin_foldl({Fd, Sp, Len}, Fun, Acc) ->
-    {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc),
-    {ok, Acc2}.
+    Fun(Bin, Acc);
+bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
+    % 09 UPGRADE CODE
+    couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
+bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
+    couch_stream:foldl(Fd, Sp, Fun, Acc).
 
 bin_size(Bin) when is_binary(Bin) ->
     size(Bin);
@@ -267,9 +266,8 @@
 
 bin_to_binary(Bin) when is_binary(Bin) ->
     Bin;
-bin_to_binary({Fd, Sp, Len}) ->
-    {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len),
-    Bin.
+bin_to_binary({Fd, Sp, _Len}) ->
+    couch_stream:foldl(Fd, Sp, fun(Bin, Acc) -> [Bin|Acc] end, []).
 
 get_validate_doc_fun(#doc{body={Props}}) ->
     Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),

Modified: couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_file.erl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_file.erl (original)
+++ couchdb/trunk/src/couchdb/couch_file.erl Mon May 25 19:52:28 2009
@@ -15,10 +15,16 @@
 
 -include("couch_db.hrl").
 
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+-define(SIZE_BLOCK, 4096).
 
--export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
--export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-record(file, {
+    fd,
+    tail_append_begin=0 % 09 UPGRADE CODE
+    }).
+
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
+-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
+-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
 
 %%----------------------------------------------------------------------
@@ -52,39 +58,6 @@
 
 
 %%----------------------------------------------------------------------
-%% Args:    Pos is the offset from the beginning of the file, Bytes is
-%%  is the number of bytes to read.
-%% Returns: {ok, Binary} where Binary is a binary data from disk
-%%  or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pread(Fd, Pos, Bytes) when Bytes > 0 ->
-    gen_server:call(Fd, {pread, Pos, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
-%% Args:    Pos is the offset from the beginning of the file, Bin is
-%%  is the binary to write
-%% Returns: ok
-%%  or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pwrite(Fd, Pos, Bin) ->
-    gen_server:call(Fd, {pwrite, Pos, Bin}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: To append a segment of zeros to the end of the file.
-%% Args:    Bytes is the number of bytes to append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
-%%  the new segments.
-%%  or {error, Reason}.
-%%----------------------------------------------------------------------
-
-expand(Fd, Bytes) when Bytes > 0 ->
-    gen_server:call(Fd, {expand, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
 %% Purpose: To append an Erlang term to the end of the file.
 %% Args:    Erlang term to serialize and append to the file.
 %% Returns: {ok, Pos} where Pos is the file offset to the beginning the
@@ -93,7 +66,7 @@
 %%----------------------------------------------------------------------
 
 append_term(Fd, Term) ->
-    append_binary(Fd, term_to_binary(Term, [compressed])).
+    append_binary(Fd, term_to_binary(Term)).
 
 
 %%----------------------------------------------------------------------
@@ -105,7 +78,8 @@
 %%----------------------------------------------------------------------
     
 append_binary(Fd, Bin) ->
-    gen_server:call(Fd, {append_bin, Bin}, infinity).
+    Size = iolist_size(Bin),
+    gen_server:call(Fd, {append_bin, [<<Size:32/integer>>, Bin]}, infinity).
 
 
 %%----------------------------------------------------------------------
@@ -115,10 +89,12 @@
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
+    
 pread_term(Fd, Pos) ->
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, binary_to_term(Bin)}.
 
+
 %%----------------------------------------------------------------------
 %% Purpose: Reads a binrary from a file that was written with append_binary
 %% Args:    Pos, the offset into the file where the term is serialized.
@@ -127,8 +103,26 @@
 %%----------------------------------------------------------------------
 
 pread_binary(Fd, Pos) ->
-    gen_server:call(Fd, {pread_bin, Pos}, infinity).
+    {ok, L} = pread_iolist(Fd, Pos),
+    {ok, iolist_to_binary(L)}.
 
+pread_iolist(Fd, Pos) ->
+    {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 4),
+    <<Len:32/integer>> = iolist_to_binary(LenIolist),
+    {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
+    {ok, Iolist}.
+
+read_raw_iolist(Fd, Pos, Len) ->
+    BlockOffset = Pos rem ?SIZE_BLOCK,
+    TotalBytes = calculate_total_read_len(BlockOffset, Len),
+    {ok, <<RawBin:TotalBytes/binary>>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
+    if HasPrefixes ->
+        {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
+    true ->
+        % 09 UPGRADE CODE
+        <<ReturnBin:Len/binary, _/binary>> = RawBin,
+        {ok, [ReturnBin], Pos + Len}
+    end.
 
 %%----------------------------------------------------------------------
 %% Purpose: The length of a file, in bytes.
@@ -167,35 +161,153 @@
     catch unlink(Fd),
     Result.
 
+% 09 UPGRADE CODE
+old_pread(Fd, Pos, Len) ->
+    {ok, <<RawBin:Len/binary>>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity),
+    {ok, RawBin}.
+
+% 09 UPGRADE CODE
+upgrade_old_header(Fd, Sig) ->
+    gen_server:call(Fd, {upgrade_old_header, Sig}, infinity).
+
+
+read_header(Fd) ->
+    case gen_server:call(Fd, find_header, infinity) of
+    {ok, Bin} ->
+        {ok, binary_to_term(Bin)};
+    Else ->
+        Else
+    end.
+    
+write_header(Fd, Data) ->
+    Bin = term_to_binary(Data),
+    Md5 = erlang:md5(Bin),
+    % now we assemble the final header binary and write to disk
+    FinalBin = <<Md5/binary, Bin/binary>>,
+    gen_server:call(Fd, {write_header, FinalBin}, infinity).
+    
+
 
-write_header(Fd, Prefix, Data) ->
-    TermBin = term_to_binary(Data),
-    % the size of all the bytes written to the header, including the md5 signature (16 bytes)
-    FilledSize = size(Prefix) + size(TermBin) + 16,
-    {TermBin2, FilledSize2} =
-    case FilledSize > ?HEADER_SIZE of
+
+init_status_error(ReturnPid, Ref, Error) ->
+    ReturnPid ! {Ref, self(), Error},
+    ignore.
+
+% server functions
+
+init({Filepath, Options, ReturnPid, Ref}) ->
+    case lists:member(create, Options) of
     true ->
-        % too big!
-        {ok, Pos} = append_binary(Fd, TermBin),
-        PtrBin = term_to_binary({pointer_to_header_data, Pos}),
-        {PtrBin, size(Prefix) + size(PtrBin) + 16};
+        filelib:ensure_dir(Filepath),
+        case file:open(Filepath, [read, write, raw, binary]) of
+        {ok, Fd} ->
+            {ok, Length} = file:position(Fd, eof),
+            case Length > 0 of
+            true ->
+                % this means the file already exists and has data.
+                % FYI: We don't differentiate between empty files and non-existant
+                % files here.
+                case lists:member(overwrite, Options) of
+                true ->
+                    {ok, 0} = file:position(Fd, 0),
+                    ok = file:truncate(Fd),
+                    ok = file:sync(Fd),
+                    couch_stats_collector:track_process_count(
+                            {couchdb, open_os_files}),
+                    {ok, #file{fd=Fd}};
+                false ->
+                    ok = file:close(Fd),
+                    init_status_error(ReturnPid, Ref, file_exists)
+                end;
+            false ->
+                couch_stats_collector:track_process_count(
+                        {couchdb, open_os_files}),
+                {ok, #file{fd=Fd}}
+            end;
+        Error ->
+            init_status_error(ReturnPid, Ref, Error)
+        end;
     false ->
-        {TermBin, FilledSize}
+        % open in read mode first, so we don't create the file if it doesn't exist.
+        case file:open(Filepath, [read, raw]) of
+        {ok, Fd_Read} ->
+            {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+            ok = file:close(Fd_Read),
+            couch_stats_collector:track_process_count({couchdb, open_os_files}),
+            {ok, #file{fd=Fd}};
+        Error ->
+            init_status_error(ReturnPid, Ref, Error)
+        end
+    end.
+
+
+terminate(_Reason, _Fd) ->
+    ok.
+
+
+handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
+    {ok, Bin} = file:pread(Fd, Pos, Bytes),
+    {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
+handle_call(bytes, _From, #file{fd=Fd}=File) ->
+    {reply, file:position(Fd, eof), File};
+handle_call(sync, _From, #file{fd=Fd}=File) ->
+    {reply, file:sync(Fd), File};
+handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
+    {ok, Pos} = file:position(Fd, Pos),
+    {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) ->
+    {ok, Pos} = file:position(Fd, eof),
+    Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
+    case file:pwrite(Fd, Pos, Blocks) of
+    ok ->
+        {reply, {ok, Pos}, File};
+    Error ->
+        {reply, Error, File}
+    end;
+handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
+    {ok, Pos} = file:position(Fd, eof),
+    BinSize = size(Bin),
+    case Pos rem ?SIZE_BLOCK of
+    0 ->
+        Padding = <<>>;
+    BlockOffset ->
+        Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
     end,
-    ok = sync(Fd),
-    % pad out the header with zeros, then take the md5 hash
-    PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
-    Sig = erlang:md5([TermBin2, PadZeros]),
-    % now we assemble the final header binary and write to disk
-    WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
-    ?HEADER_SIZE = size(WriteBin), % sanity check
-    DblWriteBin = [WriteBin, WriteBin],
-    ok = pwrite(Fd, 0, DblWriteBin),
-    ok = sync(Fd).
+    FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])],
+    {reply, file:pwrite(Fd, Pos, FinalBin), File};
+
+
+handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
+    case (catch read_old_header(Fd, Prefix)) of
+    {ok, Header} ->
+        {ok, TailAppendBegin} = file:position(Fd, eof),
+        Bin = term_to_binary(Header),
+        Md5 = erlang:md5(Bin),
+        % now we assemble the final header binary and write to disk
+        FinalBin = <<Md5/binary, Bin/binary>>,
+        {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
+        ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
+        {reply, ok, File#file{tail_append_begin=TailAppendBegin}};
+    _Error ->
+        case (catch read_old_header(Fd, <<"upgraded">>)) of
+        {ok, TailAppendBegin} ->
+            {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
+        _Error2 ->
+            {reply, ok, File}
+        end
+    end;
+
 
+handle_call(find_header, _From, #file{fd=Fd}=File) ->
+    {ok, Pos} = file:position(Fd, eof),
+    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+        
+% 09 UPGRADE CODE
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
 
-read_header(Fd, Prefix) ->
-    {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+% 09 UPGRADE CODE
+read_old_header(Fd, Prefix) ->
+    {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
     <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
     Result =
     % read the first header
@@ -238,6 +350,7 @@
         Result
     end.
     
+% 09 UPGRADE CODE
 extract_header(Prefix, Bin) ->
     SizeOfPrefix = size(Prefix),
     SizeOfTermBin = ?HEADER_SIZE -
@@ -260,88 +373,35 @@
     _ ->
         unknown_header_type
     end.
+    
 
-
-init_status_error(ReturnPid, Ref, Error) ->
-    ReturnPid ! {Ref, self(), Error},
-    ignore.
-
-% server functions
-
-init({Filepath, Options, ReturnPid, Ref}) ->
-    case lists:member(create, Options) of
+% 09 UPGRADE CODE
+write_old_header(Fd, Prefix, Data) ->
+    TermBin = term_to_binary(Data),
+    % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+    FilledSize = size(Prefix) + size(TermBin) + 16,
+    {TermBin2, FilledSize2} =
+    case FilledSize > ?HEADER_SIZE of
     true ->
-        filelib:ensure_dir(Filepath),
-        case file:open(Filepath, [read, write, raw, binary]) of
-        {ok, Fd} ->
-            {ok, Length} = file:position(Fd, eof),
-            case Length > 0 of
-            true ->
-                % this means the file already exists and has data.
-                % FYI: We don't differentiate between empty files and non-existant
-                % files here.
-                case lists:member(overwrite, Options) of
-                true ->
-                    {ok, 0} = file:position(Fd, 0),
-                    ok = file:truncate(Fd),
-                    couch_stats_collector:track_process_count(
-                            {couchdb, open_os_files}),
-                    {ok, Fd};
-                false ->
-                    ok = file:close(Fd),
-                    init_status_error(ReturnPid, Ref, file_exists)
-                end;
-            false ->
-                couch_stats_collector:track_process_count(
-                        {couchdb, open_os_files}),
-                {ok, Fd}
-            end;
-        Error ->
-            init_status_error(ReturnPid, Ref, Error)
-        end;
+        % too big!
+        {ok, Pos} = append_binary(Fd, TermBin),
+        PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+        {PtrBin, size(Prefix) + size(PtrBin) + 16};
     false ->
-        % open in read mode first, so we don't create the file if it doesn't exist.
-        case file:open(Filepath, [read, raw]) of
-        {ok, Fd_Read} ->
-            {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
-            ok = file:close(Fd_Read),
-            couch_stats_collector:track_process_count({couchdb, open_os_files}),
-            {ok, Fd};
-        Error ->
-            init_status_error(ReturnPid, Ref, Error)
-        end
-    end.
-
-
-terminate(_Reason, _Fd) ->
-    ok.
-
-
-handle_call({pread, Pos, Bytes}, _From, Fd) ->
-    {reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call({pwrite, Pos, Bin}, _From, Fd) ->
-    {reply, file:pwrite(Fd, Pos, Bin), Fd};
-handle_call({expand, Num}, _From, Fd) ->
-    {ok, Pos} = file:position(Fd, eof),
-    {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
-handle_call(bytes, _From, Fd) ->
-    {reply, file:position(Fd, eof), Fd};
-handle_call(sync, _From, Fd) ->
-    {reply, file:sync(Fd), Fd};
-handle_call({truncate, Pos}, _From, Fd) ->
-    {ok, Pos} = file:position(Fd, Pos),
-    {reply, file:truncate(Fd), Fd};
-handle_call({append_bin, Bin}, _From, Fd) ->
-    Len = size(Bin),
-    Bin2 = <<Len:32, Bin/binary>>,
-    {ok, Pos} = file:position(Fd, eof),
-    {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_bin, Pos}, _From, Fd) ->
-    {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
-    {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
-    {reply, {ok, Bin}, Fd}.
-
+        {TermBin, FilledSize}
+    end,
+    ok = file:sync(Fd),
+    % pad out the header with zeros, then take the md5 hash
+    PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+    Sig = erlang:md5([TermBin2, PadZeros]),
+    % now we assemble the final header binary and write to disk
+    WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+    ?HEADER_SIZE = size(WriteBin), % sanity check
+    DblWriteBin = [WriteBin, WriteBin],
+    ok = file:pwrite(Fd, 0, DblWriteBin),
+    ok = file:sync(Fd).
 
+    
 handle_cast(close, Fd) ->
     {stop,normal,Fd}.
 
@@ -351,3 +411,82 @@
 
 handle_info({'EXIT', _, Reason}, Fd) ->
     {stop, Reason, Fd}.
+
+
+find_header(_Fd, -1) ->
+    no_valid_header;
+find_header(Fd, Block) ->
+    case (catch load_header(Fd, Block)) of
+    {ok, Bin} ->
+        {ok, Bin};
+    _Error ->
+        find_header(Fd, Block -1)
+    end.
+    
+load_header(Fd, Block) ->
+    {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
+    {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
+    TotalBytes = calculate_total_read_len(1, HeaderLen),
+    {ok, <<RawBin:TotalBytes/binary>>} = 
+            file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
+    <<Md5Sig:16/binary, HeaderBin/binary>> = 
+        iolist_to_binary(remove_block_prefixes(1, RawBin)),
+    Md5Sig = erlang:md5(HeaderBin),
+    {ok, HeaderBin}.
+
+calculate_total_read_len(0, FinalLen) ->
+    calculate_total_read_len(1, FinalLen) + 1;
+calculate_total_read_len(BlockOffset, FinalLen) ->
+    case ?SIZE_BLOCK - BlockOffset of
+    BlockLeft when BlockLeft >= FinalLen ->
+        FinalLen;
+    BlockLeft ->
+        FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) +
+            if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0;
+                true -> 1 end
+    end.
+
+remove_block_prefixes(_BlockOffset, <<>>) ->
+    [];
+remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) ->
+    remove_block_prefixes(1, Rest);
+remove_block_prefixes(BlockOffset, Bin) ->
+    BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset,
+    case size(Bin) of
+    Size when Size > BlockBytesAvailable ->
+        <<DataBlock:BlockBytesAvailable/binary,Rest/binary>> = Bin,
+        [DataBlock | remove_block_prefixes(0, Rest)];
+    _Size ->
+        [Bin]
+    end.
+
+make_blocks(_BlockOffset, []) ->
+    [];
+make_blocks(0, IoList) ->
+    [<<0>> | make_blocks(1, IoList)];
+make_blocks(BlockOffset, IoList) ->
+    case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
+    {Begin, End} ->
+        [Begin | make_blocks(0, End)];
+    _Size ->
+        IoList
+    end.
+
+split_iolist(List, 0, BeginAcc) ->
+    {lists:reverse(BeginAcc), List};
+split_iolist([], SplitAt, _BeginAcc) ->
+    SplitAt;
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc)  when SplitAt > size(Bin) ->
+    split_iolist(Rest, SplitAt - size(Bin), [Bin | BeginAcc]);
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
+    <<Begin:SplitAt/binary,End/binary>> = Bin,
+    split_iolist([End | Rest], 0, [Begin | BeginAcc]);
+split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
+    case split_iolist(Sublist, SplitAt, BeginAcc) of
+    {Begin, End} ->
+        {Begin, [End | Rest]};
+    Len ->
+        split_iolist(Rest, SplitAt - Len, [Sublist | BeginAcc])
+    end;
+split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
+    split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).

Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Mon May 25 19:52:28 2009
@@ -723,12 +723,7 @@
             % {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))}
             ]),
         couch_doc:bin_foldl(Bin,
-            fun(BinSegment, []) ->
-                send_chunk(Resp, BinSegment),
-                {ok, []}
-            end,
-            []
-        ),
+                fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end,[]),
         send_chunk(Resp, "")
     end;
 

Modified: couchdb/trunk/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_stream.erl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_stream.erl (original)
+++ couchdb/trunk/src/couchdb/couch_stream.erl Mon May 25 19:52:28 2009
@@ -13,14 +13,6 @@
 -module(couch_stream).
 -behaviour(gen_server).
 
--export([test/1]).
--export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4, copy_to_new_stream/4]).
--export([ensure_buffer/2, set_min_buffer/2]).
--export([init/1, terminate/2, handle_call/3]).
--export([handle_cast/2,code_change/3,handle_info/2]).
-
--include("couch_db.hrl").
 
 -define(FILE_POINTER_BYTES, 8).
 -define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
@@ -32,125 +24,111 @@
 
 -define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
 
+-export([test/0]).
+-export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]).
+-export([copy_to_new_stream/3,old_read_term/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
 
--record(write_stream,
-    {fd = 0,
-    current_pos = 0,
-    bytes_remaining = 0,
-    next_alloc = 0,
-    min_alloc = 16#00010000
-    }).
+-include("couch_db.hrl").
 
 -record(stream,
-    {
-    pid,
-    fd
+    {fd = 0,
+    written_pointers=[],
+    buffer_list = [],
+    buffer_len = 0,
+    max_buffer = 4096,
+    written_len = 0
     }).
 
 
 %%% Interface functions %%%
 
 open(Fd) ->
-    open(nil, Fd).
-
-open(nil, Fd) ->
-    open({0,0}, Fd);
-open(State, Fd) ->
-    {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
-    {ok, #stream{pid = Pid, fd = Fd}}.
+    gen_server:start_link(couch_stream, Fd, []).
 
-close(#stream{pid = Pid, fd = _Fd}) ->
+close(Pid) ->
     gen_server:call(Pid, close, infinity).
 
-get_state(#stream{pid = Pid, fd = _Fd}) ->
-    gen_server:call(Pid, get_state, infinity).
-
-ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
-    gen_server:call(Pid, {ensure_buffer, Bytes}).
+copy_to_new_stream(Fd, PosList, DestFd) ->
+    {ok, Dest} = open(DestFd),
+    foldl(Fd, PosList,
+        fun(Bin, _) ->
+            ok = write(Dest, Bin)
+        end, ok),
+    close(Dest).
 
-set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
-    gen_server:call(Pid, {set_min_buffer, Bytes}).
 
-read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
-    read(Fd, Sp, Num);
-read(Fd, Sp, Num) ->
-    {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
-    Bin = list_to_binary(lists:reverse(RevBin)),
-    {ok, Bin, Sp2}.
-
-copy_to_new_stream(Src, Sp, Len, DestFd) ->
+% 09 UPGRADE CODE
+old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
     {ok, Dest} = open(DestFd),
-    ok = set_min_buffer(Dest, 0),
-    {ok, NewSp} = copy(Src, Sp, Len, Dest),
-    close(Dest),
-    {ok, NewSp}.
-
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
-    copy(Fd, Sp, Len, DestStream);
-copy(Fd, Sp, Len, DestStream) ->
-    {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
-        fun(Bin, AccPointer) ->
-            {ok, NewPointer} = write(DestStream, Bin),
-            {ok, if AccPointer == null -> NewPointer; true -> AccPointer end}
-        end,
-        null),
-    {ok, NewSp}.
-
-foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
-    foldl(Fd, Sp, Num, Fun, Acc);
-foldl(Fd, Sp, Num, Fun, Acc) ->
-    {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
-
-read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
-    read_term(Fd, Sp);
-read_term(Fd, Sp) ->
-    {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
-        = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
-    {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
-    {ok, binary_to_term(Bin)}.
+    old_foldl(Fd, Pos, Len,
+        fun(Bin, _) ->
+            ok = write(Dest, Bin)
+        end, ok),
+    close(Dest).
+
+% 09 UPGRADE CODE    
+old_foldl(_Fd, null, 0, _Fun, Acc) ->
+    Acc;
+old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)->
+    old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
+
+foldl(_Fd, [], _Fun, Acc) ->
+    Acc;
+foldl(Fd, [Pos|Rest], Fun, Acc) ->
+    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+    foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
 
-write_term(Stream, Term) ->
-    Bin = term_to_binary(Term),
-    Size = size(Bin),
-    Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
-    write(Stream, Bin2).
-
-write(#stream{}, <<>>) ->
-    {ok, {0,0}};
-write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+write(_Pid, <<>>) ->
+    ok;
+write(Pid, Bin) ->
     gen_server:call(Pid, {write, Bin}, infinity).
 
 
-init({{Pos, BytesRemaining}, Fd}) ->
-    {ok, #write_stream
-        {fd = Fd,
-        current_pos = Pos,
-        bytes_remaining = BytesRemaining
-        }}.
+init(Fd) ->
+    {ok, #stream{fd = Fd}}.
 
 terminate(_Reason, _Stream) ->
     ok.
 
-handle_call(get_state, _From, Stream) ->
-    #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
-    {reply, {Pos, BytesRemaining}, Stream};
-handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
-    {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
-% set next_alloc if we need more room
-handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
-    #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
-    case BytesRemainingInCurrentBuffer < BufferSizeRequested of
-        true ->  NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
-        false -> NextAlloc = 0 % enough room in current segment
-    end,
-    {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
 handle_call({write, Bin}, _From, Stream) ->
-    % ensure init is called first so we can get a pointer to the begining of the binary
-    {ok, Sp, Stream2} = write_data(Stream, Bin),
-    {reply, {ok, Sp}, Stream2};
+    BinSize = iolist_size(Bin),
+    #stream{
+        fd = Fd,
+        written_len = WrittenLen,
+        written_pointers = Written,
+        buffer_len = BufferLen,
+        buffer_list = Buffer,
+        max_buffer = Max} = Stream,
+    if BinSize + BufferLen > Max ->
+        {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)),
+        {reply, ok, Stream#stream{
+                        written_len=WrittenLen + BufferLen + BinSize,
+                        written_pointers=[Pos|Written],
+                        buffer_list=[],
+                        buffer_len=0}};
+    true ->
+        {reply, ok, Stream#stream{
+                        buffer_list=[Bin|Buffer],
+                        buffer_len=BufferLen + BinSize}}
+    end;
 handle_call(close, _From, Stream) ->
-    #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
-    {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+    #stream{
+        fd = Fd,
+        written_len = WrittenLen,
+        written_pointers = Written,
+        buffer_len = BufferLen,
+        buffer_list = Buffer} = Stream,
+    
+    case Buffer of
+    [] ->
+        Result = {Written, WrittenLen};
+    _ ->
+        {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
+        Result = {[Pos|Written], WrittenLen + BufferLen}
+    end,
+    {stop, normal, Result, Stream}.
 
 handle_cast(_Msg, State) ->
     {noreply,State}.
@@ -160,14 +138,27 @@
 
 handle_info(_Info, State) ->
     {noreply, State}.
+    
+
 
-%%% Internal function %%%
+% 09 UPGRADE CODE
+old_read_term(Fd, Sp) ->
+    {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+        = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+    {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen),
+    {ok, binary_to_term(Bin)}.
 
-stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+old_read(Fd, Sp, Num) ->
+    {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []),
+    Bin = list_to_binary(lists:reverse(RevBin)),
+    {ok, Bin, Sp2}.
+
+% 09 UPGRADE CODE
+old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
     {ok, Acc, Sp};
-stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
     {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
-        = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+        = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
     Sp = {NextPos, NextOffset},
     % Check NextPos is past current Pos (this is always true in a stream)
     % Guards against potential infinite loops caused by corruption.
@@ -175,86 +166,47 @@
         true -> ok;
         false -> throw({error, stream_corruption})
     end,
-    stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
-stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+    old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
     ReadAmount = lists:min([MaxChunk, Num, Offset]),
-    {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
+    {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount),
     Sp = {Pos + ReadAmount, Offset - ReadAmount},
-    case Fun(Bin, Acc) of
-    {ok, Acc2} ->
-        stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
-    {stop, Acc2} ->
-        {ok, Acc2, Sp}
-    end.
-
-write_data(Stream, <<>>) ->
-    {ok, {0,0}, Stream};
-write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
-    #write_stream {
-        fd = Fd,
-        current_pos = CurrentPos,
-        next_alloc = NextAlloc,
-        min_alloc = MinAlloc
-        }= Stream,
-
-    NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
-    % no space in the current segment, must alloc a new segment
-    {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
-
-    case CurrentPos of
-    0 ->
-        ok;
-    _ ->
-        ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
-    end,
-    Stream2 = Stream#write_stream{
-        current_pos=NewPos,
-        bytes_remaining=NewSize,
-        next_alloc=0},
-    write_data(Stream2, Bin);
-write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
-    BytesToWrite = lists:min([size(Bin), BytesRemaining]),
-    {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
-    ok = couch_file:pwrite(Fd, Pos, WriteBin),
-    Stream2 = Stream#write_stream{
-        bytes_remaining=BytesRemaining - BytesToWrite,
-        current_pos=Pos + BytesToWrite
-        },
-    {ok, _, Stream3} = write_data(Stream2, Rest),
-    {ok, {Pos, BytesRemaining}, Stream3}.
+    old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)).
 
 
 
 %%% Tests %%%
 
-
-test(Term) ->
-    {ok, Fd} = couch_file:open("foo", [write]),
-    {ok, Stream} = open({0,0}, Fd),
-    {ok, Pos} = write_term(Stream, Term),
-    {ok, Pos2} = write_term(Stream, {Term, Term}),
-    close(Stream),
+read_all(Fd, PosList) ->
+    iolist_to_binary(foldl(Fd, PosList,
+        fun(Bin, Acc) ->
+            [Bin, Acc]
+        end, [])).
+
+
+test() ->
+    {ok, Fd} = couch_file:open("foo", [create,overwrite]),
+    ok = couch_file:write_header(Fd, {howdy, howdy}),
+    Bin = <<"damienkatz">>,
+    {ok, Pos} = couch_file:append_binary(Fd, Bin),
+    {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+    {ok, {howdy, howdy}} = couch_file:read_header(Fd),
+    ok = couch_file:write_header(Fd, {foo, foo}),
+    {ok, {foo, foo}} = couch_file:read_header(Fd),
+    
+    {ok, Stream} = open(Fd),
+    ok = write(Stream, <<"food">>),
+    ok = write(Stream, <<"foob">>),
+    {PosList, 8} = close(Stream),
+    <<"foodfoob">> = read_all(Fd, PosList),
+    {ok, Stream2} = open(Fd),
+    OneBits = <<1:(8*10)>>,
+    ZeroBits = <<0:(8*10)>>,
+    ok = write(Stream2, OneBits),
+    ok = write(Stream2, ZeroBits),
+    {PosList2, 20} = close(Stream2),
+    AllBits = iolist_to_binary([OneBits,ZeroBits]),
+    AllBits = read_all(Fd, PosList2),
     couch_file:close(Fd),
-    {ok, Fd2} = couch_file:open("foo", [read, write]),
-    {ok, Stream2} = open({0,0}, Fd2),
-    {ok, Term1} = read_term(Fd2, Pos),
-    io:format("Term1: ~w ~n",[Term1]),
-    {ok, Term2} = read_term(Fd2, Pos2),
-    io:format("Term2: ~w ~n",[Term2]),
-    {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
-    deep_read_test(Fd2, PointerList),
-    close(Stream2),
-    couch_file:close(Fd2).
+    PosList2.
 
-deep_read_test(_Fd, []) ->
-    ok;
-deep_read_test(Fd, [Pointer | RestPointerList]) ->
-    {ok, _Term} = read_term(Fd, Pointer),
-    deep_read_test(Fd, RestPointerList).
-
-deep_write_test(_Stream, _Term, 0, PointerList) ->
-    {ok, PointerList};
-deep_write_test(Stream, Term, N, PointerList) ->
-    WriteList = lists:duplicate(random:uniform(N), Term),
-    {ok, Pointer} = write_term(Stream, WriteList),
-    deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).

Modified: couchdb/trunk/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=778485&r1=778484&r2=778485&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_group.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_group.erl Mon May 25 19:52:28 2009
@@ -205,7 +205,7 @@
     if CommittedSeq >= Group#group.current_seq ->
         % save the header
         Header = {Group#group.sig, get_index_header_data(Group)},
-        ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header),
+        ok = couch_file:write_header(Group#group.fd, Header),
         {noreply, State#group_state{waiting_commit=false}};
     true ->
         % We can't commit the header because the database seq that's fully
@@ -261,7 +261,7 @@
 handle_info({'EXIT', _FromPid, normal}, State) ->
     {noreply, State};
     
-handle_info({'EXIT', FromPid, {{nocatch, Reason}, Trace}}, State) ->
+handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) ->
     ?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]),
     {stop, Reason, State};
 
@@ -313,7 +313,9 @@
             if ForceReset ->
                 {ok, reset_file(Db, Fd, DbName, Group)};
             true ->
-                case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+                % 09 UPGRADE CODE
+                ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
+                case (catch couch_file:read_header(Fd)) of
                 {ok, {Sig, HeaderInfo}} ->
                     % sigs match!
                     {ok, init_group(Db, Fd, Group, HeaderInfo)};
@@ -417,7 +419,7 @@
 reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
     ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
     ok = couch_file:truncate(Fd, 0),
-    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+    ok = couch_file:write_header(Fd, {Sig, nil}),
     init_group(Db, Fd, reset_group(Group), nil).
 
 delete_index_file(RootDir, DbName, GroupId) ->



Mime
View raw message