summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-05-15 21:51:22 +0000
committerDamien F. Katz <damien@apache.org>2008-05-15 21:51:22 +0000
commita684f95cbcee7f2568a2ce04e7dc2bbb605a27b3 (patch)
tree3b2fafecde418e10834ec2e896efffb64b16a33a /src
parent41eeac05bff1b0a8b05f1110270c0beecd62991a (diff)
Incremental reduce first checkin. Warning! Disk format change.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@656861 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_btree.erl307
-rw-r--r--src/couchdb/couch_db.erl53
-rw-r--r--src/couchdb/couch_db.hrl1
-rw-r--r--src/couchdb/couch_httpd.erl111
-rw-r--r--src/couchdb/couch_key_tree.erl2
-rw-r--r--src/couchdb/couch_query_servers.erl111
-rw-r--r--src/couchdb/couch_server_sup.erl2
-rw-r--r--src/couchdb/couch_util.erl19
-rw-r--r--src/couchdb/couch_view.erl332
9 files changed, 647 insertions, 291 deletions
diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl
index 5b7ac65a..6fd9b0c8 100644
--- a/src/couchdb/couch_btree.erl
+++ b/src/couchdb/couch_btree.erl
@@ -13,8 +13,8 @@
-module(couch_btree).
-export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]).
--export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]).
--export([lookup/2, get_state/1, test/1, test/0]).
+-export([foldr/3, foldr/4, fold/4, fold/5, reduce/3, partial_reduce/3, final_reduce/2]).
+-export([lookup/2, get_state/1, set_options/2, test/1, test/0]).
-define(CHUNK_THRESHOLD, 16#fff).
@@ -23,7 +23,8 @@
root,
extract_kv = fun({Key, Value}) -> {Key, Value} end,
assemble_kv = fun(Key, Value) -> {Key, Value} end,
- less = fun(A, B) -> A < B end
+ less = fun(A, B) -> A < B end,
+ reduce = nil
}).
extract(#btree{extract_kv=Extract}, Value) ->
@@ -46,7 +47,9 @@ set_options(Bt, [{split, Extract}|Rest]) ->
set_options(Bt, [{join, Assemble}|Rest]) ->
set_options(Bt#btree{assemble_kv=Assemble}, Rest);
set_options(Bt, [{less, Less}|Rest]) ->
- set_options(Bt#btree{less=Less}, Rest).
+ set_options(Bt#btree{less=Less}, Rest);
+set_options(Bt, [{reduce, Reduce}|Rest]) ->
+ set_options(Bt#btree{reduce=Reduce}, Rest).
open(State, Fd, Options) ->
{ok, set_options(#btree{root=State, fd=Fd}, Options)}.
@@ -54,10 +57,36 @@ open(State, Fd, Options) ->
get_state(#btree{root=Root}) ->
Root.
-row_count(#btree{root=nil}) ->
- 0;
-row_count(#btree{root={_RootPointer, Count}}) ->
- Count.
+final_reduce(#btree{reduce=Reduce}, Val) ->
+ final_reduce(Reduce, Val);
+final_reduce(Reduce, {[], []}) ->
+ Reduce(reduce, []);
+final_reduce(_Bt, {[], [Red]}) ->
+ Red;
+final_reduce(Reduce, {[], Reductions}) ->
+ Reduce(combine, Reductions);
+final_reduce(Reduce, {KVs, Reductions}) ->
+ Red = Reduce(reduce, KVs),
+ final_reduce(Reduce, {[], [Red | Reductions]}).
+
+reduce(Bt, Key1, Key2) ->
+ {ok, Reds} = partial_reduce(Bt, Key1, Key2),
+ {ok, final_reduce(Bt, Reds)}.
+
+partial_reduce(#btree{root=Root}=Bt, Key1, Key2) ->
+ {KeyStart, KeyEnd} =
+ case Key1 == nil orelse Key2 == nil orelse less(Bt, Key1, Key2) of
+ true -> {Key1, Key2};
+ false -> {Key2, Key1}
+ end,
+ case Root of
+ nil ->
+ {ok, {[], []}};
+ _ ->
+ {KVs, Nodes} = collect_node(Bt, Root, KeyStart, KeyEnd),
+ {ok, {KVs, [Red || {_K,{_P,Red}} <- Nodes]}}
+ end.
+
foldl(Bt, Fun, Acc) ->
fold(Bt, fwd, Fun, Acc).
@@ -73,16 +102,16 @@ foldr(Bt, Key, Fun, Acc) ->
% wraps a 2 arity function with the proper 3 arity function
convert_fun_arity(Fun) when is_function(Fun, 2) ->
- fun(KV, _Offset, AccIn) -> Fun(KV, AccIn) end;
+ fun(KV, _Reds, AccIn) -> Fun(KV, AccIn) end;
convert_fun_arity(Fun) when is_function(Fun, 3) ->
Fun. % Already arity 3
fold(Bt, Dir, Fun, Acc) ->
- {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc),
+ {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc),
{ok, Acc2}.
fold(Bt, Key, Dir, Fun, Acc) ->
- {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc),
+ {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc),
{ok, Acc2}.
add(Bt, InsertKeyValues) ->
@@ -136,7 +165,7 @@ lookup(#btree{root=Root, less=Less}=Bt, Keys) ->
lookup(_Bt, nil, Keys) ->
{ok, [{Key, not_found} || Key <- Keys]};
-lookup(Bt, {Pointer, _Count}, Keys) ->
+lookup(Bt, {Pointer, _Reds}, Keys) ->
{NodeType, NodeList} = get_node(Bt, Pointer),
case NodeType of
kp_node ->
@@ -229,7 +258,7 @@ modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
nil ->
NodeType = kv_node,
NodeList = [];
- {Pointer, _count} ->
+ {Pointer, _Reds} ->
{NodeType, NodeList} = get_node(Bt, Pointer)
end,
case NodeType of
@@ -249,14 +278,12 @@ modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
{ok, ResultList, QueryOutput2, Bt3}
end.
-
-count(kv_node, NodeList) ->
- length(NodeList);
-count(kp_node, NodeList) ->
- lists:foldl( fun({_Key, {_Pointer, Count}}, AccCount) ->
- Count + AccCount
- end,
- 0, NodeList).
+reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
+ [];
+reduce_node(#btree{reduce=R}, kp_node, NodeList) ->
+ R(combine, [Red || {_K, {_P, Red}} <- NodeList]);
+reduce_node(#btree{reduce=R}, kv_node, NodeList) ->
+ R(reduce, NodeList).
get_node(#btree{fd = Fd}, NodePos) ->
@@ -267,7 +294,7 @@ get_node(#btree{fd = Fd}, NodePos) ->
% Validating this prevents infinite loops should
% a disk corruption occur.
[throw({error, disk_corruption})
- || {_Key, {SubNodePos, _Count}}
+ || {_Key, {SubNodePos, _Reds}}
<- NodeList, SubNodePos >= NodePos];
kv_node ->
ok
@@ -282,7 +309,7 @@ write_node(Bt, NodeType, NodeList) ->
begin
{ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}),
{LastKey, _} = lists:last(ANodeList),
- {LastKey, {Pointer, count(NodeType, ANodeList)}}
+ {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList)}}
end
||
ANodeList <- NodeListList
@@ -362,93 +389,172 @@ modify_kvnode(Bt, [{Key, Value} | RestKVs], [{ActionType, ActionKey, ActionValue
end
end.
+
+collect_node(Bt, {P, R}, KeyStart, KeyEnd) ->
+ case get_node(Bt, P) of
+ {kp_node, NodeList} ->
+ collect_kp_node(Bt, NodeList, KeyStart, KeyEnd);
+ {kv_node, KVs} ->
+ GTEKeyStartKVs =
+ case KeyStart of
+ nil ->
+ KVs;
+ _ ->
+ lists:dropwhile(
+ fun({Key,_}) ->
+ less(Bt, Key, KeyStart)
+ end, KVs)
+ end,
+ KVs2 =
+ case KeyEnd of
+ nil ->
+ GTEKeyStartKVs;
+ _ ->
+ lists:dropwhile(
+ fun({Key,_}) ->
+ less(Bt, KeyEnd, Key)
+ end, lists:reverse(GTEKeyStartKVs))
+ end,
+ case length(KVs2) == length(KVs) of
+ true -> % got full node, return the already calculated reduction
+ {[], [{nil, {P, R}}]};
+ false -> % otherwise return the keyvalues for later reduction
+ {KVs2, []}
+ end
+ end.
+
+
+collect_kp_node(Bt, NodeList, KeyStart, KeyEnd) ->
+ Nodes =
+ case KeyStart of
+ nil ->
+ NodeList;
+ _ ->
+ lists:dropwhile(
+ fun({Key,_}) ->
+ less(Bt, Key, KeyStart)
+ end, NodeList)
+ end,
+
+ case KeyEnd of
+ nil ->
+ case Nodes of
+ [] ->
+ {[], []};
+ [{_, StartNodeInfo}|RestNodes] ->
+ {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd),
+ {DownKVs, DownNodes ++ RestNodes}
+ end;
+ _ ->
+ {GTEKeyEndNodes, LTKeyEndNodes} = lists:splitwith(
+ fun({Key,_}) ->
+ not less(Bt, Key, KeyEnd)
+ end, lists:reverse(Nodes)),
+
+ {MatchingKVs, MatchingNodes} =
+ case lists:reverse(LTKeyEndNodes) of
+ [{_, StartNodeInfo}] ->
+ collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd);
+ [{_, StartNodeInfo}|RestLTNodes] ->
+ % optimization, since we have more KP nodes in range, we don't need
+ % to provide the endkey when searching the start node, making
+ % collecting the node faster.
+ {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, nil),
+ {DownKVs, DownNodes ++ RestLTNodes};
+ [] ->
+ {[], []}
+ end,
+
+ case lists:reverse(GTEKeyEndNodes) of
+ [{_, EndNodeInfo} | _] when LTKeyEndNodes == [] ->
+ collect_node(Bt, EndNodeInfo, KeyStart, KeyEnd);
+ [{_, EndNodeInfo} | _] ->
+ {KVs1, DownNodes1} = collect_node(Bt, EndNodeInfo, nil, KeyEnd),
+ {KVs1 ++ MatchingKVs, DownNodes1 ++ MatchingNodes};
+ [] ->
+ {MatchingKVs, MatchingNodes}
+ end
+ end.
+
+
adjust_dir(fwd, List) ->
List;
adjust_dir(rev, List) ->
lists:reverse(List).
-stream_node(Bt, Offset, PointerInfo, nil, Dir, Fun, Acc) ->
- stream_node(Bt, Offset, PointerInfo, Dir, Fun, Acc);
-stream_node(_Bt, _Offset, nil, _StartKey, _Dir, _Fun, Acc) ->
+stream_node(Bt, Reds, PointerInfo, nil, Dir, Fun, Acc) ->
+ stream_node(Bt, Reds, PointerInfo, Dir, Fun, Acc);
+stream_node(_Bt, _Reds, nil, _StartKey, _Dir, _Fun, Acc) ->
{ok, Acc};
-stream_node(Bt, Offset, {Pointer, _Count}, StartKey, Dir, Fun, Acc) ->
+stream_node(Bt, Reds, {Pointer, _Reds}, StartKey, Dir, Fun, Acc) ->
{NodeType, NodeList} = get_node(Bt, Pointer),
case NodeType of
kp_node ->
- stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc);
+ stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc);
kv_node ->
- stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc)
+ stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc)
end.
-stream_node(_Bt, _Offset, nil, _Dir, _Fun, Acc) ->
+stream_node(_Bt, _Reds, nil, _Dir, _Fun, Acc) ->
{ok, Acc};
-stream_node(Bt, Offset, {Pointer, _Count}, Dir, Fun, Acc) ->
+stream_node(Bt, Reds, {Pointer, _Reds}, Dir, Fun, Acc) ->
{NodeType, NodeList} = get_node(Bt, Pointer),
case NodeType of
kp_node ->
- stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc);
+ stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), Dir, Fun, Acc);
kv_node ->
- stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc)
+ stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), Dir, Fun, Acc)
end.
-stream_kp_node(_Bt, _Offset, [], _Dir, _Fun, Acc) ->
+stream_kp_node(_Bt, _Reds, [], _Dir, _Fun, Acc) ->
{ok, Acc};
-stream_kp_node(Bt, Offset, [{_Key, {Pointer, Count}} | Rest], Dir, Fun, Acc) ->
- case stream_node(Bt, Offset, {Pointer, Count}, Dir, Fun, Acc) of
+stream_kp_node(Bt, Reds, [{_Key, {Pointer, Red}} | Rest], Dir, Fun, Acc) ->
+ case stream_node(Bt, Reds, {Pointer, Red}, Dir, Fun, Acc) of
{ok, Acc2} ->
- stream_kp_node(Bt, Offset + Count, Rest, Dir, Fun, Acc2);
+ stream_kp_node(Bt, [Red | Reds], Rest, Dir, Fun, Acc2);
{stop, Acc2} ->
{stop, Acc2}
end.
-drop_nodes(_Bt, Offset, _StartKey, []) ->
- {Offset, []};
-drop_nodes(Bt, Offset, StartKey, [{NodeKey, {Pointer, Count}} | RestKPs]) ->
+drop_nodes(_Bt, Reds, _StartKey, []) ->
+ {Reds, []};
+drop_nodes(Bt, Reds, StartKey, [{NodeKey, {Pointer, Red}} | RestKPs]) ->
case less(Bt, NodeKey, StartKey) of
- true -> drop_nodes(Bt, Offset + Count, StartKey, RestKPs);
- false -> {Offset, [{NodeKey, {Pointer, Count}} | RestKPs]}
+ true -> drop_nodes(Bt, [Red | Reds], StartKey, RestKPs);
+ false -> {Reds, [{NodeKey, {Pointer, Reds}} | RestKPs]}
end.
-stream_kp_node(Bt, Offset, KPs, StartKey, Dir, Fun, Acc) ->
- {NewOffset, NodesToStream} =
+stream_kp_node(Bt, Reds, KPs, StartKey, Dir, Fun, Acc) ->
+ {NewReds, NodesToStream} =
case Dir of
fwd ->
% drop all nodes sorting before the key
- drop_nodes(Bt, Offset, StartKey, KPs);
+ drop_nodes(Bt, Reds, StartKey, KPs);
rev ->
% keep all nodes sorting before the key, AND the first node to sort after
RevKPs = lists:reverse(KPs),
case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of
{_RevBefore, []} ->
% everything sorts before it
- {Offset, KPs};
+ {Reds, KPs};
{RevBefore, [FirstAfter | Drop]} ->
- {Offset + count(kp_node, Drop), [FirstAfter | lists:reverse(RevBefore)]}
+ {[Red || {_K,{_P,Red}} <- Drop] ++ Reds,
+ [FirstAfter | lists:reverse(RevBefore)]}
end
end,
case NodesToStream of
[] ->
{ok, Acc};
[{_Key, PointerInfo} | Rest] ->
- case stream_node(Bt, NewOffset, PointerInfo, StartKey, Dir, Fun, Acc) of
+ case stream_node(Bt, NewReds, PointerInfo, StartKey, Dir, Fun, Acc) of
{ok, Acc2} ->
- stream_kp_node(Bt, NewOffset, Rest, Dir, Fun, Acc2);
+ stream_kp_node(Bt, NewReds, Rest, Dir, Fun, Acc2);
{stop, Acc2} ->
{stop, Acc2}
end
end.
-stream_kv_node(_Bt, _Offset, [], _Dir, _Fun, Acc) ->
- {ok, Acc};
-stream_kv_node(Bt, Offset, [{K, V} | RestKVs], Dir, Fun, Acc) ->
- case Fun(assemble(Bt, K, V), Offset, Acc) of
- {ok, Acc2} ->
- stream_kv_node(Bt, Offset + 1, RestKVs, Dir, Fun, Acc2);
- {stop, Acc2} ->
- {stop, Acc2}
- end.
-
-stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) ->
+stream_kv_node(Bt, Reds, KVs, StartKey, Dir, Fun, Acc) ->
DropFun =
case Dir of
fwd ->
@@ -456,11 +562,36 @@ stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) ->
rev ->
fun({Key, _}) -> less(Bt, StartKey, Key) end
end,
- % drop all nodes preceding the key
- GTEKVs = lists:dropwhile(DropFun, KVs),
- LenSkipped = length(KVs) - length(GTEKVs),
- stream_kv_node(Bt, Offset + LenSkipped, GTEKVs, Dir, Fun, Acc).
+ {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs),
+ stream_kv_node2(Bt, Reds, LTKVs, GTEKVs, Dir, Fun, Acc).
+
+stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _Dir, _Fun, Acc) ->
+ {ok, Acc};
+stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], Dir, Fun, Acc) ->
+ case Fun(assemble(Bt, K, V), {PrevKVs, Reds}, Acc) of
+ {ok, Acc2} ->
+ stream_kv_node2(Bt, Reds, [{K,V} | PrevKVs], RestKVs, Dir, Fun, Acc2);
+ {stop, Acc2} ->
+ {stop, Acc2}
+ end.
+shuffle(List) ->
+%% Determine the log n portion then randomize the list.
+ randomize(round(math:log(length(List)) + 0.5), List).
+
+randomize(1, List) ->
+ randomize(List);
+randomize(T, List) ->
+ lists:foldl(fun(_E, Acc) ->
+ randomize(Acc)
+ end, randomize(List), lists:seq(1, (T - 1))).
+
+randomize(List) ->
+ D = lists:map(fun(A) ->
+ {random:uniform(), A}
+ end, List),
+ {_, D1} = lists:unzip(lists:keysort(1, D)),
+ D1.
@@ -468,20 +599,58 @@ test()->
test(1000).
test(N) ->
- KeyValues = [{random:uniform(), random:uniform()} || _Seq <- lists:seq(1, N)],
- test_btree(KeyValues), % randomly distributed
- Sorted = lists:sort(KeyValues),
+ Sorted = [{Seq, random:uniform()} || Seq <- lists:seq(1, N)],
test_btree(Sorted), % sorted regular
- test_btree(lists:reverse(Sorted)). % sorted reverse
+ test_btree(lists:reverse(Sorted)), % sorted reverse
+ test_btree(shuffle(Sorted)). % randomly distributed
test_btree(KeyValues) ->
{ok, Fd} = couch_file:open("foo", [create,overwrite]),
{ok, Btree} = open(nil, Fd),
+ ReduceFun =
+ fun(reduce, KVs) ->
+ length(KVs);
+ (combine, Reds) ->
+ lists:sum(Reds)
+ end,
+ Btree1 = set_options(Btree, [{reduce, ReduceFun}]),
% first dump in all the values in one go
- {ok, Btree10} = add_remove(Btree, KeyValues, []),
+ {ok, Btree10} = add_remove(Btree1, KeyValues, []),
+
+
+ Len = length(KeyValues),
+
+ {ok, Len} = reduce(Btree10, nil, nil),
+
+ % Count of all from start to Val1
+ Val1 = Len div 3,
+ {ok, Val1} = reduce(Btree10, nil, Val1),
+ % Count of all from Val1 to end
+ CountVal1ToEnd = Len - Val1 + 1,
+ {ok, CountVal1ToEnd} = reduce(Btree10, Val1, nil),
+
+ % Count of all from Val1 to Val2
+ Val2 = 2*Len div 3,
+ CountValRange = Val2 - Val1 + 1,
+ {ok, CountValRange} = reduce(Btree10, Val1, Val2),
+
+ % get the leading reduction as we foldl/r
+ {ok, true} = foldl(Btree10, Val1, fun(_X, LeadingReds, _Acc) ->
+ CountToStart = Val1 - 1,
+ CountToStart = final_reduce(Btree10, LeadingReds),
+ {stop, true} % change Acc to 'true'
+ end,
+ false),
+ {ok, true} = foldr(Btree10, Val1, fun(_X, LeadingReds, _Acc) ->
+ CountToEnd = Len - Val1,
+ CountToEnd = final_reduce(Btree10, LeadingReds),
+ {stop, true} % change Acc to 'true'
+ end,
+ false),
+
ok = test_keys(Btree10, KeyValues),
% remove everything
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 2dc863dc..67e64730 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -17,10 +17,18 @@
-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
-export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
+-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([start_update_loop/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-export([start_copy_compact_int/2]).
+-export([btree_by_id_split/1,
+ btree_by_id_join/2,
+ btree_by_id_reduce/2,
+ btree_by_seq_split/1,
+ btree_by_seq_join/2,
+ btree_by_seq_reduce/2]).
+
-include("couch_db.hrl").
-record(db_header,
@@ -363,6 +371,12 @@ doc_flush_binaries(Doc, Fd) ->
Doc#doc{attachments = NewBins}.
+enum_docs_since_reduce_to_count(Reds) ->
+ couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds).
+
+enum_docs_reduce_to_count(Reds) ->
+ couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds).
+
enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
Db = get_db(MainPid),
couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
@@ -407,22 +421,38 @@ btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
deleted_conflict_revs = DelConflicts,
deleted = Deleted}.
-btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}) ->
- {Id, {Seq, Tree}}.
+btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
+ deleted=Deleted, rev_tree=Tree}) ->
+ {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}.
-btree_by_name_join(Id, {Seq, Tree}) ->
- #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
+btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
+ #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
+
+btree_by_id_reduce(reduce, FullDocInfos) ->
+ % count the number of documents that are not deleted
+ length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
+btree_by_id_reduce(combine, Reds) ->
+ lists:sum(Reds).
+
+btree_by_seq_reduce(reduce, DocInfos) ->
+ % count all entries, regardless of their deleted state
+ length(DocInfos);
+btree_by_seq_reduce(combine, Reds) ->
+ lists:sum(Reds).
+
init_db(DbName, Filepath, Fd, Header) ->
{ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
ok = couch_stream:set_min_buffer(SummaryStream, 10000),
{ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
- [{split, fun(V) -> btree_by_name_split(V) end},
- {join, fun(K,V) -> btree_by_name_join(K,V) end}] ),
+ [{split, fun btree_by_id_split/1},
+ {join, fun btree_by_id_join/2},
+ {reduce, fun btree_by_id_reduce/2}]),
{ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
- [{split, fun(V) -> btree_by_seq_split(V) end},
- {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ),
+ [{split, fun btree_by_seq_split/1},
+ {join, fun btree_by_seq_join/2},
+ {reduce, fun btree_by_seq_reduce/2}]),
{ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
#db{
@@ -437,8 +467,7 @@ init_db(DbName, Filepath, Fd, Header) ->
doc_count = Header#db_header.doc_count,
doc_del_count = Header#db_header.doc_del_count,
name = DbName,
- filepath=Filepath
- }.
+ filepath=Filepath }.
close_db(#db{fd=Fd,summary_stream=Ss}) ->
couch_file:close(Fd),
@@ -759,7 +788,9 @@ new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq
if Deleted -> {DocCount, DelCount + 1};
true -> {DocCount + 1, DelCount}
end,
- new_index_entries(RestInfos, DocCount2, DelCount2, [FullDocInfo|AccById], [DocInfo|AccBySeq]).
+ new_index_entries(RestInfos, DocCount2, DelCount2,
+ [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
+ [DocInfo|AccBySeq]).
update_docs_int(Db, DocsList, Options) ->
#db{
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index e4cf00ea..9ca5d815 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -45,6 +45,7 @@
-record(full_doc_info,
{id = "",
update_seq = 0,
+ deleted = false,
rev_tree = []
}).
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index 11a380e9..9ebb0b93 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -13,7 +13,7 @@
-module(couch_httpd).
-include("couch_db.hrl").
--export([start_link/3, stop/0]).
+-export([start_link/3, stop/0, handle_request/2]).
-record(doc_query_args, {
options = [],
@@ -35,7 +35,7 @@
}).
start_link(BindAddress, Port, DocumentRoot) ->
- Loop = fun (Req) -> handle_request(Req, DocumentRoot) end,
+ Loop = fun (Req) -> apply(couch_httpd, handle_request, [Req, DocumentRoot]) end,
mochiweb_http:start([
{loop, Loop},
{name, ?MODULE},
@@ -47,6 +47,7 @@ stop() ->
mochiweb_http:stop(?MODULE).
handle_request(Req, DocumentRoot) ->
+
% alias HEAD to GET as mochiweb takes care of stripping the body
Method = case Req:get(method) of
'HEAD' -> 'GET';
@@ -263,18 +264,19 @@ handle_db_request(Req, 'GET', {_DbName, Db, ["_all_docs"]}) ->
true -> StartDocId
end,
- FoldlFun = make_view_fold_fun(Req, QueryArgs),
+ FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRowCount,
+ fun couch_db:enum_docs_reduce_to_count/1),
AdapterFun = fun(#full_doc_info{id=Id}=FullDocInfo, Offset, Acc) ->
case couch_doc:to_doc_info(FullDocInfo) of
#doc_info{deleted=false, rev=Rev} ->
- FoldlFun(Id, Id, {obj, [{rev, Rev}]}, Offset, TotalRowCount, Acc);
+ FoldlFun({{Id, Id}, {obj, [{rev, Rev}]}}, Offset, Acc);
#doc_info{deleted=true} ->
{ok, Acc}
end
end,
{ok, FoldResult} = couch_db:enum_docs(Db, StartId, Dir, AdapterFun,
{Count, SkipCount, undefined, []}),
- finish_view_fold(Req, {ok, TotalRowCount, FoldResult});
+ finish_view_fold(Req, TotalRowCount, {ok, FoldResult});
handle_db_request(_Req, _Method, {_DbName, _Db, ["_all_docs"]}) ->
throw({method_not_allowed, "GET,HEAD"});
@@ -290,7 +292,8 @@ handle_db_request(Req, 'GET', {_DbName, Db, ["_all_docs_by_seq"]}) ->
{ok, Info} = couch_db:get_db_info(Db),
TotalRowCount = proplists:get_value(doc_count, Info),
- FoldlFun = make_view_fold_fun(Req, QueryArgs),
+ FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRowCount,
+ fun couch_db:enum_docs_since_reduce_to_count/1),
StartKey2 = case StartKey of
nil -> 0;
<<>> -> 100000000000;
@@ -321,9 +324,9 @@ handle_db_request(Req, 'GET', {_DbName, Db, ["_all_docs_by_seq"]}) ->
false -> []
end
},
- FoldlFun(Id, UpdateSeq, Json, Offset, TotalRowCount, Acc)
+ FoldlFun({{UpdateSeq, Id}, Json}, Offset, Acc)
end, {Count, SkipCount, undefined, []}),
- finish_view_fold(Req, {ok, TotalRowCount, FoldResult});
+ finish_view_fold(Req, TotalRowCount, {ok, FoldResult});
handle_db_request(_Req, _Method, {_DbName, _Db, ["_all_docs_by_seq"]}) ->
throw({method_not_allowed, "GET,HEAD"});
@@ -331,17 +334,31 @@ handle_db_request(_Req, _Method, {_DbName, _Db, ["_all_docs_by_seq"]}) ->
handle_db_request(Req, 'GET', {DbName, _Db, ["_view", DocId, ViewName]}) ->
#view_query_args{
start_key = StartKey,
+ end_key = EndKey,
count = Count,
skip = SkipCount,
direction = Dir,
- start_docid = StartDocId
- } = QueryArgs = parse_view_query(Req),
- View = {DbName, "_design/" ++ DocId, ViewName},
- Start = {StartKey, StartDocId},
- FoldlFun = make_view_fold_fun(Req, QueryArgs),
- FoldAccInit = {Count, SkipCount, undefined, []},
- FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
- finish_view_fold(Req, FoldResult);
+ start_docid = StartDocId,
+ end_docid = EndDocId
+ } = QueryArgs = parse_view_query(Req),
+ case couch_view:get_map_view({DbName, "_design/" ++ DocId, ViewName}) of
+ {ok, View} ->
+ {ok, RowCount} = couch_view:get_row_count(View),
+ Start = {StartKey, StartDocId},
+ FoldlFun = make_view_fold_fun(Req, QueryArgs, RowCount,
+ fun couch_view:reduce_to_count/1),
+ FoldAccInit = {Count, SkipCount, undefined, []},
+ FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
+ finish_view_fold(Req, RowCount, FoldResult);
+ {not_found, Reason} ->
+ case couch_view:get_reduce_view({DbName, "_design/" ++ DocId, ViewName}) of
+ {ok, View} ->
+ {ok, Value} = couch_view:reduce(View, {StartKey, StartDocId}, {EndKey, EndDocId}),
+ send_json(Req, {obj, [{ok,true}, {result, Value}]});
+ _ ->
+ throw({not_found, Reason})
+ end
+ end;
handle_db_request(_Req, _Method, {_DbName, _Db, ["_view", _DocId, _ViewName]}) ->
throw({method_not_allowed, "GET,HEAD"});
@@ -358,10 +375,12 @@ handle_db_request(Req, 'POST', {_DbName, Db, ["_missing_revs"]}) ->
handle_db_request(Req, 'POST', {DbName, _Db, ["_temp_view"]}) ->
#view_query_args{
start_key = StartKey,
+ end_key = EndKey,
count = Count,
skip = SkipCount,
direction = Dir,
- start_docid = StartDocId
+ start_docid = StartDocId,
+ end_docid = EndDocId
} = QueryArgs = parse_view_query(Req),
ContentType = case Req:get_primary_header_value("content-type") of
@@ -370,13 +389,25 @@ handle_db_request(Req, 'POST', {DbName, _Db, ["_temp_view"]}) ->
Else ->
Else
end,
-
- View = {temp, DbName, ContentType, Req:recv_body()},
- Start = {StartKey, StartDocId},
- FoldlFun = make_view_fold_fun(Req, QueryArgs),
- FoldAccInit = {Count, SkipCount, undefined, []},
- FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
- finish_view_fold(Req, FoldResult);
+ case cjson:decode(Req:recv_body()) of
+ {obj, Props} ->
+ MapSrc = proplists:get_value("map",Props),
+ RedSrc = proplists:get_value("reduce",Props),
+ {ok, View} = couch_view:get_reduce_view(
+ {temp, DbName, ContentType, MapSrc, RedSrc}),
+ {ok, Value} = couch_view:reduce(View, {StartKey, StartDocId}, {EndKey, EndDocId}),
+ send_json(Req, {obj, [{ok,true}, {result, Value}]});
+ Src when is_list(Src) ->
+
+ {ok, View} = couch_view:get_map_view({temp, DbName, ContentType, Src}),
+ Start = {StartKey, StartDocId},
+ {ok, TotalRows} = couch_view:get_row_count(View),
+ FoldlFun = make_view_fold_fun(Req, QueryArgs, TotalRows,
+ fun couch_view:reduce_to_count/1),
+ FoldAccInit = {Count, SkipCount, undefined, []},
+ FoldResult = couch_view:fold(View, Start, Dir, FoldlFun, FoldAccInit),
+ finish_view_fold(Req, TotalRows, FoldResult)
+ end;
handle_db_request(_Req, _Method, {_DbName, _Db, ["_temp_view"]}) ->
throw({method_not_allowed, "POST"});
@@ -618,7 +649,8 @@ parse_view_query(Req) ->
end
end, #view_query_args{}, QueryList).
-make_view_fold_fun(Req, QueryArgs) ->
+
+make_view_fold_fun(Req, QueryArgs, TotalViewCount, ReduceCountFun) ->
#view_query_args{
end_key = EndKey,
end_docid = EndDocId,
@@ -626,7 +658,8 @@ make_view_fold_fun(Req, QueryArgs) ->
count = Count
} = QueryArgs,
- PassedEndFun = case Dir of
+ PassedEndFun =
+ case Dir of
fwd ->
fun(ViewKey, ViewId) ->
couch_view:less_json({EndKey, EndDocId}, {ViewKey, ViewId})
@@ -636,10 +669,11 @@ make_view_fold_fun(Req, QueryArgs) ->
couch_view:less_json({ViewKey, ViewId}, {EndKey, EndDocId})
end
end,
-
- NegCountFun = fun(Id, Key, Value, Offset, TotalViewCount,
+
+ NegCountFun = fun({{Key, DocId}, Value}, OffsetReds,
{AccCount, AccSkip, Resp, AccRevRows}) ->
- PassedEnd = PassedEndFun(Key, Id),
+ Offset = ReduceCountFun(OffsetReds),
+ PassedEnd = PassedEndFun(Key, DocId),
case {PassedEnd, AccCount, AccSkip, Resp} of
{true, _, _, _} -> % The stop key has been passed, stop looping.
{stop, {AccCount, AccSkip, Resp, AccRevRows}};
@@ -654,17 +688,18 @@ make_view_fold_fun(Req, QueryArgs) ->
JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[",
[TotalViewCount, Offset2]),
Resp2:write_chunk(lists:flatten(JsonBegin)),
- JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]},
+ JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]},
{ok, {AccCount + 1, 0, Resp2, [cjson:encode(JsonObj) | AccRevRows]}};
{_, AccCount, _, Resp} ->
- JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]},
+ JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]},
{ok, {AccCount + 1, 0, Resp, [cjson:encode(JsonObj), "," | AccRevRows]}}
end
end,
- PosCountFun = fun(Id, Key, Value, Offset, TotalViewCount,
+ PosCountFun = fun({{Key, DocId}, Value}, OffsetReds,
{AccCount, AccSkip, Resp, AccRevRows}) ->
- PassedEnd = PassedEndFun(Key, Id),
+ Offset = ReduceCountFun(OffsetReds),
+ PassedEnd = PassedEndFun(Key, DocId),
case {PassedEnd, AccCount, AccSkip, Resp} of
{true, _, _, _} ->
% The stop key has been passed, stop looping.
@@ -678,11 +713,11 @@ make_view_fold_fun(Req, QueryArgs) ->
Resp2 = start_json_response(Req, 200),
JsonBegin = io_lib:format("{\"total_rows\":~w,\"offset\":~w,\"rows\":[\r\n",
[TotalViewCount, Offset]),
- JsonObj = {obj, [{id, Id}, {key, Key}, {value, Value}]},
+ JsonObj = {obj, [{id, DocId}, {key, Key}, {value, Value}]},
Resp2:write_chunk(lists:flatten(JsonBegin ++ cjson:encode(JsonObj))),
{ok, {AccCount - 1, 0, Resp2, AccRevRows}};
{_, AccCount, _, Resp} when (AccCount > 0) ->
- JsonObj = {obj, [{"id", Id}, {"key", Key}, {"value", Value}]},
+ JsonObj = {obj, [{"id", DocId}, {"key", Key}, {"value", Value}]},
Resp:write_chunk(",\r\n" ++ lists:flatten(cjson:encode(JsonObj))),
{ok, {AccCount - 1, 0, Resp, AccRevRows}}
end
@@ -692,16 +727,16 @@ make_view_fold_fun(Req, QueryArgs) ->
false -> NegCountFun
end.
-finish_view_fold(Req, FoldResult) ->
+finish_view_fold(Req, TotalRows, FoldResult) ->
case FoldResult of
- {ok, TotalRows, {_, _, undefined, _}} ->
+ {ok, {_, _, undefined, _}} ->
% nothing found in the view, nothing has been returned
% send empty view
send_json(Req, 200, {obj, [
{total_rows, TotalRows},
{rows, {}}
]});
- {ok, _TotalRows, {_, _, Resp, AccRevRows}} ->
+ {ok, {_, _, Resp, AccRevRows}} ->
% end the view
Resp:write_chunk(lists:flatten(AccRevRows) ++ "\r\n]}"),
end_json_response(Resp);
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl
index 80a7005f..80561b39 100644
--- a/src/couchdb/couch_key_tree.erl
+++ b/src/couchdb/couch_key_tree.erl
@@ -18,7 +18,7 @@
-export([map/2, get_all_leafs/1, get_leaf_keys/1, count_leafs/1]).
% a key tree looks like this:
-% Tree -> [] or [{Key, Value, Tree} | SiblingTree]
+% Tree -> [] or [{Key, Value, ChildTree} | SiblingTree]
% ChildTree -> Tree
% SiblingTree -> [] or [{SiblingKey, Value, Tree} | Tree]
% And each Key < SiblingKey
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index a6e1750a..1fb2462c 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -17,7 +17,7 @@
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
-
+-export([reduce/3, combine/3]).
-export([test/0, test/1]).
-include("couch_db.hrl").
@@ -65,32 +65,21 @@ writeline(Port, String) ->
% send command and get a response.
prompt(Port, Json) ->
writeline(Port, cjson:encode(Json)),
- read_json(Port).
+ case read_json(Port) of
+ {obj, [{"error", Id}, {"reason", Reason}]} ->
+ throw({list_to_atom(Id),Reason});
+ {obj, [{"reason", Reason}, {"error", Id}]} ->
+ throw({list_to_atom(Id),Reason});
+ Result ->
+ Result
+ end.
start_doc_map(Lang, Functions) ->
- Port =
- case gen_server:call(couch_query_servers, {get_port, Lang}) of
- {ok, Port0} ->
- link(Port0),
- Port0;
- {empty, Cmd} ->
- ?LOG_INFO("Spawning new ~s instance.", [Lang]),
- open_port({spawn, Cmd}, [stream,
- {line, 1000},
- exit_status,
- hide]);
- Error ->
- throw(Error)
- end,
- true = prompt(Port, {"reset"}),
+ Port = get_linked_port(Lang),
% send the functions as json strings
lists:foreach(fun(FunctionSource) ->
- case prompt(Port, {"add_fun", FunctionSource}) of
- true -> ok;
- {obj, [{"error", Id}, {"reason", Reason}]} ->
- throw({Id, Reason})
- end
+ true = prompt(Port, {"add_fun", FunctionSource})
end,
Functions),
{ok, {Lang, Port}}.
@@ -100,19 +89,13 @@ map_docs({_Lang, Port}, Docs) ->
Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
- case prompt(Port, {"map_doc", Json}) of
- {obj, [{"error", Id}, {"reason", Reason}]} ->
- throw({list_to_atom(Id),Reason});
- {obj, [{"reason", Reason}, {"error", Id}]} ->
- throw({list_to_atom(Id),Reason});
- Results when is_tuple(Results) ->
- % the results are a json array of function map yields like this:
- % {FunResults1, FunResults2 ...}
- % where funresults is are json arrays of key value pairs:
- % {{Key1, Value1}, {Key2, Value2}}
- % Convert to real lists, execept the key, value pairs
- [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
- end
+ Results = prompt(Port, {"map_doc", Json}),
+ % the results are a json array of function map yields like this:
+ % {FunResults1, FunResults2 ...}
+ % where each FunResults is a json array of key value pairs:
+ % {{Key1, Value1}, {Key2, Value2}}
+ % Convert to real lists, except the key, value pairs
+ [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
end,
Docs),
{ok, Results}.
@@ -121,10 +104,68 @@ map_docs({_Lang, Port}, Docs) ->
stop_doc_map(nil) ->
ok;
stop_doc_map({Lang, Port}) ->
+ return_linked_port(Lang, Port).
+
+% Asks the couch_query_servers process for a query server port for Lang and
+% returns it. A port handed back as {ok, Port} is linked to the caller and
+% sent a "reset" prompt before use; on {empty, Cmd} a new port is spawned
+% from Cmd instead. Any other reply is thrown unchanged.
+get_linked_port(Lang) ->
+    ServerReply = gen_server:call(couch_query_servers, {get_port, Lang}),
+    linked_port_from_reply(Lang, ServerReply).
+
+% Turns the reply to a get_port request into a usable port (or throws it).
+linked_port_from_reply(_Lang, {ok, ExistingPort}) ->
+    link(ExistingPort),
+    true = prompt(ExistingPort, {"reset"}),
+    ExistingPort;
+linked_port_from_reply(Lang, {empty, Cmd}) ->
+    ?LOG_INFO("Spawning new ~s instance.", [Lang]),
+    PortSettings = [stream, {line, 1000}, exit_status, hide],
+    open_port({spawn, Cmd}, PortSettings);
+linked_port_from_reply(_Lang, Error) ->
+    throw(Error).
+
+return_linked_port(Lang, Port) ->
ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
true = unlink(Port),
ok.
+% Regroups a list of equal-length rows by position: the first group holds
+% every row's first element, the second group every row's second element,
+% and so on. Elements are consed onto accumulators, so each group comes out
+% in the reverse order of the row list it was built from, and that row list
+% is itself reversed on every recursion step.
+group_reductions_results([]) ->
+    [];
+group_reductions_results(Rows) ->
+    {Column, Remainders} = split_heads(Rows, [], []),
+    case Remainders of
+    [[] | _] -> % no tails left
+        [Column];
+    _ ->
+        [Column | group_reductions_results(Remainders)]
+    end.
+
+% Pops the head off every row, accumulating heads and tails separately.
+split_heads([], Heads, Tails) ->
+    {Heads, Tails};
+split_heads([[H | T] | Rest], Heads, Tails) ->
+    split_heads(Rest, [H | Heads], [T | Tails]).
+
+% Re-reduces already reduced values with the query server for Lang.
+%
+% RedSrcs is the list of reduce function sources; ReducedValues is a list of
+% rows, each carrying one value per reduce function. The rows are regrouped
+% per function and every function source is sent a "combine" prompt with its
+% own group. Returns {ok, Results} with one result per source, or {ok, []}
+% when there are no reduce functions.
+%
+% The port is handed back in an `after` section so it is returned even when
+% prompt/2 throws a query server error or a result fails to match.
+combine(_Lang, [], _ReducedValues) ->
+    {ok, []};
+combine(Lang, RedSrcs, ReducedValues) ->
+    Port = get_linked_port(Lang),
+    try
+        Grouped = group_reductions_results(ReducedValues),
+        Results = lists:zipwith(
+            fun(FunSrc, Values) ->
+                {true, {Result}} =
+                    prompt(Port, {"combine", {FunSrc}, list_to_tuple(Values)}),
+                Result
+            end, RedSrcs, Grouped),
+        {ok, Results}
+    after
+        return_linked_port(Lang, Port)
+    end.
+
+% Runs the reduce functions RedSrcs over the key/value list KVs on the query
+% server for Lang, using a single "reduce" prompt. Returns {ok, Results} as a
+% list, or {ok, []} when there are no reduce functions.
+%
+% The port is handed back in an `after` section so it is returned even when
+% prompt/2 throws a query server error or the reply fails to match.
+reduce(_Lang, [], _KVs) ->
+    {ok, []};
+reduce(Lang, RedSrcs, KVs) ->
+    Port = get_linked_port(Lang),
+    try
+        {true, Results} = prompt(Port,
+            {"reduce", list_to_tuple(RedSrcs), list_to_tuple(KVs)}),
+        {ok, tuple_to_list(Results)}
+    after
+        return_linked_port(Lang, Port)
+    end.
+
+
init(QueryServerList) ->
{ok, {QueryServerList, []}}.
diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl
index bc9e5b84..ee555c3c 100644
--- a/src/couchdb/couch_server_sup.erl
+++ b/src/couchdb/couch_server_sup.erl
@@ -176,7 +176,7 @@ start_server(InputIniFilename) ->
case StartResult of
{ok,_} ->
% only output when startup was successful
- io:format("Find Futon, the management interface, at:~nhttp://~s:~s/_utils/index.html~n~n", [BindAddress, Port]),
+ %io:format("Find Futon, the management interface, at:~nhttp://~s:~s/_utils/index.html~n~n", [BindAddress, Port]),
io:format("Apache CouchDB has started. Time to relax.~n");
_ ->
% Since we failed startup, unconditionally dump configuration data to console
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
index 0c51f39b..81234c2b 100644
--- a/src/couchdb/couch_util.erl
+++ b/src/couchdb/couch_util.erl
@@ -16,7 +16,7 @@
-export([parse_ini/1,should_flush/0, should_flush/1]).
-export([new_uuid/0, rand32/0, implode/2, collate/2, collate/3]).
-export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]).
--export([encodeBase64/1, decodeBase64/1]).
+-export([encodeBase64/1, decodeBase64/1, to_hex/1]).
% arbitrarily chosen amount of memory to use before flushing to disk
@@ -30,20 +30,21 @@ start_driver(LibDir) ->
{error, already_loaded} -> ok;
Error -> exit(Error)
end.
-
-
+
+
+
new_uuid() ->
- to_hex(binary_to_list(crypto:rand_bytes(16))).
-
+ to_hex(crypto:rand_bytes(16)).
+
to_hex([]) ->
[];
+to_hex(Bin) when is_binary(Bin) ->
+ to_hex(binary_to_list(Bin));
to_hex([H|T]) ->
[to_digit(H div 16), to_digit(H rem 16) | to_hex(T)].
-to_digit(N) when N < 10 ->
- $0 + N;
-to_digit(N) ->
- $a + N-10.
+% Maps a nibble value to its lowercase hexadecimal character code.
+to_digit(N) ->
+    if
+    N < 10 -> $0 + N;
+    true -> $a + N-10
+    end.
% returns a random integer
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index d8006eba..cf03cccb 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -15,8 +15,9 @@
-module(couch_view).
-behaviour(gen_server).
--export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/4]).
+-export([start_link/1,fold/4,fold/5,less_json/2, start_update_loop/3, start_temp_update_loop/5]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,code_change/3]).
+-export([get_reduce_view/1, get_map_view/1,get_row_count/1,reduce_to_count/1, reduce/3]).
-include("couch_db.hrl").
@@ -26,16 +27,18 @@
name,
def_lang,
views,
- id_btree,
- current_seq,
+ reductions=[], % list of reduction names and id_num of view that contains it.
+ id_btree=nil,
+ current_seq=0,
query_server=nil
}).
-record(view,
{id_num,
- name,
- btree,
- def
+ map_names=[],
+ def,
+ btree=nil,
+ reduce_funs=[]
}).
-record(server,
@@ -47,8 +50,8 @@ start_link(RootDir) ->
-get_temp_updater(DbName, Type, Src) ->
- {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, Src}),
+get_temp_updater(DbName, Type, MapSrc, RedSrc) ->
+ {ok, Pid} = gen_server:call(couch_view, {start_temp_updater, DbName, Type, MapSrc, RedSrc}),
Pid.
get_updater(DbName, GroupId) ->
@@ -75,44 +78,135 @@ get_updated_group(Pid) ->
end
end.
-fold(ViewInfo, Dir, Fun, Acc) ->
- fold(ViewInfo, nil, Dir, Fun, Acc).
+% Returns {ok, Count}, where Count is derived by reduce_to_count/1 from the
+% partial reductions of the view's btree (nil is passed for both keys).
+get_row_count(#view{} = View) ->
+    {ok, PartialReds} = couch_btree:partial_reduce(View#view.btree, nil, nil),
+    RowCount = reduce_to_count(PartialReds),
+    {ok, RowCount}.
+
+% Resolves a reduce view handle.
+% For {temp, ...} the temp group's single view is wrapped as
+% {ok, {temp_reduce, View}}; for {DbName, GroupId, Name} the updated group's
+% views are searched by get_reduce_view0/3.
+get_reduce_view({temp, DbName, Type, MapSrc, RedSrc}) ->
+    Updater = get_temp_updater(DbName, Type, MapSrc, RedSrc),
+    {ok, #group{views=[TempView]}} = get_updated_group(Updater),
+    {ok, {temp_reduce, TempView}};
+get_reduce_view({DbName, GroupId, Name}) ->
+    Updater = get_updater(DbName, GroupId),
+    {ok, #group{views=Views,def_lang=Lang}} = get_updated_group(Updater),
+    get_reduce_view0(Name, Lang, Views).
+
+% Walks the views looking for one whose reduce_funs contain Name.
+% Returns {ok, {reduce, Pos, Lang, View}} with Pos as reported by
+% get_key_pos/3, or {not_found, missing_named_view} when no view has it.
+get_reduce_view0(_Name, _Lang, []) ->
+    {not_found, missing_named_view};
+get_reduce_view0(Name, Lang, [#view{reduce_funs=RedFuns}=View | Others]) ->
+    Pos = get_key_pos(Name, RedFuns, 0),
+    if
+    Pos =:= 0 -> get_reduce_view0(Name, Lang, Others);
+    true -> {ok, {reduce, Pos, Lang, View}}
+    end.
-fold({temp, DbName, Type, Src}, StartKey, Dir, Fun, Acc) ->
- {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src)),
- fold_view(View#view.btree, StartKey, Dir, Fun, Acc);
-fold({DbName, GroupId, ViewName}, StartKey, Dir, Fun, Acc) ->
+% Computes a reduce value for a handle obtained from get_reduce_view/1, for
+% the Key1/Key2 pair handed through to couch_btree.
+%
+% {temp_reduce, View}: the btree is asked for its reduction directly and the
+% single user reduction it carries is returned.
+%
+% {reduce, NthRed, Lang, View}: only the NthRed-th reduce function of the
+% view is run over the btree's partial reductions. Each result is padded with
+% [] placeholders so it sits at position NthRed of a list with one slot per
+% reduce function, which is the layout the combine step indexes into.
+reduce({temp_reduce, #view{btree=Bt}}, Key1, Key2) ->
+    {ok, {_Count, [Reduction]}} = couch_btree:reduce(Bt, Key1, Key2),
+    {ok, Reduction};
+
+reduce({reduce, NthRed, Lang, #view{btree=Bt, reduce_funs=RedFuns}}, Key1, Key2) ->
+    {ok, PartialReductions} = couch_btree:partial_reduce(Bt, Key1, Key2),
+    PreResultPadding = lists:duplicate(NthRed - 1, []),
+    PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, []),
+    {_Name, FunSrc} = lists:nth(NthRed,RedFuns),
+    ReduceFun =
+        fun(reduce, KVs) ->
+            {ok, Reduced} = couch_query_servers:reduce(Lang, [FunSrc], KVs),
+            {0, PreResultPadding ++ Reduced ++ PostResultPadding};
+        (combine, Reds) ->
+            UserReds = [[lists:nth(NthRed, UserRedsList)] || {_, UserRedsList} <- Reds],
+            {ok, Reduced} = couch_query_servers:combine(Lang, [FunSrc], UserReds),
+            {0, PreResultPadding ++ Reduced ++ PostResultPadding}
+        end,
+    % The final reduction carries one slot per reduce function of the view, so
+    % pick the requested slot by position. Matching on a one-element list (and
+    % on a count of 0) fails as soon as the view has several reduce functions.
+    {_Count, FinalReductions} = couch_btree:final_reduce(ReduceFun, PartialReductions),
+    {ok, lists:nth(NthRed, FinalReductions)}.
+
+% Returns the 1-based position (offset by the starting count N) of the first
+% {Key, _} pair in the list, comparing keys with ==, or 0 when none matches.
+get_key_pos(_Key, [], _N) ->
+    0;
+get_key_pos(Key, [{Candidate, _Value} | Rest], N) ->
+    if
+    Key == Candidate -> N + 1;
+    true -> get_key_pos(Key, Rest, N+1)
+    end;
+get_key_pos(Key, [_NotAPair | Rest], N) ->
+    get_key_pos(Key, Rest, N+1).
+
+get_map_view({temp, DbName, Type, Src}) ->
+ {ok, #group{views=[View]}} = get_updated_group(get_temp_updater(DbName, Type, Src, [])),
+ {ok, View};
+get_map_view({DbName, GroupId, Name}) ->
{ok, #group{views=Views}} = get_updated_group(get_updater(DbName, GroupId)),
- Btree = get_view_btree(Views, ViewName),
- fold_view(Btree, StartKey, Dir, Fun, Acc).
+ get_map_view0(Name, Views).
+
+% Returns {ok, View} for the first view whose map_names contain Name, or
+% {not_found, missing_named_view} when the list is exhausted.
+get_map_view0(_Name, []) ->
+    {not_found, missing_named_view};
+get_map_view0(Name, [#view{map_names=MapNames}=View | Others]) ->
+    IsNamedHere = lists:member(Name, MapNames),
+    if
+    IsNamedHere -> {ok, View};
+    true -> get_map_view0(Name, Others)
+    end.
+
+% Collapses btree reductions into a plain count via couch_btree:final_reduce/2:
+% key/value lists contribute their length, existing reductions contribute the
+% count stored as their first element. The user reductions are dropped.
+reduce_to_count(Reductions) ->
+    CountFun =
+        fun(reduce, KVs) ->
+            {length(KVs), []};
+        (combine, Reds) ->
+            Counts = [Count0 || {Count0, _} <- Reds],
+            {lists:sum(Counts), []}
+        end,
+    {Count, _} = couch_btree:final_reduce(CountFun, Reductions),
+    Count.
+
+
+% Builds a reset #group{} from a design document.
+%
+% The "views" member may hold plain map views (Name -> map source string) and
+% map/reduce views (Name -> {obj, [{"map", Src}, {"reduce", Src}]}). Views that
+% share the same map source are merged into one #view{}: map-only names are
+% collected in map_names, reduce functions in reduce_funs. The merged views
+% are then numbered from 0 in dict:to_list/1 order.
+design_doc_to_view_group(#doc{id=Id,body={obj, Fields}}) ->
+    Language = proplists:get_value("language", Fields, "text/javascript"),
+    {obj, RawViews} = proplists:get_value("views", Fields, {obj, []}),
-fold_view(Btree, StartKey, Dir, Fun, Acc) ->
- TotalRowCount = couch_btree:row_count(Btree),
- WrapperFun = fun({{Key, DocId}, Value}, Offset, WrapperAcc) ->
- Fun(DocId, Key, Value, Offset, TotalRowCount, WrapperAcc)
- end,
- {ok, AccResult} = couch_btree:fold(Btree, StartKey, Dir, WrapperFun, Acc),
- {ok, TotalRowCount, AccResult}.
+    % extract the map/reduce views from the json fields and into lists
+    MapViewsRaw = [{Name, Src, nil} || {Name, Src} <- RawViews, is_list(Src)],
+    % An object view without a "reduce" member is a plain map view, so the
+    % reduce source defaults to nil. Without the default, get_value/2 yields
+    % undefined, which the nil test below would not recognise.
+    MapReduceViewsRaw =
+        [{Name,
+            proplists:get_value("map", MRFuns),
+            proplists:get_value("reduce", MRFuns, nil)}
+        || {Name, {obj, MRFuns}} <- RawViews],
+
+    % add the views to a dictionary object, with the map source as the key
+    DictBySrc =
+    lists:foldl(
+        fun({Name, MapSrc, RedSrc}, DictBySrcAcc) ->
+            View =
+            case dict:find(MapSrc, DictBySrcAcc) of
+                {ok, View0} -> View0;
+                error -> #view{def=MapSrc} % create new view object
+            end,
+            View2 =
+            if RedSrc == nil ->
+                View#view{map_names=[Name|View#view.map_names]};
+            true ->
+                View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
+            end,
+            dict:store(MapSrc, View2, DictBySrcAcc)
+        end, dict:new(), MapViewsRaw ++ MapReduceViewsRaw),
+    % number the views
+    {Views, _N} = lists:mapfoldl(
+        fun({_Src, View}, N) ->
+            {View#view{id_num=N},N+1}
+        end, 0, dict:to_list(DictBySrc)),
+
+    reset_group(#group{name=Id, views=Views, def_lang=Language}).
+
+
+% Folds Fun over the view's btree in direction Dir, starting from Acc.
+% Asserts that couch_btree:fold/4 answers {ok, _} and returns that tuple.
+fold(#view{} = View, Dir, Fun, Acc) ->
+    FoldResult = couch_btree:fold(View#view.btree, Dir, Fun, Acc),
+    {ok, _AccResult} = FoldResult.
-get_view_btree([], _ViewName) ->
- throw({not_found, missing_named_view});
-get_view_btree([View | _RestViews], ViewName) when View#view.name == ViewName ->
- View#view.btree;
-get_view_btree([_View | RestViews], ViewName) ->
- get_view_btree(RestViews, ViewName).
+% Same as fold/4 but passes StartKey through to couch_btree:fold/5.
+% Asserts that the btree fold answers {ok, _} and returns that tuple.
+fold(#view{} = View, StartKey, Dir, Fun, Acc) ->
+    FoldResult = couch_btree:fold(View#view.btree, StartKey, Dir, Fun, Acc),
+    {ok, _AccResult} = FoldResult.
init(RootDir) ->
- UpdateNotifierFun =
+ couch_db_update_notifier:start_link(
fun({deleted, DbName}) ->
gen_server:cast(couch_view, {reset_indexes, DbName});
({created, DbName}) ->
gen_server:cast(couch_view, {reset_indexes, DbName});
(_Else) ->
ok
- end,
- couch_db_update_notifier:start_link(UpdateNotifierFun),
+ end),
ets:new(couch_views_by_db, [bag, private, named_table]),
ets:new(couch_views_by_name, [set, protected, named_table]),
ets:new(couch_views_by_updater, [set, private, named_table]),
@@ -127,8 +221,8 @@ terminate(_Reason, _) ->
catch ets:delete(couch_views_temp_fd_by_db).
-handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=Root}=Server) ->
- <<SigInt:128/integer>> = erlang:md5(Lang ++ Query),
+handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) ->
+ <<SigInt:128/integer>> = erlang:md5(Lang ++ [0] ++ MapSrc ++ [0] ++ RedSrc),
Name = lists:flatten(io_lib:format("_temp_~.36B",[SigInt])),
Pid =
case ets:lookup(couch_views_by_name, {DbName, Name}) of
@@ -142,7 +236,8 @@ handle_call({start_temp_updater, DbName, Lang, Query}, _From, #server{root_dir=R
ok
end,
?LOG_DEBUG("Spawning new temp update process for db ~s.", [DbName]),
- NewPid = spawn_link(couch_view, start_temp_update_loop, [DbName, Fd, Lang, Query]),
+ NewPid = spawn_link(couch_view, start_temp_update_loop,
+ [DbName, Fd, Lang, MapSrc, RedSrc]),
true = ets:insert(couch_views_temp_fd_by_db, {DbName, Fd, Count + 1}),
add_to_ets(NewPid, DbName, Name),
NewPid;
@@ -219,18 +314,22 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-start_temp_update_loop(DbName, Fd, Lang, Query) ->
+start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
NotifyPids = get_notify_pids(1000),
case couch_server:open(DbName) of
{ok, Db} ->
- View = #view{name="_temp", id_num=0, btree=nil, def=Query},
+ View = #view{map_names=["_temp"],
+ id_num=0,
+ btree=nil,
+ def=MapSrc,
+ reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
Group = #group{name="_temp",
db=Db,
views=[View],
current_seq=0,
def_lang=Lang,
id_btree=nil},
- Group2 = disk_group_to_mem(Fd, Group),
+ Group2 = disk_group_to_mem(Db, Fd, Group),
temp_update_loop(Group2, NotifyPids);
Else ->
exit(Else)
@@ -242,24 +341,24 @@ temp_update_loop(Group, NotifyPids) ->
garbage_collect(),
temp_update_loop(Group2, get_notify_pids(10000)).
+
+% Returns the group with all runtime and progress state cleared: db, fd and
+% query_server set to nil, current_seq set to 0, and the id btree and every
+% view btree set to nil. The other fields are left as they are.
+reset_group(#group{views=Views}=Group) ->
+    ClearedViews = [V#view{btree=nil} || V <- Views],
+    Group#group{
+        db=nil,
+        fd=nil,
+        query_server=nil,
+        current_seq=0,
+        id_btree=nil,
+        views=ClearedViews}.
+
start_update_loop(RootDir, DbName, GroupId) ->
% wait for a notify request before doing anything. This way, we can just
% exit and any exits will be noticed by the callers.
start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
- {Db, DefLang, Defs} =
- case couch_server:open(DbName) of
+ {Db, DbGroup} =
+ case (catch couch_server:open(DbName)) of
{ok, Db0} ->
- case couch_db:open_doc(Db0, GroupId) of
+ case (catch couch_db:open_doc(Db0, GroupId)) of
{ok, Doc} ->
- case couch_doc:get_view_functions(Doc) of
- none ->
- delete_index_file(RootDir, DbName, GroupId),
- exit({not_found, no_views_found});
- {DefLang0, Defs0} ->
- {Db0, DefLang0, Defs0}
- end;
+ {Db0, design_doc_to_view_group(Doc)};
Else ->
delete_index_file(RootDir, DbName, GroupId),
exit(Else)
@@ -268,26 +367,48 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
delete_index_file(RootDir, DbName, GroupId),
exit(Else)
end,
- Group = open_index_file(RootDir, DbName, GroupId, DefLang, Defs),
+ FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
+ Group =
+ case couch_file:open(FileName) of
+ {ok, Fd} ->
+ case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of
+ {ok, ExistingDiskGroup} ->
+ % validate all the view definitions in the index are correct.
+ case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of
+ true -> disk_group_to_mem(Db, Fd, ExistingDiskGroup);
+ false -> reset_file(Db, Fd, DbName, DbGroup)
+ end;
+ _ ->
+ reset_file(Db, Fd, DbName, DbGroup)
+ end;
+ {error, enoent} ->
+ case couch_file:open(FileName, [create]) of
+ {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup);
+ Error -> throw(Error)
+ end
+ end,
- try update_loop(Group#group{db=Db}, NotifyPids) of
- _ -> ok
+ update_loop(RootDir, DbName, GroupId, Group, NotifyPids).
+
+% Discards the index file's contents and starts it over from FreshGroup:
+% the file is truncated to 0, FreshGroup is written as the new header, and
+% the in-memory form built by disk_group_to_mem/3 is returned.
+reset_file(Db, Fd, DbName, #group{name=GroupName} = FreshGroup) ->
+    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [GroupName, DbName]),
+    HeaderSig = <<$r, $c, $k, 0>>,
+    ok = couch_file:truncate(Fd, 0),
+    ok = couch_file:write_header(Fd, HeaderSig, FreshGroup),
+    disk_group_to_mem(Db, Fd, FreshGroup).
+
+update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) ->
+ try update_group(Group) of
+ {ok, Group2} ->
+ ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
+ [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
+ garbage_collect(),
+ update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000))
catch
restart ->
couch_file:close(Group#group.fd),
start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
end.
-update_loop(#group{fd=Fd}=Group, NotifyPids) ->
- {ok, Group2} = update_group(Group),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
- [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
- garbage_collect(),
- update_loop(Group2).
-
-update_loop(Group) ->
- update_loop(Group, get_notify_pids(100000)).
-
% wait for the first request to come in.
get_notify_pids(Wait) ->
receive
@@ -351,51 +472,29 @@ nuke_dir(Dir) ->
delete_index_file(RootDir, DbName, GroupId) ->
file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view").
-
-open_index_file(RootDir, DbName, GroupId, ViewLang, ViewDefs) ->
- FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
- case couch_file:open(FileName) of
- {ok, Fd} ->
- case couch_file:read_header(Fd, <<$r, $c, $k, 0>>) of
- {ok, #group{views=Views}=Group} ->
- % validate all the view definitions in the index are correct.
- case same_view_def(Views, ViewDefs) of
- true -> disk_group_to_mem(Fd, Group);
- false -> reset_header(GroupId, Fd, ViewLang, ViewDefs)
- end;
- _ ->
- reset_header(GroupId, Fd, ViewLang, ViewDefs)
- end;
- _ ->
- case couch_file:open(FileName, [create]) of
- {ok, Fd} ->
- reset_header(GroupId, Fd, ViewLang, ViewDefs);
- Error ->
- throw(Error)
- end
- end.
-
-same_view_def([], []) ->
- true;
-same_view_def(DiskViews, ViewDefs) when DiskViews == [] orelse ViewDefs == []->
- false;
-same_view_def([#view{name=DiskName,def=DiskDef}|RestViews], [{Name, Def}|RestDefs]) ->
- if DiskName == Name andalso DiskDef == Def ->
- same_view_def(RestViews, RestDefs);
- true ->
- false
- end.
% Given a disk ready group structure, return an initialized, in-memory version.
-disk_group_to_mem(Fd, #group{id_btree=IdState,views=Views}=Group) ->
+disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) ->
{ok, IdBtree} = couch_btree:open(IdState, Fd),
Views2 = lists:map(
- fun(#view{btree=BtreeState}=View) ->
- {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, fun less_json/2}]),
+ fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) ->
+ FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
+ ReduceFun =
+ fun(reduce, KVs) ->
+ {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, KVs),
+ {length(KVs), Reduced};
+ (combine, Reds) ->
+ Count = lists:sum([Count0 || {Count0, _} <- Reds]),
+ UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
+ {ok, Reduced} = couch_query_servers:combine(Lang, FunSrcs, UserReds),
+ {Count, Reduced}
+ end,
+ {ok, Btree} = couch_btree:open(BtreeState, Fd,
+ [{less, fun less_json/2},{reduce, ReduceFun}]),
View#view{btree=Btree}
end,
Views),
- Group#group{fd=Fd, id_btree=IdBtree, views=Views2}.
+ Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}.
% Given an initialized, in-memory group structure, return a disk ready version.
mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
@@ -405,23 +504,7 @@ mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
View#view{btree=State}
end,
Views),
- Group#group{fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
-
-reset_header(GroupId, Fd, DefLanguage, NamedViews) ->
- couch_file:truncate(Fd, 0),
- {Views, _N} = lists:mapfoldl(
- fun({Name, Definiton}, N) ->
- {#view{name=Name, id_num=N, btree=nil, def=Definiton}, N+1}
- end,
- 0, NamedViews),
- Group = #group{name=GroupId,
- fd=Fd,
- views=Views,
- current_seq=0,
- def_lang=DefLanguage,
- id_btree=nil},
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, Group),
- disk_group_to_mem(Fd, Group).
+ Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
@@ -506,17 +589,12 @@ process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewId
% anything in the definition changed.
case couch_db:open_doc(Db, DocInfo) of
{ok, Doc} ->
- case couch_doc:get_view_functions(Doc) of
- none ->
- throw(restart);
- {DefLang, NewDefs} ->
- case Group#group.def_lang == DefLang andalso same_view_def(Group#group.views, NewDefs) of
- true ->
- % nothing changed, keeping on computing
- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
- false ->
- throw(restart)
- end
+ case design_doc_to_view_group(Doc) == reset_group(Group) of
+ true ->
+ % nothing changed, keeping on computing
+ {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
+ false ->
+ throw(restart)
end;
{not_found, deleted} ->
throw(restart)