couchdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cml...@apache.org
Subject svn commit: r642432 [15/16] - in /incubator/couchdb/trunk: ./ bin/ build-contrib/ etc/ etc/conf/ etc/default/ etc/init/ etc/launchd/ etc/logrotate.d/ share/ share/server/ share/www/ share/www/browse/ share/www/image/ share/www/script/ share/www/style/ ...
Date Fri, 28 Mar 2008 23:32:30 GMT
Added: incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_event_sup.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,69 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% The purpose of this module is to allow event handlers to participate in Erlang
+%% supervisor trees. It provides a monitorable process that crashes if the event
+%% handler fails. The process, when shut down, deregisters the event handler.
+
+-module(couch_event_sup).
+-behaviour(gen_server).
+
+-include("couch_db.hrl").
+
+-export([start_link/3,start_link/4, stop/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
+
+%
+% Instead of calling
+% ok = gen_event:add_sup_handler(error_logger, my_log, Args)
+%
+% do this:
+% {ok, LinkedPid} = couch_event_sup:start_link(error_logger, my_log, Args)
+%
+% The benefit is that the event handler is now part of the process tree, and
+% can be started, restarted and shut down consistently like the rest of the
+% server components.
+%
+% And now if the "event" crashes, the supervisor is notified and can restart
+% the event handler.
+%
+% Use this form to register the wrapper process under a name:
+% {ok, LinkedPid} = couch_event_sup:start_link({local, my_log}, error_logger, my_log, Args)
+%
+
+% Starts an anonymous, linked wrapper process that installs EventHandler
+% (initialised with Args) on the event manager EventMgr.
+start_link(EventMgr, EventHandler, Args) ->
+    gen_server:start_link(couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+% Same as start_link/3, but registers the wrapper process as ServerName
+% (any name form accepted by gen_server:start_link/4).
+start_link(ServerName, EventMgr, EventHandler, Args) ->
+    gen_server:start_link(ServerName, couch_event_sup, {EventMgr, EventHandler, Args}, []).
+
+% Asynchronously asks the wrapper process to stop with reason 'normal'.
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+% Installs the supervised handler; a failure to add it crashes init (badmatch)
+% so the caller of start_link sees the error.
+init({EventMgr, EventHandler, Args}) ->
+    ok = gen_event:add_sup_handler(EventMgr, EventHandler, Args),
+    {ok, {EventMgr, EventHandler}}.
+
+% No explicit cleanup here. The handler was installed with
+% gen_event:add_sup_handler/3, which per the gen_event documentation makes the
+% event manager delete the handler when this process terminates.
+terminate(_Reason, _State) ->
+    ok.
+
+% This server defines no synchronous requests. Reply 'ok' to anything we are
+% sent: the previous return value, {ok, State}, is not a valid gen_server
+% handle_call result and terminated the server with bad_return_value.
+handle_call(_Whatever, _From, State) ->
+    {reply, ok, State}.
+
+% Only 'stop' is understood (sent by stop/1).
+handle_cast(stop, State) ->
+    {stop, normal, State}.
+
+% The event manager tells us the handler was removed or crashed; stop with
+% the same reason so a supervisor can react.
+handle_info({gen_event_EXIT, _Handler, Reason}, State) ->
+    {stop, Reason, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.

Added: incubator/couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_file.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_file.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_file.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,323 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_file).
+-behaviour(gen_server).
+
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+
+-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
+-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+
+%%----------------------------------------------------------------------
+%% Args:   Valid Options are [create] and [create,overwrite].
+%%  Files are opened in read/write mode.
+%% Returns: On success, {ok, Fd}
+%%  or {error, Reason} if the file could not be opened.
+%%----------------------------------------------------------------------
+
+open(Filepath) ->
+    open(Filepath, []).
+
+% Starts the file server and waits for the real open status, which the
+% server's init sends to us as a message.
+open(Filepath, Options) ->
+    StartResult = gen_server:start_link(couch_file, {Filepath, Options, self()}, []),
+    await_open_status(StartResult).
+
+% A successful start does not mean the file was opened. The server always
+% initialises "successfully" and reports the true outcome in a message, so
+% that a missing file does not produce a logged crash report every time.
+await_open_status({ok, FdPid}) ->
+    receive
+    {FdPid, ok} ->
+        {ok, FdPid};
+    {FdPid, Error} ->
+        Error
+    end;
+await_open_status(Error) ->
+    Error.
+
+
+%%----------------------------------------------------------------------
+%% Args:    Pos is the offset from the beginning of the file, Bytes is
+%%  the number of bytes to read (must be greater than zero).
+%% Returns: {ok, Binary} where Binary is the binary data read from disk
+%%  or {error, Reason}. The server passes the result of file:pread/3
+%%  through unchanged.
+%%----------------------------------------------------------------------
+
+pread(Fd, Pos, Bytes) when Bytes > 0 ->
+    gen_server:call(Fd, {pread, Pos, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Args:    Pos is the offset from the beginning of the file, Bin is
+%%  the binary to write.
+%% Returns: ok
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pwrite(Fd, Pos, Bin) ->
+    gen_server:call(Fd, {pwrite, Pos, Bin}).
+
+%%----------------------------------------------------------------------
+%% Purpose: To append a segment of zeros to the end of the file.
+%% Args:    Bytes is the number of bytes to append to the file (must be
+%%  greater than zero).
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
+%%  the new segment
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+expand(Fd, Bytes) when Bytes > 0 ->
+    gen_server:call(Fd, {expand, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: To append an Erlang term to the end of the file.
+%% Args:    Erlang term to serialize and append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning of the
+%%  serialized term. Use pread_term to read the term back.
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+append_term(Fd, Term) ->
+    gen_server:call(Fd, {append_term, Term}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads a term from a file that was written with append_term.
+%% Args:    Pos, the offset into the file where the term is serialized.
+%% Returns: {ok, Term}
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread_term(Fd, Pos) ->
+    gen_server:call(Fd, {pread_term, Pos}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: The length of a file, in bytes.
+%% Returns: {ok, Bytes}
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+% length in bytes
+bytes(Fd) ->
+    gen_server:call(Fd, bytes).
+
+%%----------------------------------------------------------------------
+%% Purpose: Truncate a file to the given number of bytes.
+%% Args:    Pos is the offset at which the file is cut off.
+%% Returns: ok
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+truncate(Fd, Pos) ->
+    gen_server:call(Fd, {truncate, Pos}).
+
+%%----------------------------------------------------------------------
+%% Purpose: Ensure all bytes written to the file are flushed to disk.
+%% Returns: ok
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+sync(Fd) ->
+    gen_server:call(Fd, sync).
+
+%%----------------------------------------------------------------------
+%% Purpose: Close the file. Is performed asynchronously (a cast), so the
+%%  file may still be open when this function returns.
+%% Returns: ok
+%%----------------------------------------------------------------------
+close(Fd) ->
+    gen_server:cast(Fd, close).
+
+
+% Writes the database header twice, back to back, at the start of the file.
+% Each copy is exactly ?HEADER_SIZE bytes laid out as
+%   Prefix | term_to_binary(Data) | zero padding | 16-byte md5 signature
+% where the signature covers the serialized term plus the padding.
+% Returns ok, or {error, error_header_too_large} if the data does not fit.
+write_header(Fd, Prefix, Data) ->
+    TermBin = term_to_binary(Data),
+    % bytes left for padding once prefix, term and md5 signature (16 bytes)
+    % are accounted for; negative means the header does not fit
+    PadBytes = ?HEADER_SIZE - (size(Prefix) + size(TermBin) + 16),
+    if
+    PadBytes < 0 ->
+        {error, error_header_too_large};
+    true ->
+        Padding = <<0:(8*PadBytes)>>,
+        Sig = erlang:md5([TermBin, Padding]),
+        HeaderBin = <<Prefix/binary, TermBin/binary, Padding/binary, Sig/binary>>,
+        ?HEADER_SIZE = size(HeaderBin), % sanity check
+        % both copies go out in a single write starting at offset 0
+        ok = pwrite(Fd, 0, [HeaderBin, HeaderBin])
+    end.
+
+
+% Reads both copies of the doubly written header and returns the best one:
+%   - both valid: the primary (differences between the two are logged),
+%   - only one valid: that one (the corruption of the other is logged),
+%   - neither valid: {error, Reason} from the primary copy, unlogged, since
+%     the caller is responsible for dealing with the error.
+read_header(Fd, Prefix) ->
+    {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+    <<Primary:(?HEADER_SIZE)/binary, Secondary:(?HEADER_SIZE)/binary>> = Bin,
+    PrimaryResult = extract_header(Prefix, Primary),
+    SecondaryResult = extract_header(Prefix, Secondary),
+    case {PrimaryResult, SecondaryResult} of
+    {{ok, Header1}, {ok, Header2}} when Header1 == Header2 ->
+        % Everything is completely normal!
+        {ok, Header1};
+    {{ok, Header1}, {ok, Header2}} ->
+        % Two different header versions, both with intact signatures.
+        % Weird but possible (a commit failure right at the 2k boundary).
+        couch_log:info("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
+        {ok, Header1};
+    {{ok, Header1}, {error, Error}} ->
+        couch_log:info("Secondary header corruption (error: ~p). Using primary header.", [Error]),
+        {ok, Header1};
+    {{error, Error}, {ok, Header2}} ->
+        couch_log:info("Primary header corruption (error: ~p). Using secondary header.", [Error]),
+        {ok, Header2};
+    {{error, Error}, _} ->
+        {error, Error}
+    end.
+
+
+% Validates one ?HEADER_SIZE-byte header copy and decodes its term.
+% Returns {ok, Term}, {error, unknown_header_type} when the prefix does not
+% match, or {error, header_corrupt} when the md5 signature does not match.
+extract_header(Prefix, Bin) ->
+    PrefixLen = size(Prefix),
+    % everything between the prefix and the trailing 16-byte md5 signature
+    BodyLen = ?HEADER_SIZE - PrefixLen - 16,
+    <<FoundPrefix:PrefixLen/binary, Body:BodyLen/binary, Sig:16/binary>> = Bin,
+    PrefixOk = FoundPrefix =:= Prefix,
+    % the signature is only computed once the prefix is known to be right
+    SigOk = PrefixOk andalso erlang:md5(Body) == Sig,
+    if
+    not PrefixOk ->
+        {error, unknown_header_type};
+    not SigOk ->
+        {error, header_corrupt};
+    true ->
+        {ok, binary_to_term(Body)}
+    end.
+
+
+
+% Reports a successful open to the process waiting in open/2 and returns the
+% gen_server init result, with the raw file descriptor as server state.
+init_status_ok(ReturnPid, Fd) ->
+    ReturnPid ! {self(), ok}, % signal back ok
+    {ok, Fd}.
+
+% Reports a failed open to the process waiting in open/2. init still returns
+% {ok, nil}, so no crash report is generated; the server then shuts itself
+% down when it handles the self_close message queued here.
+init_status_error(ReturnPid, Error) ->
+    ReturnPid ! {self(), Error}, % signal back error status
+    self() ! self_close, % tell ourself to close async
+    {ok, nil}.
+
+% server functions
+
+% gen_server init: opens Filepath according to Options and reports the
+% outcome to ReturnPid (the caller blocked in open/2) via init_status_ok/2 or
+% init_status_error/2 instead of failing init.
+%
+% Options:
+%   [create]            create the file; fails with {error, file_exists}
+%                       if it already exists and has data
+%   [create, overwrite] create the file, truncating any existing data
+%   []                  open an existing file; never creates one
+init({Filepath, Options, ReturnPid}) ->
+    case lists:member(create, Options) of
+    true ->
+        filelib:ensure_dir(Filepath),
+        case file:open(Filepath, [read, write, raw, binary]) of
+        {ok, Fd} ->
+            {ok, Length} = file:position(Fd, eof),
+            case Length > 0 of
+            true ->
+                % this means the file already exists and has data.
+                % FYI: We don't differentiate between empty files and
+                % non-existent files here.
+                case lists:member(overwrite, Options) of
+                true ->
+                    {ok, 0} = file:position(Fd, 0),
+                    ok = file:truncate(Fd),
+                    init_status_ok(ReturnPid, Fd);
+                false ->
+                    ok = file:close(Fd),
+                    init_status_error(ReturnPid, {error, file_exists})
+                end;
+            false ->
+                init_status_ok(ReturnPid, Fd)
+            end;
+        Error ->
+            init_status_error(ReturnPid, Error)
+        end;
+    false ->
+        % open in read mode first, so we don't create the file if it doesn't exist.
+        case file:open(Filepath, [read, raw]) of
+        {ok, Fd_Read} ->
+            OpenResult = file:open(Filepath, [read, write, raw, binary]),
+            % the probe descriptor is closed whether or not the read/write
+            % open worked, so it cannot leak
+            ok = file:close(Fd_Read),
+            case OpenResult of
+            {ok, Fd} ->
+                init_status_ok(ReturnPid, Fd);
+            OpenError ->
+                % e.g. the file is readable but not writable: report it like
+                % any other open failure rather than crashing init
+                init_status_error(ReturnPid, OpenError)
+            end;
+        Error ->
+            init_status_error(ReturnPid, Error)
+        end
+    end.
+
+
+% State is nil when the open failed (see init_status_error/2): there is no
+% descriptor to close in that case.
+terminate(_Reason, nil) ->
+    ok;
+% Otherwise the state is the open file descriptor; close it on the way out.
+terminate(_Reason, Fd) ->
+    file:close(Fd),
+    ok.
+
+
+% Synchronous file operations. The server state is the raw file descriptor.
+handle_call({pread, Pos, Bytes}, _From, Fd) ->
+    {reply, file:pread(Fd, Pos, Bytes), Fd};
+handle_call({pwrite, Pos, Bin}, _From, Fd) ->
+    {reply, file:pwrite(Fd, Pos, Bin), Fd};
+handle_call({expand, Num}, _From, Fd) ->
+    {ok, Pos} = file:position(Fd, eof),
+    % writing a single zero byte at the new last offset grows the file by Num
+    {reply, write_reply(file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos), Fd};
+handle_call(bytes, _From, Fd) ->
+    {reply, file:position(Fd, eof), Fd};
+handle_call(sync, _From, Fd) ->
+    {reply, file:sync(Fd), Fd};
+handle_call({truncate, Pos}, _From, Fd) ->
+    {ok, Pos} = file:position(Fd, Pos),
+    {reply, file:truncate(Fd), Fd};
+handle_call({append_term, Term}, _From, Fd) ->
+    Bin = term_to_binary(Term, [compressed]),
+    TermLen = size(Bin),
+    % on-disk form: 32-bit length followed by the serialized term
+    Bin2 = <<TermLen:32, Bin/binary>>,
+    {ok, Pos} = file:position(Fd, eof),
+    {reply, write_reply(file:pwrite(Fd, Pos, Bin2), Pos), Fd};
+handle_call({pread_term, Pos}, _From, Fd) ->
+    {ok, <<TermLen:32>>}
+        = file:pread(Fd, Pos, 4),
+    {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
+    {reply, {ok, binary_to_term(Bin)}, Fd}.
+
+% Builds the reply for operations that write at offset Pos: {ok, Pos} when
+% the write succeeded, otherwise the {error, Reason} from file:pwrite/3
+% unchanged (previously failures came back as {{error, Reason}, Pos}, which
+% matches neither documented return shape).
+write_reply(ok, Pos) ->
+    {ok, Pos};
+write_reply({error, _Reason} = Error, _Pos) ->
+    Error.
+
+
+% close/1 arrives here; stopping with 'normal' lets terminate/2 close the file.
+handle_cast(close, Fd) ->
+    {stop,normal,Fd}. % causes terminate to be called
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% self_close is queued by init_status_error/2 when the file could not be
+% opened: shut the server down quietly.
+handle_info(self_close, State) ->
+    {stop,normal,State};
+% Any other message is ignored.
+handle_info(_Info, State) ->
+    {noreply, State}.

Added: incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_ft_query.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,78 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_ft_query).
+-behaviour(gen_server).
+
+-export([start_link/1, execute/2]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3, stop/0]).
+
+% Starts the full-text query server, registered locally as couch_ft_query.
+% QueryExec is the command line of the external query program.
+start_link(QueryExec) ->
+    gen_server:start_link({local, couch_ft_query}, couch_ft_query, QueryExec, []).
+
+% Stops the registered server by sending it an exit signal with reason 'close'.
+stop() ->
+    exit(whereis(couch_ft_query), close).
+
+% Runs QueryString against DatabaseName (both strings) through the external
+% program. Returns {ok, [{DocId, Score}]} with both elements as the strings
+% printed by the program, or {ErrorId, ErrorMsg} when the program reports an
+% error.
+execute(DatabaseName, QueryString) ->
+    gen_server:call(couch_ft_query, {ft_query, DatabaseName, QueryString}).
+
+% The port delivers the program's output one line per message and notifies
+% us with an exit_status message when the program terminates.
+init(QueryExec) ->
+    Port = open_port({spawn, QueryExec}, [{line, 1000}, exit_status, hide]),
+    {ok, Port}.
+
+terminate(_Reason, _Server) ->
+    ok.
+
+% Sends the database name and the query, one per line, then reads the reply.
+% If the external program exits while we are waiting for its output, the
+% caller receives {unknown_error, Status} and the server stops with that
+% reason.
+handle_call({ft_query, Database, QueryText}, _From, Port) ->
+    %% send the database name
+    true = port_command(Port, Database ++ "\n"),
+    true = port_command(Port, QueryText ++ "\n"),
+    try read_reply(Port) of
+    Reply ->
+        {reply, Reply, Port}
+    catch
+    throw:{port_exited, Status} ->
+        {stop, {unknown_error, Status}, {unknown_error, Status}, Port}
+    end.
+
+% Reads one complete reply: the status line "ok" followed by the result
+% list, or "error" followed by an error id line and a message line.
+read_reply(Port) ->
+    case get_line(Port) of
+    "ok" ->
+        {ok, read_query_results(Port, [])};
+    "error" ->
+        ErrorId = get_line(Port),
+        ErrorMsg = get_line(Port),
+        % NOTE(review): list_to_atom/1 on text produced by an external
+        % program creates atoms that are never garbage collected; confirm
+        % the program only emits a small, fixed set of error ids.
+        {list_to_atom(ErrorId), ErrorMsg}
+    end.
+
+% Results arrive as alternating doc id and score lines, ended by an empty line.
+read_query_results(Port, Acc) ->
+    case get_line(Port) of
+    "" -> % line by itself means all done
+        lists:reverse(Acc);
+    DocId ->
+        Score = get_line(Port),
+        read_query_results(Port, [{DocId, Score} | Acc])
+    end.
+
+
+% Blocks until the port has delivered one full line and returns it.
+% Throws {port_exited, Status} if the program terminates first, so that
+% callers never mistake the exit notification for a line of output or wait
+% forever on a dead port.
+get_line(Port) ->
+    get_line(Port, []).
+
+get_line(Port, Acc) ->
+    receive
+    {Port, {data, {noeol, Part}}} ->
+        % the line is longer than the port's line buffer: keep collecting
+        get_line(Port, [Part | Acc]);
+    {Port, {data, {eol, Line}}} ->
+        lists:append(lists:reverse([Line | Acc]));
+    {Port, {exit_status, Status}} ->
+        throw({port_exited, Status})
+    end.
+
+handle_cast(_Whatever, State) ->
+    {noreply, State}.
+
+% The external program exited while no query was in progress.
+handle_info({Port, {exit_status, Status}}, Port) ->
+    {stop, {os_process_exited, Status}, Port};
+handle_info(_Whatever, State) ->
+    {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.

Added: incubator/couchdb/trunk/src/couchdb/couch_js.c
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_js.c?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_js.c (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_js.c Fri Mar 28 16:32:19 2008
@@ -0,0 +1,452 @@
+/*
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License.  You may obtain a copy of the
+License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed
+under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations under the License.
+
+*/
+
+#include <stdio.h>
+#include <string.h>
+#include <jsapi.h>
+
+/* Process exit status returned by main(); set by Quit and by ExecuteScript
+ * when the script file cannot be opened. */
+int gExitCode = 0;
+/* Stack chunk size passed to every JS_NewContext call in this file. */
+int gStackChunkSize = 8L * 1024L;
+
+/*
+ * Encode the code point ucs4Char as UTF-8 into utf8Buffer and return the
+ * number of bytes written. The caller supplies the buffer; EncodeString
+ * passes one of 6 bytes.
+ */
+int
+EncodeChar(uint8 *utf8Buffer, uint32 ucs4Char) {
+    int utf8Length = 1;
+
+    if (ucs4Char < 0x80) {
+        /* 7-bit values are stored as a single byte, unchanged */
+        *utf8Buffer = (uint8)ucs4Char;
+    } else {
+        int i;
+        /* two bytes carry 11 payload bits; every further 5 bits that are
+         * still set above that cost one more byte */
+        uint32 a = ucs4Char >> 11;
+        utf8Length = 2;
+        while (a) {
+            a >>= 5;
+            utf8Length++;
+        }
+        /* fill the continuation bytes (10xxxxxx) from last to first,
+         * consuming 6 bits of the value each */
+        i = utf8Length;
+        while (--i) {
+            utf8Buffer[i] = (uint8)((ucs4Char & 0x3F) | 0x80);
+            ucs4Char >>= 6;
+        }
+        /* lead byte: the top utf8Length bits set, plus the remaining
+         * high-order bits of the value */
+        *utf8Buffer = (uint8)(0x100 - (1 << (8-utf8Length)) + ucs4Char);
+    }
+    return utf8Length;
+}
+
+/*
+ * Convert srclen UTF-16 code units at src to UTF-8.
+ *
+ * If dst is NULL nothing is written and only the required length is
+ * computed. Otherwise *dstlenp holds the capacity of dst on entry. In both
+ * cases *dstlenp is set to the number of bytes produced before returning.
+ *
+ * Returns JS_TRUE on success; JS_FALSE on an unpaired surrogate or when dst
+ * is too small (a high surrogate at the very end of the input also takes
+ * the bufferTooSmall exit).
+ */
+JSBool
+EncodeString(const jschar *src, size_t srclen, char *dst, size_t *dstlenp) {
+    size_t i, utf8Len, dstlen = *dstlenp, origDstlen = dstlen;
+    jschar c, c2;
+    uint32 v;
+    uint8 utf8buf[6];
+
+    /* length-only mode: pretend the destination is unbounded */
+    if (!dst)
+        dstlen = origDstlen = (size_t) -1;
+
+    while (srclen) {
+        c = *src++;
+        srclen--;
+        /* a low surrogate may only follow a high surrogate */
+        if ((c >= 0xDC00) && (c <= 0xDFFF))
+            goto badSurrogate;
+        if (c < 0xD800 || c > 0xDBFF) {
+            v = c;
+        } else {
+            /* high surrogate: combine with the following low surrogate */
+            if (srclen < 1)
+                goto bufferTooSmall;
+            c2 = *src++;
+            srclen--;
+            if ((c2 < 0xDC00) || (c2 > 0xDFFF)) {
+                c = c2;
+                goto badSurrogate;
+            }
+            v = ((c - 0xD800) << 10) + (c2 - 0xDC00) + 0x10000;
+        }
+        if (v < 0x0080) {
+            /* no encoding necessary - performance hack */
+            if (!dstlen)
+                goto bufferTooSmall;
+            if (dst)
+                *dst++ = (char) v;
+            utf8Len = 1;
+        } else {
+            utf8Len = EncodeChar(utf8buf, v);
+            if (utf8Len > dstlen)
+                goto bufferTooSmall;
+            if (dst) {
+                for (i = 0; i < utf8Len; i++)
+                    *dst++ = (char) utf8buf[i];
+            }
+        }
+        dstlen -= utf8Len;
+    }
+    *dstlenp = (origDstlen - dstlen);
+    return JS_TRUE;
+
+badSurrogate:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+
+bufferTooSmall:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+}
+
+/*
+ * Decode one UTF-8 sequence of utf8Length bytes (1..6) starting at
+ * utf8Buffer and return its code point. The caller has already validated
+ * the lead byte and the continuation bytes.
+ *
+ * Sequences that are not in shortest form, and the values 0xFFFE and
+ * 0xFFFF, decode to U+FFFD (the replacement character).
+ */
+static uint32
+DecodeChar(const uint8 *utf8Buffer, int utf8Length) {
+    uint32 ucs4Char;
+    uint32 minucs4Char;
+    /* from Unicode 3.1, non-shortest form is illegal.
+     * Smallest value that genuinely needs 2, 3, 4, 5 and 6 bytes; the
+     * entries for 4, 5 and 6 bytes were each short by one hex digit, which
+     * let some overlong encodings through. */
+    static const uint32 minucs4Table[] = {
+        0x00000080, 0x00000800, 0x00010000, 0x00200000, 0x04000000
+    };
+
+    if (utf8Length == 1) {
+        ucs4Char = *utf8Buffer;
+    } else {
+        /* payload bits of the lead byte, then 6 bits per continuation byte */
+        ucs4Char = *utf8Buffer++ & ((1<<(7-utf8Length))-1);
+        minucs4Char = minucs4Table[utf8Length-2];
+        while (--utf8Length) {
+            ucs4Char = ucs4Char<<6 | (*utf8Buffer++ & 0x3F);
+        }
+        if (ucs4Char < minucs4Char ||
+            ucs4Char == 0xFFFE || ucs4Char == 0xFFFF) {
+            ucs4Char = 0xFFFD;
+        }
+    }
+    return ucs4Char;
+}
+
+/*
+ * Convert srclen bytes of UTF-8 at src to UTF-16 code units.
+ *
+ * If dst is NULL nothing is written and only the required length is
+ * computed. Otherwise *dstlenp holds the capacity of dst (in code units) on
+ * entry. In both cases *dstlenp is set to the number of code units produced
+ * before returning.
+ *
+ * Returns JS_TRUE on success; JS_FALSE on malformed or truncated input, on
+ * a code point that cannot be represented as a surrogate pair, or when dst
+ * is too small.
+ */
+JSBool
+DecodeString(const char *src, size_t srclen, jschar *dst, size_t *dstlenp) {
+    uint32 v;
+    size_t j, n, dstlen = *dstlenp, origDstlen = dstlen;
+
+    /* length-only mode: pretend the destination is unbounded */
+    if (!dst)
+        dstlen = origDstlen = (size_t) -1;
+
+    while (srclen) {
+        v = (uint8) *src;
+        n = 1;
+        if (v & 0x80) {
+            /* the number of leading 1 bits in the lead byte is the
+             * sequence length */
+            while (v & (0x80 >> n))
+                n++;
+            if (n > srclen)
+                goto bufferTooSmall;
+            if (n == 1 || n > 6)
+                goto badCharacter;
+            for (j = 1; j < n; j++) {
+                if ((src[j] & 0xC0) != 0x80)
+                    goto badCharacter;
+            }
+            v = DecodeChar((const uint8 *) src, n);
+            if (v >= 0x10000) {
+                /* needs a surrogate pair: emit the high half here, the low
+                 * half goes through the common store below */
+                v -= 0x10000;
+                if (v > 0xFFFFF)
+                    goto badCharacter;
+                if (dstlen < 2)
+                    goto bufferTooSmall;
+                if (dst) {
+                    *dst++ = (jschar)((v >> 10) + 0xD800);
+                    v = (jschar)((v & 0x3FF) + 0xDC00);
+                }
+                dstlen--;
+            }
+        }
+        if (!dstlen)
+            goto bufferTooSmall;
+        if (dst)
+            *dst++ = (jschar) v;
+        dstlen--;
+        src += n;
+        srclen -= n;
+    }
+    *dstlenp = (origDstlen - dstlen);
+    return JS_TRUE;
+
+badCharacter:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+
+bufferTooSmall:
+    *dstlenp = (origDstlen - dstlen);
+    return JS_FALSE;
+}
+
+/*
+ * Native function registered as "evalcx" in main().
+ *
+ * Arguments: a source string and an optional sandbox object. The source is
+ * evaluated in a freshly created context against the sandbox; when no
+ * sandbox is passed, a new object with the standard classes is created for
+ * it. An empty source string returns the sandbox object itself, otherwise
+ * the result of the evaluation is returned through rval.
+ */
+static JSBool
+EvalInContext(JSContext *context, JSObject *obj, uintN argc, jsval *argv,
+              jsval *rval) {
+    JSString *str;
+    JSObject *sandbox;
+    JSContext *sub_context;
+    const jschar *src;
+    size_t srclen;
+    JSBool ok;
+    jsval v;
+
+    /* sandbox stays NULL when the optional second argument is omitted */
+    sandbox = NULL;
+    if (!JS_ConvertArguments(context, argc, argv, "S / o", &str, &sandbox))
+        return JS_FALSE;
+
+    /* the script runs in its own context, created on the caller's runtime */
+    sub_context = JS_NewContext(JS_GetRuntime(context), gStackChunkSize);
+    if (!sub_context) {
+        JS_ReportOutOfMemory(context);
+        return JS_FALSE;
+    }
+
+    src = JS_GetStringChars(str);
+    srclen = JS_GetStringLength(str);
+
+    if (!sandbox) {
+        sandbox = JS_NewObject(sub_context, NULL, NULL, NULL);
+        if (!sandbox || !JS_InitStandardClasses(sub_context, sandbox)) {
+            ok = JS_FALSE;
+            goto out;
+        }
+    }
+
+    if (srclen == 0) {
+        /* nothing to evaluate: hand back the sandbox itself */
+        *rval = OBJECT_TO_JSVAL(sandbox);
+        ok = JS_TRUE;
+    } else {
+        ok = JS_EvaluateUCScript(sub_context, sandbox, src, srclen, NULL, -1,
+                                 rval);
+    }
+
+out:
+    /* the temporary context is destroyed on every path that created it */
+    JS_DestroyContext(sub_context);
+    return ok;
+}
+
+/*
+ * Native function registered as "gc" in main(): runs JS_GC on the calling
+ * context. Takes no arguments and always succeeds.
+ */
+static JSBool
+GC(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+    JS_GC(context);
+    return JS_TRUE;
+}
+
+/*
+ * Native function registered as "print" in main().
+ *
+ * Converts each argument to a string, writes them to stdout as UTF-8
+ * separated by single spaces, then writes a newline and flushes stdout.
+ * The newline is written even when no arguments are given.
+ *
+ * Returns JS_FALSE if an argument cannot be converted or encoded, or if
+ * memory for the encoded text cannot be allocated.
+ */
+static JSBool
+Print(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+    uintN i;
+    size_t cl, bl;
+    JSString *str;
+    jschar *chars;
+    char *bytes;
+
+    for (i = 0; i < argc; i++) {
+        str = JS_ValueToString(context, argv[i]);
+        if (!str)
+            return JS_FALSE;
+        chars = JS_GetStringChars(str);
+        cl = JS_GetStringLength(str);
+        /* first pass only measures; EncodeString reads *dstlenp on entry,
+         * so give it a defined value */
+        bl = 0;
+        if (!EncodeString(chars, cl, NULL, &bl))
+            return JS_FALSE;
+        bytes = JS_malloc(context, bl + 1);
+        if (!bytes)
+            return JS_FALSE;
+        bytes[bl] = '\0';
+        if (!EncodeString(chars, cl, bytes, &bl)) {
+            JS_free(context, bytes);
+            return JS_FALSE;
+        }
+        fprintf(stdout, "%s%s", i ? " " : "", bytes);
+        JS_free(context, bytes);
+    }
+    fputc('\n', stdout);
+    fflush(stdout);
+    return JS_TRUE;
+}
+
+/*
+ * Native function registered as "quit" in main(): stores the optional
+ * integer argument in gExitCode (the value main() returns) and reports
+ * failure to the engine.
+ * NOTE(review): returning JS_FALSE presumably aborts the running script so
+ * that main() can exit -- confirm against the SpiderMonkey native contract.
+ */
+static JSBool
+Quit(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+    JS_ConvertArguments(context, argc, argv, "/ i", &gExitCode);
+    return JS_FALSE;
+}
+
+/*
+ * Native function registered as "readline" in main().
+ *
+ * Reads one line from stdin, decodes it from UTF-8 and returns it as a JS
+ * string without the trailing newline. An empty line and end of input both
+ * yield the empty string.
+ *
+ * Returns JS_FALSE if memory cannot be allocated or the input is not valid
+ * UTF-8.
+ */
+static JSBool
+ReadLine(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+    char *bytes, *tmp;
+    jschar *chars;
+    size_t bufsize, byteslen, charslen, readlen;
+    JSString *str;
+
+    JS_MaybeGC(context);
+
+    byteslen = 0;
+    bufsize = 256;
+    bytes = JS_malloc(context, bufsize);
+    if (!bytes)
+        return JS_FALSE;
+
+    while ((readlen = js_fgets(bytes + byteslen, bufsize - byteslen, stdin)) > 0) {
+        byteslen += readlen;
+
+        /* Are we done? Strip the newline and stop counting it, so that the
+         * length is right whether or not the input ended with one. */
+        if (bytes[byteslen - 1] == '\n') {
+            byteslen--;
+            bytes[byteslen] = '\0';
+            break;
+        }
+
+        /* Else, grow our buffer for another pass */
+        tmp = JS_realloc(context, bytes, bufsize * 2);
+        if (!tmp) {
+            JS_free(context, bytes);
+            return JS_FALSE;
+        }
+
+        bufsize *= 2;
+        bytes = tmp;
+    }
+
+    /* Treat the empty string specially */
+    if (byteslen == 0) {
+        *rval = JS_GetEmptyStringValue(context);
+        JS_free(context, bytes);
+        return JS_TRUE;
+    }
+
+    /* Decode the string from UTF-8: measure first, then convert */
+    charslen = 0;
+    if (!DecodeString(bytes, byteslen, NULL, &charslen)) {
+        JS_free(context, bytes);
+        return JS_FALSE;
+    }
+    chars = JS_malloc(context, (charslen + 1) * sizeof(jschar));
+    if (!chars) {
+        JS_free(context, bytes);
+        return JS_FALSE;
+    }
+    if (!DecodeString(bytes, byteslen, chars, &charslen)) {
+        JS_free(context, bytes);
+        JS_free(context, chars);
+        return JS_FALSE;
+    }
+    /* the raw bytes are no longer needed once decoded; previously they were
+     * never released on the success path */
+    JS_free(context, bytes);
+    chars[charslen] = '\0';
+
+    /* Initialize a JSString object; it is built directly on chars, which is
+     * therefore only freed here when creation fails */
+    str = JS_NewUCString(context, chars, charslen);
+    if (!str) {
+        JS_free(context, chars);
+        return JS_FALSE;
+    }
+
+    *rval = STRING_TO_JSVAL(str);
+    return JS_TRUE;
+}
+
+/*
+ * Native function registered as "seal" in main(): seals the object given as
+ * first argument; the optional boolean second argument requests a deep
+ * seal. A null target is accepted and left alone.
+ */
+static JSBool
+Seal(JSContext *context, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
+    JSObject *target;
+    JSBool deep = JS_FALSE;
+    JSBool converted;
+
+    converted = JS_ConvertArguments(context, argc, argv, "o/b", &target, &deep);
+    if (!converted)
+        return JS_FALSE;
+
+    return target ? JS_SealObject(context, target, deep) : JS_TRUE;
+}
+
+/*
+ * Compile and run the script in `filename` against obj. A NULL filename or
+ * "-" means the script is read from stdin. If the file cannot be opened, a
+ * message is printed to stderr and gExitCode is set to 1.
+ */
+static void
+ExecuteScript(JSContext *context, JSObject *obj, const char *filename) {
+    FILE *file = stdin;
+    JSScript *script;
+    jsval result;
+    int fromStdin = (!filename || strcmp(filename, "-") == 0);
+
+    if (!fromStdin) {
+        file = fopen(filename, "r");
+        if (!file) {
+            fprintf(stderr, "could not open script file %s\n", filename);
+            gExitCode = 1;
+            return;
+        }
+    }
+
+    script = JS_CompileFileHandle(context, obj, filename, file);
+    if (!script)
+        return;
+
+    JS_ExecuteScript(context, obj, script, &result);
+    JS_DestroyScript(context, script);
+}
+
+/* Number of branch callbacks seen since the last reset, and the count at
+ * which the running script is stopped. */
+static uint32 gBranchCount = 0;
+static uint32 gBranchLimit = 100 * 1024;
+
+/*
+ * Branch callback installed in main(). Returns JS_FALSE once gBranchLimit
+ * callbacks have been counted (resetting the counter), and otherwise gives
+ * the engine a chance to collect garbage every 0x4000 callbacks.
+ */
+static JSBool
+BranchCallback(JSContext *context, JSScript *script) {
+    gBranchCount++;
+
+    if (gBranchCount == gBranchLimit) {
+        gBranchCount = 0;
+        return JS_FALSE;
+    }
+
+    if ((gBranchCount & 0x3fff) == 1)
+        JS_MaybeGC(context);
+
+    return JS_TRUE;
+}
+
+/*
+ * Error reporter installed in main(): writes the message to stderr, except
+ * for reports that are flagged as warnings, which are dropped.
+ */
+static void
+PrintError(JSContext *context, const char *message, JSErrorReport *report) {
+    if (report && JSREPORT_IS_WARNING(report->flags))
+        return;
+
+    fprintf(stderr, "%s\n", message);
+}
+
+/*
+ * Entry point: sets up a JS runtime, context and global object, registers
+ * the native helper functions, runs the script named by the single command
+ * line argument and returns gExitCode.
+ *
+ * Exit status: 1 if the engine cannot be initialised, 2 on a usage error,
+ * otherwise whatever gExitCode holds after the script has run.
+ */
+int
+main(int argc, const char * argv[]) {
+    JSRuntime *runtime;
+    JSContext *context;
+    JSObject *global;
+
+    runtime = JS_NewRuntime(64L * 1024L * 1024L);
+    if (!runtime)
+        return 1;
+    context = JS_NewContext(runtime, gStackChunkSize);
+    if (!context)
+        return 1;
+    /* errors go to stderr via PrintError; BranchCallback limits how long a
+     * script may run */
+    JS_SetErrorReporter(context, PrintError);
+    JS_SetBranchCallback(context, BranchCallback);
+    JS_ToggleOptions(context, JSOPTION_NATIVE_BRANCH_CALLBACK);
+    JS_ToggleOptions(context, JSOPTION_XML);
+
+    global = JS_NewObject(context, NULL, NULL, NULL);
+    if (!global)
+        return 1;
+    if (!JS_InitStandardClasses(context, global))
+        return 1;
+    /* native functions made available to scripts on the global object */
+    if (!JS_DefineFunction(context, global, "evalcx", EvalInContext, 0, 0)
+     || !JS_DefineFunction(context, global, "gc", GC, 0, 0)
+     || !JS_DefineFunction(context, global, "print", Print, 0, 0)
+     || !JS_DefineFunction(context, global, "quit", Quit, 0, 0)
+     || !JS_DefineFunction(context, global, "readline", ReadLine, 0, 0)
+     || !JS_DefineFunction(context, global, "seal", Seal, 0, 0))
+        return 1;
+
+    /* exactly one argument is expected: the script file ("-" for stdin) */
+    if (argc != 2) {
+        fprintf(stderr, "incorrect number of arguments\n\n");
+        fprintf(stderr, "usage: %s <scriptfile>\n", argv[0]);
+        return 2;
+    }
+
+    ExecuteScript(context, global, argv[1]);
+
+    JS_DestroyContext(context);
+    JS_DestroyRuntime(runtime);
+    JS_ShutDown();
+
+    return gExitCode;
+}

Propchange: incubator/couchdb/trunk/src/couchdb/couch_js.c
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_key_tree.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,139 @@
+%   Copyright 2007, 2008 Damien Katz <damien_katz@yahoo.com>
+%
+%   Licensed under the Apache License, Version 2.0 (the "License");
+%   you may not use this file except in compliance with the License.
+%   You may obtain a copy of the License at
+%
+%       http://www.apache.org/licenses/LICENSE-2.0
+%
+%   Unless required by applicable law or agreed to in writing, software
+%   distributed under the License is distributed on an "AS IS" BASIS,
+%   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%   See the License for the specific language governing permissions and
+%   limitations under the License.
+
+-module(couch_key_tree).
+
+-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
+-export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]).
+
+% a key tree looks like this:
+% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
+% ChildTree -> Tree
+% SiblingTree -> [] or [{SiblingKey, Value, ChildTree} | SiblingTree]
+% And each Key < SiblingKey
+
+
+
+% key tree functions
+
+% Merge two key trees into one. At every level the node with the smaller key
+% is emitted first; when both trees hold the same key, tree A's value is kept
+% (the value in tree B is discarded) and the children are merged recursively.
+merge([], TreeB) ->
+    TreeB;
+merge(TreeA, []) ->
+    TreeA;
+merge([{KeyA, ValueA, KidsA} | RestA], [{KeyB, _ValueB, KidsB} | RestB])
+        when KeyA == KeyB ->
+    % same key on both sides: merge the children, then the remaining siblings
+    KidsMerged = merge(KidsA, KidsB),
+    RestMerged = merge(RestA, RestB),
+    [{KeyA, ValueA, KidsMerged} | RestMerged];
+merge([{KeyA, _, _} = NodeA | RestA], [{KeyB, _, _} | _] = TreeB)
+        when KeyA < KeyB ->
+    [NodeA | merge(RestA, TreeB)];
+merge([{_, _, _} | _] = TreeA, [{_, _, _} = NodeB | RestB]) ->
+    [NodeB | merge(TreeA, RestB)].
+
+% Returns the subset of Keys that does not occur anywhere in the tree
+% (neither at this level nor in any sub tree).
+find_missing(_Tree, []) ->
+    [];
+find_missing([], Keys) ->
+    Keys;
+find_missing([{Key, _, SubTree} | RestTree], Keys) ->
+    % Key has to be wrapped in a list: `Keys -- Key` would treat the key
+    % itself as the list of elements to remove (or raise badarg when the key
+    % is not a list), so found keys were never struck off.
+    SrcKeys2 = Keys -- [Key],
+    SrcKeys3 = find_missing(SubTree, SrcKeys2),
+    find_missing(RestTree, SrcKeys3).
+    
+
+% Get the leafs in the tree matching the keys. A matching key node can be a
+% leaf or an inner node; for an inner node, all leafs below that node are
+% returned.
+% Returns {LeafsFound, KeysNotFound}, each leaf being {LeafKey, Value, KeyPath}
+% with KeyPath running from the leaf up towards the root.
+get_key_leafs(Tree, Keys) ->
+    get_key_leafs(Tree, Keys, []).
+
+get_key_leafs(_Tree, [], _KeyPathAcc) ->
+    {[], []};
+get_key_leafs([], KeysToGet, _KeyPathAcc) ->
+    {[], KeysToGet};
+get_key_leafs([{Key, _Value, SubTree}=Tree | RestTree], KeysToGet, KeyPathAcc) ->
+    case KeysToGet -- [Key] of
+    KeysToGet -> % same list, key not found
+        {LeafsFound, KeysToGet2} = get_key_leafs(SubTree, KeysToGet, [Key | KeyPathAcc]),
+        {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet2, KeyPathAcc),
+        {LeafsFound ++ RestLeafsFound, KeysRemaining};
+    KeysToGet2 ->
+        LeafsFound = get_all_leafs([Tree], KeyPathAcc),
+        LeafKeysFound = [LeafKeyFound || {LeafKeyFound, _, _} <- LeafsFound],
+        % Bind the reduced list to a fresh variable. Matching it against the
+        % already bound KeysToGet2 raised badmatch whenever one of the leafs
+        % just returned was itself among the requested keys.
+        KeysToGet3 = KeysToGet2 -- LeafKeysFound,
+        {RestLeafsFound, KeysRemaining} = get_key_leafs(RestTree, KeysToGet3, KeyPathAcc),
+        {LeafsFound ++ RestLeafsFound, KeysRemaining}
+    end.
+
+% Look up KeysToGet in the tree. Returns {Results, KeysNotFound}; each result
+% is {Key, Value, KeyPath}, where KeyPath holds the keys of the full path
+% returned by get_full_key_paths/2 for that node.
+get(Tree, KeysToGet) ->
+    {FullPaths, Missing} = get_full_key_paths(Tree, KeysToGet),
+    ToResult =
+        fun([{Key, Value} | _] = Path) ->
+                [{Key, Value, [PathKey || {PathKey, _} <- Path]}];
+           (_NoMatch) ->
+                []
+        end,
+    {lists:flatmap(ToResult, FullPaths), Missing}.
+
+% Find the nodes whose keys are listed in Keys. Returns {Paths, KeysNotFound};
+% every path is a list of {Key, Value} pairs starting at the matched node and
+% ending at its top-most ancestor.
+get_full_key_paths(Tree, Keys) ->
+    get_full_key_paths(Tree, Keys, []).
+
+get_full_key_paths(_Tree, [], _KeyPathAcc) ->
+    {[], []};
+get_full_key_paths([], KeysToGet, _KeyPathAcc) ->
+    {[], KeysToGet};
+get_full_key_paths([{KeyId, Value, SubTree} | RestTree], KeysToGet, KeyPathAcc) ->
+    PathToHere = [{KeyId, Value} | KeyPathAcc],
+    StillWanted = KeysToGet -- [KeyId],
+    % an unchanged length means this node's key was not asked for
+    HitHere =
+    case length(StillWanted) == length(KeysToGet) of
+    true ->
+        [];
+    false ->
+        [PathToHere]
+    end,
+    {HitsBelow, WantedAfterSubTree} = get_full_key_paths(SubTree, StillWanted, PathToHere),
+    {HitsBeside, WantedAfterRest} = get_full_key_paths(RestTree, WantedAfterSubTree, KeyPathAcc),
+    {HitHere ++ HitsBelow ++ HitsBeside, WantedAfterRest}.
+
+% Collect every leaf (a node with no children) of the tree as
+% {Key, Value, KeyPath}; KeyPath starts with the leaf's own key followed by
+% the keys of its ancestors.
+get_all_leafs(Tree) ->
+    get_all_leafs(Tree, []).
+
+get_all_leafs(Tree, KeyPathAcc) ->
+    lists:flatmap(
+        fun({LeafKey, Value, []}) ->
+                [{LeafKey, Value, [LeafKey | KeyPathAcc]}];
+           ({InnerKey, _Value, Children}) ->
+                get_all_leafs(Children, [InnerKey | KeyPathAcc])
+        end, Tree).
+
+% Returns the keys of all leaf nodes (nodes without children) of the tree.
+get_leaf_keys(Tree) ->
+    lists:flatmap(
+        fun({LeafKey, _Value, []}) ->
+                [LeafKey];
+           ({_InnerKey, _Value, Children}) ->
+                get_leaf_keys(Children)
+        end, Tree).
+    
+% Counts the leaf nodes (nodes without children) of the tree.
+count_leafs(Tree) ->
+    lists:foldl(
+        fun({_Key, _Value, []}, Count) ->
+                Count + 1;
+           ({_Key, _Value, Children}, Count) ->
+                Count + count_leafs(Children)
+        end, 0, Tree).
+    
+
+% Builds a tree of the same shape in which every node's value is replaced by
+% Fun(Key, Value). A node is transformed before its children.
+map(Fun, Tree) ->
+    lists:map(
+        fun({Key, Value, Children}) ->
+            NewValue = Fun(Key, Value),
+            {Key, NewValue, map(Fun, Children)}
+        end, Tree).
+

Added: incubator/couchdb/trunk/src/couchdb/couch_log.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_log.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_log.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_log.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,130 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_log).
+-behaviour(gen_event).
+
+-export([start_link/2,stop/0]).
+-export([error/1,error/2,info/1,info/2,debug/1,debug/2,get_level/0,get_level_integer/0, set_level/1]).
+-export([init/1, handle_event/2, terminate/2, code_change/3, handle_info/2, handle_call/2]).
+
+-define(LEVEL_ERROR, 3).
+-define(LEVEL_INFO, 2).
+-define(LEVEL_DEBUG, 1).
+-define(LEVEL_TMI, 0).
+
+level_integer(error)    -> ?LEVEL_ERROR;
+level_integer(info)     -> ?LEVEL_INFO;
+level_integer(debug)    -> ?LEVEL_DEBUG;
+level_integer(tmi)      -> ?LEVEL_TMI;
+level_integer(_Else)    -> ?LEVEL_ERROR. % anything else default to ERROR level
+
+level_atom(?LEVEL_ERROR) -> error;
+level_atom(?LEVEL_INFO) -> info;
+level_atom(?LEVEL_DEBUG) -> debug;
+level_atom(?LEVEL_TMI) -> tmi.
+
+
+start_link(Filename, Level) ->
+    couch_event_sup:start_link({local, couch_log}, error_logger, couch_log, {Filename, Level}).
+
+stop() ->
+    couch_event_sup:stop(couch_log).
+
+init({Filename, Level}) ->
+    {ok, Fd} = file:open(Filename, [append]),
+    {ok, {Fd, level_integer(Level)}}.
+
+error(Msg) ->
+    error("~s", [Msg]).
+
+error(Format, Args) ->
+    error_logger:error_report(couch_error, {Format, Args}).
+
+info(Msg) ->
+    info("~s", [Msg]).
+
+info(Format, Args) ->
+    case get_level_integer() =< ?LEVEL_INFO of
+    true ->
+        error_logger:info_report(couch_info, {Format, Args});
+    false ->
+        ok
+    end.
+
+debug(Msg) ->
+    debug("~s", [Msg]).
+
+debug(Format, Args) ->
+    case get_level_integer() =< ?LEVEL_DEBUG of
+    true ->
+        error_logger:info_report(couch_debug, {Format, Args});
+    false ->
+        ok
+    end.
+
+set_level(LevelAtom) ->
+    set_level_integer(level_integer(LevelAtom)).
+
+get_level() ->
+    level_atom(get_level_integer()).
+
+% Asks the couch_log handler installed in error_logger for its current level.
+% Because of the `catch`, a failing call does not raise: the caught error term
+% is returned in place of the integer level.
+get_level_integer() ->
+    catch gen_event:call(error_logger, couch_log, get_level_integer).
+
+set_level_integer(Int) ->
+    gen_event:call(error_logger, couch_log, {set_level_integer, Int}).
+
+% gen_event callback. Writes events to the log according to their kind and
+% the handler's level; the handler state is {Fd, LogLevel}.
+
+% error reports sent through error/2 carry a format string and arguments
+handle_event({error_report, _, {Pid, couch_error, {Format, Args}}}, {Fd, _LogLevel}=State) ->
+    log(Fd, Pid, error, Format, Args),
+    {ok, State};
+% any other error report is logged as the raw event term
+handle_event({error_report, _, {Pid, _, _}}=Event, {Fd, _LogLevel}=State) ->
+    log(Fd, Pid, error, "~p", [Event]),
+    {ok, State};
+handle_event({error, _, {Pid, Format, Args}}, {Fd, _LogLevel}=State) ->
+    log(Fd, Pid, error, Format, Args),
+    {ok, State};
+% info and debug reports are only written when the level is low enough
+handle_event({info_report, _, {Pid, couch_info, {Format, Args}}}, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_INFO ->
+    log(Fd, Pid, info, Format, Args),
+    {ok, State};
+handle_event({info_report, _, {Pid, couch_debug, {Format, Args}}}, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_DEBUG ->
+    log(Fd, Pid, debug, Format, Args),
+    {ok, State};
+handle_event({_, _, {Pid, _, _}}=Event, {Fd, LogLevel}=State)
+when LogLevel =< ?LEVEL_TMI ->
+    % log every remaining event if tmi!
+    log(Fd, Pid, tmi, "~p", [Event]),
+    {ok, State};
+% everything else is dropped without being logged
+handle_event(_Event, State) ->
+    {ok, State}.
+
+handle_call(get_level_integer, {_Fd, LogLevel}=State) ->
+    {ok, LogLevel, State};
+handle_call({set_level_integer, NewLevel}, {Fd, _LogLevel}) ->
+    {ok, ok, {Fd, NewLevel}}.
+
+handle_info(_Info, State) ->
+    {ok, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(_Arg, {Fd, _LoggingLevel}) ->
+    file:close(Fd).
+
+% Formats a message and writes it twice: to the console and to the log file
+% Fd. For the file copy all line endings inside the message (\r\n, \r or \n)
+% are rewritten to \r\n and the entry is prefixed with an RFC 1123 date.
+% Every step is asserted, so a failed write crashes the caller.
+log(Fd, Pid, Level, Format, Args) ->
+    Msg = io_lib:format(Format, Args),
+    ok = io:format("[~s] [~p] ~s~n", [Level, Pid, Msg]), % dump to console too
+    {ok, Msg2, _} = regexp:gsub(lists:flatten(Msg),"\\r\\n|\\r|\\n", "\r\n"),
+    ok = io:format(Fd, "[~s] [~s] [~p] ~s\r~n\r~n", [httpd_util:rfc1123_date(), Level, Pid, Msg2]).

Added: incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_query_servers.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,206 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_query_servers).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
+-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
+
+-export([test/0, test/1]).
+
+-include("couch_db.hrl").
+
+timeout() ->
+    % hardcoded 5 sec timeout per document
+    5000.
+
+start_link(QueryServerList) ->
+    gen_server:start_link({local, couch_query_servers}, couch_query_servers, QueryServerList, []).
+
+stop() ->
+    exit(whereis(couch_query_servers), close).
+
+readline(Port) ->
+    readline(Port, []).
+
+% Reads one complete line from Port. Partial (noeol) fragments are collected
+% in Acc, newest first, until the fragment ending the line arrives.
+% Throws {map_process_error, Reason} when the port sends anything other than
+% line data, or when no message arrives within timeout() milliseconds; the
+% port is closed in both cases.
+% NOTE(review): every recursive call made for a noeol fragment arms a timer
+% of its own, so the timeout applies per fragment as well as to the line.
+readline(Port, Acc) ->
+    Timer = erlang:send_after(timeout(), self(), timeout),
+    Result =
+    receive
+    {Port, {data, {noeol, Data}}} ->
+        readline(Port, [Data|Acc]);
+    {Port, {data, {eol, Data}}} ->
+        % Acc holds the earlier fragments in reverse order; Data is the tail
+        lists:flatten(lists:reverse(Acc, Data));
+    {Port, Err} ->
+        catch port_close(Port),
+        % NOTE(review): the timer is cancelled here, but a timeout message
+        % that was already delivered is not removed from the mailbox.
+        erlang:cancel_timer(Timer),
+        throw({map_process_error, Err});
+    timeout ->
+        catch port_close(Port),
+        throw({map_process_error, "map function timed out"})
+    end,
+    case erlang:cancel_timer(Timer) of
+    false ->
+        % message already sent. clear it
+        receive timeout -> ok end;
+    _ ->
+        ok
+    end,
+    Result.
+
+read_json(Port) ->
+    case cjson:decode(readline(Port)) of
+    {obj, [{"log", Msg}]} when is_list(Msg) ->
+        % we got a message to log. Log it and continue
+        couch_log:info("Query Server Log Message: ~s", [Msg]),
+        read_json(Port);
+    Else ->
+        Else
+    end.
+
+writeline(Port, String) ->
+    true = port_command(Port, String ++ "\n").
+
+% send command and get a response.
+prompt(Port, Json) ->
+    writeline(Port, cjson:encode(Json)),
+    read_json(Port).
+
+
+% Obtains a query server port for Lang and loads the map functions into it.
+% A pooled port handed out by the couch_query_servers process is reused when
+% there is one; otherwise a new OS process is spawned from the command the
+% server replies with. Returns {ok, {Lang, Port}}.
+% Throws the server's reply when it is neither {ok, Port} nor {empty, Cmd},
+% and {Id, Reason} when the query server rejects a function.
+start_doc_map(Lang, Functions) ->
+    Port =
+    case gen_server:call(couch_query_servers, {get_port, Lang}) of
+    {ok, Port0} ->
+        link(Port0),
+        Port0;
+    {empty, Cmd} ->
+        couch_log:info("Spawning new ~s instance.", [Lang]),
+        open_port({spawn, Cmd}, [stream,
+                                    {line, 1000},
+                                    exit_status,
+                                    hide]);
+    Error ->
+        throw(Error)
+    end,
+    % the query server has to acknowledge the reset with `true`
+    true = prompt(Port, {"reset"}),
+    % send the functions as json strings
+    lists:foreach(fun(FunctionSource) ->
+            case prompt(Port, {"add_fun", FunctionSource}) of
+            true -> ok;
+            {obj, [{"error", Id}, {"reason", Reason}]} ->
+                throw({Id, Reason})
+            end
+        end,
+        Functions),
+    {ok, {Lang, Port}}.
+
+% Runs every document through the map functions loaded into Port.
+% Returns {ok, Results} with one entry per document, in document order; each
+% entry is a list holding one list of key/value pairs per map function.
+% Throws {ErrorAtom, Reason} when the query server answers with an error.
+map_docs({_Lang, Port}, Docs) ->
+    % send the documents
+    Results =
+    lists:map(
+        fun(Doc) ->
+            Json = couch_doc:to_json_obj(Doc, []),
+            case prompt(Port, {"map_doc", Json}) of
+            % the error object is accepted with its members in either order
+            % NOTE(review): list_to_atom/1 is applied to an id produced by the
+            % external query server process; atoms are never reclaimed.
+            {obj, [{"error", Id}, {"reason", Reason}]} ->
+                throw({list_to_atom(Id),Reason});
+            {obj, [{"reason", Reason}, {"error", Id}]} ->
+                throw({list_to_atom(Id),Reason});
+            Results when is_tuple(Results) ->
+                % the results are a json array of function map yields like this:
+                % {FunResults1, FunResults2 ...}
+                % where each FunResults is a json array of key value pairs:
+                % {{Key1, Value1}, {Key2, Value2}}
+                % Convert to real lists, except the key, value pairs
+                [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
+            end
+        end,
+        Docs),
+    {ok, Results}.
+
+
+stop_doc_map(nil) ->
+    ok;
+stop_doc_map({Lang, Port}) ->
+    ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
+    true = unlink(Port),
+    ok.
+
+init(QueryServerList) ->
+    {ok, {QueryServerList, []}}.
+
+terminate(_Reason, _Server) ->
+    ok.
+
+
+% gen_server call handler. The state is {QueryServerList, LangPorts}:
+% QueryServerList is searched by language for the command to spawn, and
+% LangPorts is the pool of returned {Lang, Port} pairs available for reuse.
+%
+% {get_port, Lang}: hands a pooled port for Lang over to the caller and
+% replies {ok, Port}. With no pooled port it replies {empty, ServerCmd} for a
+% configured language and {query_language_unknown, Lang} otherwise.
+handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
+    case lists:keysearch(Lang, 1, LangPorts) of
+    {value, {_, Port}=LangPort} ->
+        Result =
+        case catch port_connect(Port, FromPid) of
+        true ->
+            % the caller owns the port now, drop this process' link to it
+            true = unlink(Port),
+            {ok, Port};
+        Error ->
+            % the hand-over failed: close the port and reply with the error
+            catch port_close(Port),
+            Error
+        end,
+        % the port leaves the pool whether or not the hand-over succeeded
+        {reply, Result, {QueryServerList, LangPorts -- [LangPort]}};
+    false ->
+        case lists:keysearch(Lang, 1, QueryServerList) of
+        {value, {_, ServerCmd}} ->
+            {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}};
+        false -> % not a supported language
+            {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}}
+        end
+    end;
+% {return_port, {Lang, Port}}: takes the port back into the pool. A port that
+% can not be connected to this process is closed instead; the reply is ok in
+% both cases.
+handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) ->
+    case catch port_connect(Port, self()) of
+    true ->
+        {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}};
+    _ ->
+        catch port_close(Port),
+        {reply, ok, {QueryServerList, LangPorts}}
+    end.
+
+handle_cast(_Whatever, {Cmd, Ports}) ->
+    {noreply, {Cmd, Ports}}.
+
+% gen_server info handler. Reacts to exit_status messages of query server
+% ports: a port found in the pool is removed from it, and a non-zero exit
+% status is logged as an abnormal shutdown. Exit messages for ports that are
+% not pooled are logged; all other messages are ignored.
+handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) ->
+    case lists:keysearch(Port, 2, LangPorts) of
+    {value, {Lang, _}} ->
+        case Status of
+        0 -> ok;
+        _ -> couch_log:error("Abnormal shutdown of ~s query server process (exit_status: ~w).", [Lang, Status])
+        end,
+        {noreply, {QueryServerList, lists:keydelete(Port, 2, LangPorts)}};
+    _ ->
+        couch_log:error("Unknown linked port/process crash: ~p", [Port]),
+        % A valid gen_server result is still required here. Returning the
+        % logger's result made the server terminate with a bad return value.
+        {noreply, {QueryServerList, LangPorts}}
+    end;
+handle_info(_Whatever, {Cmd, Ports}) ->
+    {noreply, {Cmd, Ports}}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+test() ->
+    test("../js/js -f main.js").
+
+% Manual smoke test: starts the server with Cmd as the javascript query
+% server command, maps three documents and prints the results.
+test(Cmd) ->
+    % the server looks languages up in a list of {Lang, Command} pairs; given
+    % the bare command string, "javascript" was never found
+    start_link([{"javascript", Cmd}]),
+    {ok, DocMap} = start_doc_map("javascript", ["function(doc) {if (doc[0] == 'a') return doc[1];}"]),
+    {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]),
+    io:format("Results: ~w~n", [Results]),
+    stop_doc_map(DocMap),
+    ok.

Added: incubator/couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_rep.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_rep.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_rep.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,308 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep).
+
+-include("couch_db.hrl").
+
+-export([replicate/2, replicate/3, test/0, test_write_docs/3]).
+
+-record(stats, {
+    docs_read=0,
+    read_errors=0,
+    docs_copied=0,
+    copy_errors=0
+    }).
+
+
+% Percent-encodes a string for use inside a url. Letters, digits and the
+% characters _ . - : are copied unchanged; every other character becomes
+% %XX with XX its upper-case hexadecimal code, zero padded to two digits.
+url_encode([]) ->
+    [];
+url_encode([Char | Rest])
+        when Char >= $a, $z >= Char;
+             Char >= $A, $Z >= Char;
+             Char >= $0, $9 >= Char;
+             Char == $_; Char == $.; Char == $-; Char == $: ->
+    [Char | url_encode(Rest)];
+url_encode([Char | Rest]) ->
+    case lists:flatten(io_lib:format("~.16.0B", [Char])) of
+    [Hi, Lo] ->
+        [$%, Hi, Lo | url_encode(Rest)];
+    [Lo] ->
+        % single hex digit: pad with a leading zero
+        [$%, $0, Lo | url_encode(Rest)]
+    end.
+
+
+replicate(DbNameA, DbNameB) ->
+    replicate(DbNameA, DbNameB, []).
+
+replicate(Source, Target, Options) ->
+    {ok, DbSrc} = open_db(Source),
+    {ok, DbTgt} = open_db(Target),
+    {ok, HostName} = inet:gethostname(),
+
+    RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target,
+    StartTime = httpd_util:rfc1123_date(),
+    RepRecSrc =
+    case open_doc(DbSrc, RepRecKey, []) of
+    {ok, SrcDoc} -> SrcDoc;
+    _ -> #doc{id=RepRecKey}
+    end,
+
+    RepRecTgt =
+    case open_doc(DbTgt, RepRecKey, []) of
+    {ok, TgtDoc} -> TgtDoc;
+    _ -> #doc{id=RepRecKey}
+    end,
+
+    #doc{body={obj,OldRepHistoryProps}} = RepRecSrc,
+    #doc{body={obj,OldRepHistoryPropsTrg}} = RepRecTgt,
+
+    SeqNum0 =
+    case OldRepHistoryProps == OldRepHistoryPropsTrg of
+    true ->
+        % if the records are identical, then we have a valid replication history
+        proplists:get_value("source_last_seq", OldRepHistoryProps, 0);
+    false ->
+        0
+    end,
+
+    SeqNum =
+    case proplists:get_value(full, Options, false)
+        orelse proplists:get_value("full", Options, false) of
+    true -> 0;
+    false -> SeqNum0
+    end,
+
+    {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum, #stats{}),
+    case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+    true ->
+        % nothing changed, don't record results
+        {ok, {obj, OldRepHistoryProps}};
+    false ->
+        HistEntries =[
+            {obj,
+                [{"start_time", StartTime},
+                {"end_time", httpd_util:rfc1123_date()},
+                {"start_last_seq", SeqNum},
+                {"end_last_seq", NewSeqNum},
+                {"docs_read", Stats#stats.docs_read},
+                {"read_errors", Stats#stats.read_errors},
+                {"docs_copied", Stats#stats.docs_copied},
+                {"copy_errors", Stats#stats.copy_errors}]}
+            | tuple_to_list(proplists:get_value("history", OldRepHistoryProps, {}))],
+        % something changed, record results
+        NewRepHistory =
+            {obj,
+                [{"session_id", couch_util:new_uuid()},
+                {"source_last_seq", NewSeqNum},
+                {"history", list_to_tuple(lists:sublist(HistEntries, 50))}]},
+
+        {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
+        {ok, _} = update_doc(DbTgt, RepRecTgt#doc{body=NewRepHistory}, []),
+        {ok, NewRepHistory}
+    end.
+
+% Enumerates the source documents starting from SourceSeqNum and passes each
+% one to maybe_save_docs/4. Returns {LastSeq, Stats}: the update sequence of
+% the last document visited (SourceSeqNum when there was none) and the
+% accumulated statistics.
+pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
+    CopyDoc =
+        fun(#doc_info{update_seq=DocSeq}=DocInfo, _, {_PrevSeq, StatsSoFar}) ->
+            StatsNow = maybe_save_docs(DbTarget, DbSource, DocInfo, StatsSoFar),
+            {ok, {DocSeq, StatsNow}}
+        end,
+    {ok, Final} = enum_docs_since(DbSource, SourceSeqNum, CopyDoc, {SourceSeqNum, Stats}),
+    Final.
+
+
+maybe_save_docs(DbTarget, DbSource,
+        #doc_info{id=Id, rev=Rev, conflict_revs=Conflicts, deleted_conflict_revs=DelConflicts},
+        Stats) ->
+    SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+    {ok, [{Id, MissingRevs}]} = get_missing_revs(DbTarget, [{Id, SrcRevs}]),
+
+    case MissingRevs of
+    [] ->
+        Stats;
+    _Else ->
+        % the 'ok' below validates no unrecoverable errors (like network failure, etc).
+        {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
+
+        Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
+
+        Stats2 = Stats#stats{
+            docs_read=Stats#stats.docs_read + length(Docs),
+            read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)},
+
+        case Docs of
+        [] ->
+            Stats2;
+        _ ->
+            % the 'ok' below validates no unrecoverable errors (like network failure, etc).
+            ok = save_docs(DbTarget, Docs, []),
+            Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)}
+        end
+    end.
+
+
+do_http_request(Url, Action) ->
+    do_http_request(Url, Action, []).
+
+% Sends an http request and returns the decoded json response body.
+% An empty JsonBody ([]) sends the request without a body; anything else is
+% json encoded and sent as "application/json; charset=utf-8".
+% Responses with a status from 200 up to 499 are decoded and returned. Any
+% other status raises {http_request_failed, ResponseCode, Url}; previously
+% this surfaced as an uninformative if_clause error.
+do_http_request(Url, Action, JsonBody) ->
+    couch_log:debug("couch_rep HTTP client request:"),
+    couch_log:debug("\tAction: ~p", [Action]),
+    couch_log:debug("\tUrl: ~p", [Url]),
+    Request =
+    case JsonBody of
+    [] ->
+        {Url, []};
+    _ ->
+        {Url, [], "application/json; charset=utf-8", lists:flatten(cjson:encode(JsonBody))}
+    end,
+    {ok, {{_, ResponseCode,_},_Headers, ResponseBody}} = http:request(Action, Request, [], []),
+    case ResponseCode >= 200 andalso ResponseCode < 500 of
+    true ->
+        cjson:decode(ResponseBody);
+    false ->
+        erlang:error({http_request_failed, ResponseCode, Url})
+    end.
+
+% Folds InFun over the doc infos, calling InFun(DocInfo, 0, Acc) for each.
+% {ok, Acc2} continues with the next element, {stop, Acc2} ends the fold
+% early. Returns the final accumulator.
+enum_docs0(InFun, DocInfos, Acc0) ->
+    case DocInfos of
+    [] ->
+        Acc0;
+    [First | Others] ->
+        case InFun(First, 0, Acc0) of
+        {stop, Final} -> Final;
+        {ok, Next} -> enum_docs0(InFun, Others, Next)
+        end
+    end.
+
+% Resolves a database reference. A name starting with "http" is taken to be
+% the url of a remote database and is returned as {ok, Url} with a trailing
+% slash guaranteed; any other name is opened through couch_server.
+open_db("http" ++ UrlRest) ->
+    Url = "http" ++ UrlRest,
+    case lists:last(UrlRest) =:= $/ of
+    true ->
+        {ok, Url};
+    false ->
+        {ok, Url ++ "/"}
+    end;
+open_db(LocalName) ->
+    couch_server:open(LocalName).
+
+
+enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
+    Url = DbUrl ++ "_all_docs_by_seq?startkey=" ++ integer_to_list(StartSeq),
+    {obj, Results} = do_http_request(Url, get),
+    DocInfoList=
+    lists:map(fun({obj, RowInfoList}) ->
+        {obj, RowValueProps} = proplists:get_value("value", RowInfoList),
+        #doc_info{
+            id=proplists:get_value("id", RowInfoList),
+            rev=proplists:get_value("rev", RowValueProps),
+            update_seq = proplists:get_value("key", RowInfoList),
+            conflict_revs =
+                tuple_to_list(proplists:get_value("conflicts", RowValueProps, {})),
+            deleted_conflict_revs =
+                tuple_to_list(proplists:get_value("deleted_conflicts", RowValueProps, {})),
+            deleted = proplists:get_value("deleted", RowValueProps, false)}
+        end, tuple_to_list(proplists:get_value("rows", Results))),
+    {ok, enum_docs0(InFun, DocInfoList, InAcc)};
+enum_docs_since(DbSource, StartSeq, Fun, Acc) ->
+    couch_db:enum_docs_since(DbSource, StartSeq, Fun, Acc).
+
+get_missing_revs(DbUrl, DocIdRevsList) when is_list(DbUrl) ->
+    JsonDocIdRevsList = {obj,
+        [{Id, list_to_tuple(RevList)} || {Id, RevList} <- DocIdRevsList]},
+    {obj, ResponseMembers} =
+    do_http_request(DbUrl ++ "_missing_revs",
+        post, JsonDocIdRevsList),
+    {obj, DocMissingRevsList} = proplists:get_value("missing_revs", ResponseMembers),
+    {ok, [{Id, tuple_to_list(MissingRevs)} || {Id, MissingRevs} <- DocMissingRevsList]};
+get_missing_revs(Db, DocId) ->
+    couch_db:get_missing_revs(Db, DocId).
+
+
+update_doc(DbUrl, #doc{id=DocId}=Doc, _Options) when is_list(DbUrl) ->
+    Url = DbUrl ++ url_encode(DocId),
+    {obj, ResponseMembers} =
+        do_http_request(Url, put, couch_doc:to_json_obj(Doc, [revs,attachments])),
+    RevId = proplists:get_value("_rev", ResponseMembers),
+    {ok, RevId};
+update_doc(Db, Doc, Options) ->
+    couch_db:update_doc(Db, Doc, Options).
+
+save_docs(_, [], _) ->
+    ok;
+save_docs(DbUrl, Docs, []) when is_list(DbUrl) ->
+    JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+    {obj, Returned} =
+        do_http_request(DbUrl ++ "_bulk_docs", post, {obj, [{new_edits, false}, {docs, list_to_tuple(JsonDocs)}]}),
+    true = proplists:get_value("ok", Returned),
+    ok;
+save_docs(Db, Docs, Options) ->
+    couch_db:save_docs(Db, Docs, Options).
+
+
+open_doc(DbUrl, DocId, []) when is_list(DbUrl) ->
+    case do_http_request(DbUrl ++ url_encode(DocId), get) of
+    {obj, [{"error", ErrId}, {"reason", Reason}]} ->
+        {list_to_atom(ErrId), Reason};
+    Doc  ->
+        {ok, couch_doc:from_json_obj(Doc)}
+    end;
+open_doc(Db, DocId, Options) when not is_list(Db) ->
+    couch_db:open_doc(Db, DocId, Options).
+
+
+% Opens the given revisions of a document, either from a remote database
+% (DbUrl is a url string) over http or from a local database via couch_db.
+% For a remote database the result is {ok, Results}, each entry being
+% {ok, Doc} or {{not_found, missing}, Rev}. `latest` is the only option
+% understood for remote databases.
+open_doc_revs(DbUrl, DocId, Revs, Options) when is_list(DbUrl) ->
+    QueryOptionStrs =
+    lists:map(fun(latest) ->
+            % latest is only option right now
+            "latest=true"
+        end, Options),
+    RevsQueryStrs = lists:flatten(cjson:encode(list_to_tuple(Revs))),
+    % The doc id is url encoded, as open_doc/3 and update_doc/3 do, so that
+    % ids containing reserved characters still yield a valid url.
+    Url = DbUrl ++ url_encode(DocId) ++ "?" ++ couch_util:implode(["revs=true", "attachments=true", "open_revs=" ++ RevsQueryStrs ] ++ QueryOptionStrs, "&"),
+    JsonResults = do_http_request(Url, get, []),
+    Results =
+    lists:map(
+        fun({obj, [{"missing", Rev}]}) ->
+            {{not_found, missing}, Rev};
+        ({obj, [{"ok", JsonDoc}]}) ->
+            {ok, couch_doc:from_json_obj(JsonDoc)}
+        end, tuple_to_list(JsonResults)),
+    {ok, Results};
+open_doc_revs(Db, DocId, Revs, Options) ->
+    couch_db:open_doc_revs(Db, DocId, Revs, Options).
+
+
+
+
+
+test() ->
+    couch_server:start(),
+    %{ok, LocalA} = couch_server:open("replica_a"),
+    {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
+    {ok, _} = couch_server:create("replica_b", [overwrite]),
+    %DbA = "replica_a",
+    DbA = "http://localhost:5984/replica_a/",
+    %DbB = "replica_b",
+    DbB = "http://localhost:5984/replica_b/",
+    _DocUnids = test_write_docs(10, LocalA, []),
+    replicate(DbA, DbB),
+    %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
+    % replicate(DbA, DbB),
+    ok.
+
+test_write_docs(0, _Db, Output) ->
+    lists:reverse(Output);
+test_write_docs(N, Db, Output) ->
+    Doc = #doc{
+        id=integer_to_list(N),
+        body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}},
+    couch_db:save_doc(Db, Doc, []),
+    test_write_docs(N-1, Db, [integer_to_list(N) | Output]).

Added: incubator/couchdb/trunk/src/couchdb/couch_server.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_server.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_server.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,215 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server).
+-behaviour(gen_server).
+-behaviour(application).
+
+-export([start/0,start/1,start/2,stop/0,stop/1]).
+-export([open/1,create/2,delete/1,all_databases/0,get_version/0]).
+-export([init/1, handle_call/3,sup_start_link/2]).
+-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
+-export([dev_start/0,remote_restart/0]).
+
+-include("couch_db.hrl").
+
+-record(server,{
+    root_dir = [],
+    dbname_regexp,
+    options=[]
+    }).
+
+start() ->
+    start("").
+
+start(IniFile) when is_atom(IniFile) ->
+    couch_server_sup:start_link(atom_to_list(IniFile) ++ ".ini");
+start(IniNum) when is_integer(IniNum) ->
+    couch_server_sup:start_link("couch" ++ integer_to_list(IniNum) ++ ".ini");
+start(IniFile) ->
+    couch_server_sup:start_link(IniFile).
+
+start(_Type, _Args) ->
+    start().
+
+stop() ->
+    couch_server_sup:stop().
+
+stop(_Reason) ->
+    stop().
+
+dev_start() ->
+    stop(),
+    up_to_date = make:all([load, debug_info]),
+    start().
+
+% Returns the version of the loaded `couch` application, or "0.0.0" when the
+% application is not loaded.
+get_version() ->
+    Loaded = application:loaded_applications(),
+    case [Vsn || {couch, _Description, Vsn} <- Loaded] of
+    [Version | _] ->
+        Version;
+    [] ->
+        "0.0.0"
+    end.
+
+sup_start_link(RootDir, Options) ->
+    gen_server:start_link({local, couch_server}, couch_server, {RootDir, Options}, []).
+
+open(Filename) ->
+    gen_server:call(couch_server, {open, Filename}).
+
+create(Filename, Options) ->
+    gen_server:call(couch_server, {create, Filename, Options}).
+
+delete(Filename) ->
+    gen_server:call(couch_server, {delete, Filename}).
+
+remote_restart() ->
+    gen_server:call(couch_server, remote_restart).
+
+init({RootDir, Options}) ->
+    {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
+    {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, options=Options}}.
+
+check_filename(#server{dbname_regexp=RegExp}, Filename) ->
+    case regexp:match(Filename, RegExp) of
+    nomatch ->
+        {error, illegal_database_name};
+    _Match ->
+        ok
+    end.
+
+get_full_filename(Server, Filename) ->
+    filename:join([Server#server.root_dir, "./" ++ Filename ++ ".couch"]).
+
+
+terminate(_Reason, _Server) ->
+    ok.
+
+% Lists the databases found beneath the server's root directory by searching
+% it recursively for files ending in ".couch".
+% Returns {ok, Names}; each name is the file's path with the root directory
+% and the ".couch" extension removed.
+all_databases() ->
+    {ok, Root} = gen_server:call(couch_server, get_root),
+    Filenames =
+    filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true,
+        fun(Filename, AccIn) ->
+            % NOTE(review): `--` removes the first occurrence of each character
+            % of Root. Presumably this relies on Filename always starting with
+            % Root, so that it acts as prefix stripping -- confirm.
+            case Filename -- Root of
+            [$/ | RelativeFilename] -> ok;
+            RelativeFilename -> ok
+            end,
+            [filename:rootname(RelativeFilename, ".couch") | AccIn]
+        end, []),
+    {ok, Filenames}.
+
+
+% gen_server call handler of the database server.
+%   get_root             -> {ok, RootDir}
+%   {open, Name}         -> {ok, Db} | not_found | {error, Reason}
+%   {create, Name, Opts} -> {ok, Db} | {error, Reason}
+%   {delete, Name}       -> ok | not_found | {error, Reason}
+%   remote_restart       -> ok
+% Database names are validated with check_filename/2 before any file or
+% supervisor operation; an invalid name yields {error, illegal_database_name}.
+handle_call(get_root, _From, #server{root_dir=Root}=Server) ->
+    {reply, {ok, Root}, Server};
+handle_call({open, Filename}, From, Server) ->
+    case check_filename(Server, Filename) of
+    {error, Error} ->
+        {reply, {error, Error}, Server};
+    ok ->
+        Filepath = get_full_filename(Server, Filename),
+        Result = supervisor:start_child(couch_server_sup,
+            {Filename,
+                {couch_db, open, [Filename, Filepath]},
+                transient ,
+                infinity,
+                supervisor,
+                [couch_db]}),
+        case Result of
+        {ok, Db} ->
+            {reply, {ok, Db}, Server};
+        {error, already_present} ->
+            % a stopped child spec is left over: remove it and try again
+            ok = supervisor:delete_child(couch_server_sup, Filename),
+            % call self recursively
+            handle_call({open, Filename}, From, Server);
+        {error, {already_started, Db}} ->
+            {reply, {ok, Db}, Server};
+        {error, {not_found, _}} ->
+            {reply, not_found, Server};
+        {error, {Error, _}} ->
+            {reply, {error, Error}, Server}
+        end
+    end;
+handle_call({create, Filename, Options}, _From, Server) ->
+    case check_filename(Server, Filename) of
+    {error, Error} ->
+        {reply, {error, Error}, Server};
+    ok ->
+        Filepath = get_full_filename(Server, Filename),
+        ChildSpec = {Filename,
+                {couch_db, create, [Filename, Filepath, Options]},
+                transient,
+                infinity,
+                supervisor,
+                [couch_db]},
+        Result =
+        case supervisor:delete_child(couch_server_sup, Filename) of
+        ok ->
+            sup_start_child(couch_server_sup, ChildSpec);
+        {error, not_found} ->
+            sup_start_child(couch_server_sup, ChildSpec);
+        {error, running} ->
+            % a server process for this database already started. Maybe kill it
+            case lists:member(overwrite, Options) of
+            true ->
+                supervisor:terminate_child(couch_server_sup, Filename),
+                ok = supervisor:delete_child(couch_server_sup, Filename),
+                sup_start_child(couch_server_sup, ChildSpec);
+            false ->
+                {error, database_already_exists}
+            end
+        end,
+        case Result of
+        {ok, _Db} -> couch_db_update_notifier:notify({created, Filename});
+        _ -> ok
+        end,
+        {reply, Result, Server}
+    end;
+handle_call({delete, Filename}, _From, Server) ->
+    % Validate the name first, exactly as open and create do, so that a
+    % crafted name can not make file:delete/1 reach outside the root dir.
+    case check_filename(Server, Filename) of
+    {error, Error} ->
+        {reply, {error, Error}, Server};
+    ok ->
+        FullFilepath = get_full_filename(Server, Filename),
+        % stop and forget a running database process; results are ignored
+        % because the database may simply not be open
+        supervisor:terminate_child(couch_server_sup, Filename),
+        supervisor:delete_child(couch_server_sup, Filename),
+        case file:delete(FullFilepath) of
+        ok ->
+            couch_db_update_notifier:notify({deleted, Filename}),
+            {reply, ok, Server};
+        {error, enoent} ->
+            {reply, not_found, Server};
+        Else ->
+            {reply, Else, Server}
+        end
+    end;
+handle_call(remote_restart, _From, #server{options=Options}=Server) ->
+    % only honoured when the remote_restart option is set to true
+    case proplists:get_value(remote_restart, Options) of
+    true ->
+        exit(self(), restart);
+    _ ->
+        ok
+    end,
+    {reply, ok, Server}.
+
+% Starts ChildSpec under couch_server_sup. An {error, {Reason, ChildInfo}}
+% result is reduced to {error, Reason}; any other result is returned as is.
+sup_start_child(couch_server_sup, ChildSpec) ->
+    Outcome = supervisor:start_child(couch_server_sup, ChildSpec),
+    case Outcome of
+    {error, {Reason, _ChildInfo}} ->
+        {error, Reason};
+    _ ->
+        Outcome
+    end.
+
+handle_cast(_Msg, State) ->
+    {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.

Added: incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,185 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server_sup).
+-behaviour(supervisor).
+
+-define(DEFAULT_INI, "couch.ini").
+
+-export([start_link/1,stop/0]).
+
+%% supervisor callbacks
+-export([init/1]).
+
+% Starts the server via start_server/1 with the given ini file name, unless
+% something is already registered as couch_server_sup, in which case
+% {error, already_started} is returned and nothing is started.
+start_link(IniFilename) ->
+    AlreadyRegistered = whereis(couch_server_sup) =/= undefined,
+    case AlreadyRegistered of
+    true ->
+        {error, already_started};
+    false ->
+        start_server(IniFilename)
+    end.
+
+% start_server(IniFilename)
+%
+% Reads the ini file, builds the child specifications of the supervision tree
+% and starts the couch_server_sup supervisor.
+%
+% IniFilename: name of the ini file. "" means "not given": the value of the
+%   -couchini command line argument is used when present, ?DEFAULT_INI
+%   otherwise.
+% Returns the result of supervisor:start_link/3. The call is wrapped in
+%   catch, so an exception raised while starting comes back as a value.
+% Throws {startup_error, Msg} when the ini file cannot be read.
+start_server("") ->
+    % no ini file specified, check the command line args
+    IniFile =
+    case init:get_argument(couchini) of
+    {ok, [CmdLineIniFilename]} ->
+        CmdLineIniFilename;
+    _Else ->
+        ?DEFAULT_INI
+    end,
+    start_server(IniFile);
+start_server(InputIniFilename) ->
+
+    % Best effort: a PID file that cannot be written is reported on the
+    % console but does not abort the startup.
+    case init:get_argument(pidfile) of
+    {ok, [PidFile]} ->
+        case file:write_file(PidFile, os:getpid()) of
+        ok -> ok;
+        Error -> io:format("Failed to write PID file ~s, error: ~p", [PidFile, Error])
+        end;
+    _ -> ok
+    end,
+
+    {ok, Cwd} = file:get_cwd(),
+    IniFilename = couch_util:abs_pathname(InputIniFilename),
+    IniBin =
+    case file:read_file(IniFilename) of
+    {ok, IniBin0} ->
+        IniBin0;
+    {error, enoent} ->
+        Msg = io_lib:format("Couldn't find server configuration file ~s.", [InputIniFilename]),
+        io:format("~s~n", [Msg]),
+        throw({startup_error, Msg});
+    {error, Reason} ->
+        % any other read failure (permissions, path is a directory, ...) is
+        % reported the same way instead of dying with a case_clause error
+        Msg = io_lib:format("Couldn't read server configuration file ~s: ~p.", [InputIniFilename, Reason]),
+        io:format("~s~n", [Msg]),
+        throw({startup_error, Msg})
+    end,
+    {ok, Ini} = couch_util:parse_ini(binary_to_list(IniBin)),
+
+    % Settings of the "Couch" ini section, each with its default value.
+    ConsoleStartupMsg = proplists:get_value({"Couch", "ConsoleStartupMsg"}, Ini, "Apache CouchDB is starting."),
+    LogLevel = list_to_atom(proplists:get_value({"Couch", "LogLevel"}, Ini, "error")),
+    DbRootDir = proplists:get_value({"Couch", "DbRootDir"}, Ini, "."),
+    HttpConfigFile = proplists:get_value({"Couch", "HttpConfigFile"}, Ini, "couch_httpd.conf"),
+    LogFile = proplists:get_value({"Couch", "LogFile"}, Ini, "couchdb.log"),
+    UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""),
+    UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini),
+    FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""),
+    RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")),
+    ServerOptions = [{remote_restart, RemoteRestart}],
+    % every key of the "Couch Query Servers" section maps a language to an executable
+    QueryServers = [{Lang, QueryExe} || {{"Couch Query Servers", Lang}, QueryExe} <- Ini],
+    
+    % Fixed children first, then one child per configured update notifier,
+    % then the full text query server when one is configured.
+    ChildProcesses =
+        [{couch_log,
+            {couch_log, start_link, [LogFile, LogLevel]},
+            permanent,
+            brutal_kill,
+            worker,
+            % the modules entry names the child's own callback module
+            [couch_log]},
+        {couch_db_update_event,
+            {gen_event, start_link, [{local, couch_db_update}]},
+            permanent,
+            1000,
+            supervisor,
+            dynamic},
+        {couch_server,
+            {couch_server, sup_start_link, [DbRootDir, ServerOptions]},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_server]},
+        {couch_util,
+            {couch_util, start_link, [UtilDriverDir]},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_util]},
+        {couch_query_servers,
+            {couch_query_servers, start_link, [QueryServers]},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_query_servers]},
+        {couch_view,
+            {couch_view, start_link, [DbRootDir]},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_view]},
+        {httpd,
+            {httpd, start_link, [HttpConfigFile]},
+            permanent,
+            1000,
+            supervisor,
+            [httpd]}
+        ] ++
+        lists:map(fun(UpdateNotifierExe) ->
+            {UpdateNotifierExe,
+                {couch_db_update_notifier, start_link, [UpdateNotifierExe]},
+                permanent,
+                1000,
+                supervisor,
+                [couch_db_update_notifier]}
+            end, UpdateNotifierExes)
+        ++
+        case FtSearchQueryServer of
+        "" ->
+            [];
+        _ ->
+            [{couch_ft_query,
+                {couch_ft_query, start_link, [FtSearchQueryServer]},
+                permanent,
+                1000,
+                supervisor,
+                % the modules entry names the child's own callback module
+                [couch_ft_query]}]
+        end,
+
+    io:format("couch ~s (LogLevel=~s)~n", [couch_server:get_version(), LogLevel]),
+    io:format("~s~n", [ConsoleStartupMsg]),
+
+    % Exits are trapped only around the supervisor start; presumably so that a
+    % linked supervisor failing during startup is reported through
+    % StartResult instead of killing this process -- TODO confirm.
+    process_flag(trap_exit, true),
+    StartResult = (catch supervisor:start_link(
+        {local, couch_server_sup}, couch_server_sup, ChildProcesses)),
+
+    ConfigInfo = io_lib:format("Config Info ~s:~n\tCurrentWorkingDir=~s~n" ++
+        "\tDbRootDir=~s~n" ++
+        "\tHttpConfigFile=~s~n" ++
+        "\tLogFile=~s~n" ++
+        "\tUtilDriverDir=~s~n" ++
+        "\tDbUpdateNotificationProcesses=~s~n" ++
+        "\tFullTextSearchQueryServer=~s~n" ++
+        "~s",
+            [IniFilename,
+            Cwd,
+            DbRootDir,
+            HttpConfigFile,
+            LogFile,
+            UtilDriverDir,
+            UpdateNotifierExes,
+            FtSearchQueryServer,
+            [lists:flatten(io_lib:format("\t~s=~s~n", [Lang, QueryExe])) || {Lang, QueryExe} <- QueryServers]]),
+    couch_log:debug("~s", [ConfigInfo]),
+
+    case StartResult of
+    {ok,_} ->
+        % only output when startup was successful
+        io:format("Apache CouchDB has started. Time to relax.~n");
+    _ ->
+        % Since we failed startup, unconditionally dump configuration data to console
+        io:format("~s", [ConfigInfo]),
+        ok
+    end,
+    process_flag(trap_exit, false),
+    StartResult.
+
+% Sends an exit signal with reason normal to the process registered as
+% couch_server_sup, then stops couch_log. The exit call is wrapped in catch,
+% so a failure there (e.g. nothing registered under that name) is ignored.
+stop() ->
+    SupPid = whereis(couch_server_sup),
+    catch exit(SupPid, normal),
+    couch_log:stop().
+
+% supervisor callback. ChildProcesses is the list of child specifications to
+% supervise. Children are restarted one_for_one; at most 10 restarts are
+% tolerated within 3600 seconds.
+init(ChildProcesses) ->
+    RestartStrategy = {one_for_one, 10, 3600},
+    {ok, {RestartStrategy, ChildProcesses}}.

Added: incubator/couchdb/trunk/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_stream.erl?rev=642432&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_stream.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_stream.erl Fri Mar 28 16:32:19 2008
@@ -0,0 +1,252 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stream).
+-behaviour(gen_server).
+
+-export([test/1]).
+-export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
+-export([copy/4]).
+-export([ensure_buffer/2, set_min_buffer/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(FILE_POINTER_BYTES, 8).
+-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
+
+-define(STREAM_OFFSET_BYTES, 4).
+-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
+
+-define(HUGE_CHUNK, 1000000000). % Huge chunk size when reading all in one go
+
+-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+
+
+% State of the couch_stream gen_server: built in init/1 and threaded through
+% handle_call/3 and write_data/2.
+-record(write_stream,
+    % file handle handed to couch_file:expand/2 and couch_file:pwrite/3
+    {fd = 0,
+    % file position at which the next bytes are written
+    current_pos = 0,
+    % bytes still free in the current segment; 0 makes the next write
+    % allocate a new segment
+    bytes_remaining = 0,
+    % size hint stored by the ensure_buffer call; considered when the next
+    % segment is sized and reset to 0 once that segment is allocated
+    next_alloc = 0,
+    % lower bound for the size of a newly allocated segment
+    min_alloc = 16#00010000
+    }).
+
+% Handle returned by open/2 and accepted by the interface functions.
+-record(stream,
+    {
+    % pid of the couch_stream gen_server; used for close, get_state,
+    % ensure_buffer, set_min_buffer and write
+    pid,
+    % file handle; the read side (read, copy, foldl, read_term) uses it
+    % directly without going through the server process
+    fd
+    }).
+
+
+%%% Interface functions %%%
+
+% open(Fd) -> {ok, #stream{}}
+% Opens a stream on Fd with no previous state; same as open(nil, Fd).
+open(FileHandle) ->
+    open(nil, FileHandle).
+
+% open(State, Fd) -> {ok, #stream{}}
+% Starts a linked couch_stream server for Fd and wraps its pid and Fd in a
+% #stream{} handle. State is the {Pos, BytesRemaining} pair init/1 expects;
+% nil stands for {0,0}.
+open(nil, Fd) ->
+    open({0,0}, Fd);
+open(State, Fd) ->
+    InitArg = {State, Fd},
+    {ok, ServerPid} = gen_server:start_link(couch_stream, InitArg, []),
+    {ok, #stream{fd = Fd, pid = ServerPid}}.
+
+% Sends the close request to the stream's server process and returns its
+% reply.
+close(#stream{pid = ServerPid}) ->
+    gen_server:call(ServerPid, close).
+
+% Asks the stream's server process for its current state and returns the
+% reply.
+get_state(#stream{pid = ServerPid}) ->
+    gen_server:call(ServerPid, get_state).
+
+% Forwards an {ensure_buffer, Bytes} request to the stream's server process
+% and returns its reply.
+ensure_buffer(#stream{pid = ServerPid}, Bytes) ->
+    gen_server:call(ServerPid, {ensure_buffer, Bytes}).
+
+% Forwards a {set_min_buffer, Bytes} request to the stream's server process
+% and returns its reply.
+set_min_buffer(#stream{pid = ServerPid}, Bytes) ->
+    gen_server:call(ServerPid, {set_min_buffer, Bytes}).
+
+% read(StreamOrFd, Sp, Num) -> {ok, Binary, NextSp}
+% Reads Num bytes starting at stream pointer Sp and returns them as a single
+% binary together with the stream pointer following the data. A #stream{}
+% handle is reduced to its file handle first.
+read(#stream{fd = Fd}, Sp, Num) ->
+    read(Fd, Sp, Num);
+read(Fd, Sp, Num) ->
+    % chunks are collected newest-first and put back in order afterwards
+    Collect = fun(Chunk, Collected) -> {ok, [Chunk | Collected]} end,
+    {ok, RevChunks, NextSp} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, Collect, []),
+    {ok, list_to_binary(lists:reverse(RevChunks)), NextSp}.
+
+% copy(StreamOrFd, Sp, Num, DestStream) -> {ok, Pointer}
+% Copies Num bytes starting at stream pointer Sp into DestStream (a #stream{}
+% handle, as required by write/2). Pointer is the position write/2 reported
+% for the first chunk written, or null when nothing was copied (Num is 0).
+% A #stream{} source is reduced to its file handle first.
+copy(#stream{fd = Fd}, Sp, Num, DestStream) ->
+    copy(Fd, Sp, Num, DestStream);
+copy(Fd, Sp, Num, DestStream) ->
+    {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
+        fun(Bin, AccPointer) ->
+            % write/2 takes the stream first, then the data
+            {ok, NewPointer} = write(DestStream, Bin),
+            % stream_data/6 only accepts {ok, Acc} or {stop, Acc} from the
+            % fun; keep the pointer of the first chunk as the accumulator
+            case AccPointer of
+            null -> {ok, NewPointer};
+            _ -> {ok, AccPointer}
+            end
+        end,
+        null),
+    {ok, NewSp}.
+
+% foldl(StreamOrFd, Sp, Num, Fun, Acc) -> {ok, FinalAcc, NextSp}
+% Folds Fun over Num bytes starting at stream pointer Sp, in chunks of at most
+% ?DEFAULT_STREAM_CHUNK bytes. Fun(Chunk, Acc) must return {ok, NewAcc} to
+% continue or {stop, NewAcc} to end early.
+foldl(#stream{fd = Fd}, Sp, Num, Fun, Acc) ->
+    foldl(Fd, Sp, Num, Fun, Acc);
+foldl(Fd, Sp, Num, Fun, Acc) ->
+    {ok, FinalAcc, NextSp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc),
+    {ok, FinalAcc, NextSp}.
+
+% read_term(StreamOrFd, Sp) -> {ok, Term}
+% Reads a term stored in the layout produced by write_term/2: a length prefix
+% of ?STREAM_OFFSET_BYTES bytes followed by that many bytes of external term
+% format.
+read_term(#stream{fd = Fd}, Sp) ->
+    read_term(Fd, Sp);
+read_term(Fd, Sp) ->
+    {ok, <<PayloadLen:(?STREAM_OFFSET_BITS)>>, PayloadSp}
+        = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+    {ok, Payload, _EndSp} = read(Fd, PayloadSp, PayloadLen),
+    {ok, binary_to_term(Payload)}.
+
+% write_term(Stream, Term) -> result of write/2
+% Serializes Term with term_to_binary/1, prefixes it with its size as a
+% ?STREAM_OFFSET_BITS wide integer and writes the result to Stream.
+write_term(Stream, Term) ->
+    Payload = term_to_binary(Term),
+    PayloadLen = size(Payload),
+    write(Stream, <<PayloadLen:(?STREAM_OFFSET_BITS), Payload/binary>>).
+
+% write(Stream, Binary) -> {ok, Pointer}
+% An empty binary is answered locally with {ok, {0,0}}; any other binary is
+% sent to the stream's server process as a {write, Binary} request and that
+% reply is returned.
+write(#stream{}, <<>>) ->
+    {ok, {0,0}};
+write(#stream{pid = ServerPid}, Data) when is_binary(Data) ->
+    gen_server:call(ServerPid, {write, Data}).
+
+
+% gen_server callback. Builds the initial #write_stream{} state from the
+% {{Pos, BytesRemaining}, Fd} argument passed by open/2; the remaining fields
+% keep their record defaults.
+init({{Pos, BytesRemaining}, Fd}) ->
+    InitialState = #write_stream{
+        bytes_remaining = BytesRemaining,
+        current_pos = Pos,
+        fd = Fd},
+    {ok, InitialState}.
+
+% gen_server callback: nothing is cleaned up on termination.
+terminate(_Why, _State) ->
+    ok.
+
+% gen_server callback for the synchronous stream requests:
+%   get_state               -> {Pos, BytesRemaining}
+%   {set_min_buffer, Bytes} -> ok; stores Bytes as min_alloc
+%   {ensure_buffer, Bytes}  -> ok; stores in next_alloc how many bytes the
+%                              current segment is short of Bytes (0 if none)
+%   {write, Bin}            -> {ok, Sp}; Sp is the pointer write_data/2 returns
+%   close                   -> {ok, {Pos, BytesRemaining}}; stops the server
+%                              with reason normal
+handle_call(get_state, _From, Stream) ->
+    #write_stream{current_pos = Pos, bytes_remaining = Remaining} = Stream,
+    {reply, {Pos, Remaining}, Stream};
+handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
+    Updated = Stream#write_stream{min_alloc = MinBuffer},
+    {reply, ok, Updated};
+handle_call({ensure_buffer, Requested}, _From, Stream) ->
+    #write_stream{bytes_remaining = Remaining} = Stream,
+    NextAlloc =
+        if
+            Remaining < Requested -> Requested - Remaining;
+            true -> 0 % enough room in current segment
+        end,
+    {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
+handle_call({write, Bin}, _From, Stream) ->
+    % the pointer returned refers to the beginning of the binary just written
+    {ok, Sp, Written} = write_data(Stream, Bin),
+    {reply, {ok, Sp}, Written};
+handle_call(close, _From, Stream) ->
+    #write_stream{current_pos = Pos, bytes_remaining = Remaining} = Stream,
+    FinalState = {Pos, Remaining},
+    {stop, normal, {ok, FinalState}, Stream}.
+
+% gen_server callback: every cast is ignored and the state is kept as is.
+handle_cast(_Request, Stream) ->
+    {noreply, Stream}.
+
+% gen_server callback: a code upgrade performs no state conversion.
+code_change(_FromVsn, Stream, _Extra) ->
+    {ok, Stream}.
+
+% gen_server callback: any other message is ignored and the state is kept.
+handle_info(_Message, Stream) ->
+    {noreply, Stream}.
+
+%%% Internal function %%%
+
+% stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc) -> {ok, FinalAcc, NextSp}
+% Feeds Num bytes, starting at stream pointer Sp = {Pos, Offset}, to Fun in
+% chunks of at most MaxChunk bytes. Offset is the number of bytes left in the
+% segment at Pos; when it is 0, Pos holds the {NextPos, NextOffset} link to
+% the following segment. Fun(Chunk, Acc) returns {ok, Acc2} to continue or
+% {stop, Acc2} to end early.
+% Throws {error, stream_corruption} when a link does not point forward.
+stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+    {ok, Acc, Sp};
+stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+    LinkSize = ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES,
+    {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
+        = couch_file:pread(Fd, Pos, LinkSize),
+    % A link always points past the current position; anything else would
+    % make this loop forever on a corrupted file.
+    if
+        NextPos > Pos -> ok;
+        true -> throw({error, stream_corruption})
+    end,
+    stream_data(Fd, {NextPos, NextOffset}, Num, MaxChunk, Fun, Acc);
+stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+    % never read past the chunk limit, the request or the current segment
+    ChunkSize = lists:min([MaxChunk, Num, Offset]),
+    {ok, Chunk} = couch_file:pread(Fd, Pos, ChunkSize),
+    NextSp = {Pos + ChunkSize, Offset - ChunkSize},
+    case Fun(Chunk, Acc) of
+    {stop, StoppedAcc} ->
+        {ok, StoppedAcc, NextSp};
+    {ok, NextAcc} ->
+        stream_data(Fd, NextSp, Num - ChunkSize, MaxChunk, Fun, NextAcc)
+    end.
+
+% write_data(Stream, Bin) -> {ok, Sp, NewStream}
+% Writes Bin through the #write_stream{} state and returns the stream pointer
+% {Pos, BytesRemaining} at which Bin starts, plus the updated state. When the
+% current segment is full a new one is allocated and, unless the current
+% position is 0, a link to it is written at the current position. Data that
+% does not fit in the current segment continues in the next one.
+write_data(Stream, <<>>) ->
+    {ok, {0,0}, Stream};
+write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
+    #write_stream{
+        fd = Fd,
+        current_pos = LinkPos,
+        next_alloc = NextAlloc,
+        min_alloc = MinAlloc} = Stream,
+
+    % no space in the current segment, must alloc a new segment; each
+    % segment gets extra room for the link to its successor
+    SegmentSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
+    LinkSize = ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES,
+    {ok, SegmentPos} = couch_file:expand(Fd, SegmentSize + LinkSize),
+
+    if
+        LinkPos =:= 0 ->
+            ok;
+        true ->
+            Link = <<SegmentPos:(?FILE_POINTER_BITS), SegmentSize:(?STREAM_OFFSET_BITS)>>,
+            ok = couch_file:pwrite(Fd, LinkPos, Link)
+    end,
+    Fresh = Stream#write_stream{
+        next_alloc=0,
+        bytes_remaining=SegmentSize,
+        current_pos=SegmentPos},
+    write_data(Fresh, Bin);
+write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=Remaining} = Stream, Bin) ->
+    % write as much as fits here; the tail (possibly empty) goes on recursively
+    FitSize = lists:min([size(Bin), Remaining]),
+    {Head, Tail} = split_binary(Bin, FitSize),
+    ok = couch_file:pwrite(Fd, Pos, Head),
+    Advanced = Stream#write_stream{
+        current_pos=Pos + FitSize,
+        bytes_remaining=Remaining - FitSize
+        },
+    {ok, _TailSp, Final} = write_data(Advanced, Tail),
+    {ok, {Pos, Remaining}, Final}.
+
+
+
+%%% Tests %%%
+
+
+% Manual round-trip check against the file "foo" in the current directory:
+% writes Term and {Term, Term}, reopens the file, reads both back and prints
+% them, then runs deep_write_test/4 and deep_read_test/2 on the reopened
+% file. Any failing step crashes with a badmatch.
+test(Term) ->
+    {ok, WriteFd} = couch_file:open("foo", [write]),
+    {ok, Writer} = open({0,0}, WriteFd),
+    {ok, FirstSp} = write_term(Writer, Term),
+    {ok, SecondSp} = write_term(Writer, {Term, Term}),
+    close(Writer),
+    couch_file:close(WriteFd),
+    {ok, ReadFd} = couch_file:open("foo", [read, write]),
+    {ok, Appender} = open({0,0}, ReadFd),
+    {ok, FirstTerm} = read_term(ReadFd, FirstSp),
+    io:format("Term1: ~w ~n",[FirstTerm]),
+    {ok, SecondTerm} = read_term(ReadFd, SecondSp),
+    io:format("Term2: ~w ~n",[SecondTerm]),
+    {ok, Pointers} = deep_write_test(Appender, Term, 1000, []),
+    deep_read_test(ReadFd, Pointers),
+    close(Appender),
+    couch_file:close(ReadFd).
+
+% Reads a term at every pointer in the list and crashes with a badmatch as
+% soon as one read does not return {ok, Term}. Returns ok.
+deep_read_test(Fd, PointerList) ->
+    lists:foreach(
+        fun(Pointer) -> {ok, _Term} = read_term(Fd, Pointer) end,
+        PointerList).
+
+% Writes N terms to Stream, counting N down to 0. Each term is a list of
+% random:uniform(N) copies of Term, so the lists tend to get shorter as N
+% shrinks. Returns {ok, Pointers} with the most recently written pointer
+% first.
+deep_write_test(_Stream, _Term, 0, Pointers) ->
+    {ok, Pointers};
+deep_write_test(Stream, Term, Remaining, Pointers) ->
+    Copies = random:uniform(Remaining),
+    {ok, Pointer} = write_term(Stream, lists:duplicate(Copies, Term)),
+    deep_write_test(Stream, Term, Remaining - 1, [Pointer | Pointers]).



Mime
View raw message