From a684f95cbcee7f2568a2ce04e7dc2bbb605a27b3 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Thu, 15 May 2008 21:51:22 +0000 Subject: Incremental reduce first checkin. Warning! Disk format change. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@656861 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_btree.erl | 307 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 238 insertions(+), 69 deletions(-) (limited to 'src/couchdb/couch_btree.erl') diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl index 5b7ac65a..6fd9b0c8 100644 --- a/src/couchdb/couch_btree.erl +++ b/src/couchdb/couch_btree.erl @@ -13,8 +13,8 @@ -module(couch_btree). -export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]). --export([foldr/3, foldr/4, fold/4, fold/5, row_count/1]). --export([lookup/2, get_state/1, test/1, test/0]). +-export([foldr/3, foldr/4, fold/4, fold/5, reduce/3, partial_reduce/3, final_reduce/2]). +-export([lookup/2, get_state/1, set_options/2, test/1, test/0]). -define(CHUNK_THRESHOLD, 16#fff). @@ -23,7 +23,8 @@ root, extract_kv = fun({Key, Value}) -> {Key, Value} end, assemble_kv = fun(Key, Value) -> {Key, Value} end, - less = fun(A, B) -> A < B end + less = fun(A, B) -> A < B end, + reduce = nil }). extract(#btree{extract_kv=Extract}, Value) -> @@ -46,7 +47,9 @@ set_options(Bt, [{split, Extract}|Rest]) -> set_options(Bt, [{join, Assemble}|Rest]) -> set_options(Bt#btree{assemble_kv=Assemble}, Rest); set_options(Bt, [{less, Less}|Rest]) -> - set_options(Bt#btree{less=Less}, Rest). + set_options(Bt#btree{less=Less}, Rest); +set_options(Bt, [{reduce, Reduce}|Rest]) -> + set_options(Bt#btree{reduce=Reduce}, Rest). open(State, Fd, Options) -> {ok, set_options(#btree{root=State, fd=Fd}, Options)}. @@ -54,10 +57,36 @@ open(State, Fd, Options) -> get_state(#btree{root=Root}) -> Root. -row_count(#btree{root=nil}) -> - 0; -row_count(#btree{root={_RootPointer, Count}}) -> - Count. +final_reduce(#btree{reduce=Reduce}, Val) -> + final_reduce(Reduce, Val); +final_reduce(Reduce, {[], []}) -> + Reduce(reduce, []); +final_reduce(_Bt, {[], [Red]}) -> + Red; +final_reduce(Reduce, {[], Reductions}) -> + Reduce(combine, Reductions); +final_reduce(Reduce, {KVs, Reductions}) -> + Red = Reduce(reduce, KVs), + final_reduce(Reduce, {[], [Red | Reductions]}). + +reduce(Bt, Key1, Key2) -> + {ok, Reds} = partial_reduce(Bt, Key1, Key2), + {ok, final_reduce(Bt, Reds)}. + +partial_reduce(#btree{root=Root}=Bt, Key1, Key2) -> + {KeyStart, KeyEnd} = + case Key1 == nil orelse Key2 == nil orelse less(Bt, Key1, Key2) of + true -> {Key1, Key2}; + false -> {Key2, Key1} + end, + case Root of + nil -> + {ok, {[], []}}; + _ -> + {KVs, Nodes} = collect_node(Bt, Root, KeyStart, KeyEnd), + {ok, {KVs, [Red || {_K,{_P,Red}} <- Nodes]}} + end. + foldl(Bt, Fun, Acc) -> fold(Bt, fwd, Fun, Acc). @@ -73,16 +102,16 @@ foldr(Bt, Key, Fun, Acc) -> % wraps a 2 arity function with the proper 3 arity function convert_fun_arity(Fun) when is_function(Fun, 2) -> - fun(KV, _Offset, AccIn) -> Fun(KV, AccIn) end; + fun(KV, _Reds, AccIn) -> Fun(KV, AccIn) end; convert_fun_arity(Fun) when is_function(Fun, 3) -> Fun. % Already arity 3 fold(Bt, Dir, Fun, Acc) -> - {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc), + {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, nil, Dir, convert_fun_arity(Fun), Acc), {ok, Acc2}. fold(Bt, Key, Dir, Fun, Acc) -> - {_ContinueFlag, Acc2} = stream_node(Bt, 0, Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc), + {_ContinueFlag, Acc2} = stream_node(Bt, [], Bt#btree.root, Key, Dir, convert_fun_arity(Fun), Acc), {ok, Acc2}. add(Bt, InsertKeyValues) -> @@ -136,7 +165,7 @@ lookup(#btree{root=Root, less=Less}=Bt, Keys) -> lookup(_Bt, nil, Keys) -> {ok, [{Key, not_found} || Key <- Keys]}; -lookup(Bt, {Pointer, _Count}, Keys) -> +lookup(Bt, {Pointer, _Reds}, Keys) -> {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of kp_node -> @@ -229,7 +258,7 @@ modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> nil -> NodeType = kv_node, NodeList = []; - {Pointer, _count} -> + {Pointer, _Reds} -> {NodeType, NodeList} = get_node(Bt, Pointer) end, case NodeType of @@ -249,14 +278,12 @@ modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> {ok, ResultList, QueryOutput2, Bt3} end. - -count(kv_node, NodeList) -> - length(NodeList); -count(kp_node, NodeList) -> - lists:foldl( fun({_Key, {_Pointer, Count}}, AccCount) -> - Count + AccCount - end, - 0, NodeList). +reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) -> + []; +reduce_node(#btree{reduce=R}, kp_node, NodeList) -> + R(combine, [Red || {_K, {_P, Red}} <- NodeList]); +reduce_node(#btree{reduce=R}, kv_node, NodeList) -> + R(reduce, NodeList). get_node(#btree{fd = Fd}, NodePos) -> @@ -267,7 +294,7 @@ get_node(#btree{fd = Fd}, NodePos) -> % Validating this prevents infinite loops should % a disk corruption occur. [throw({error, disk_corruption}) - || {_Key, {SubNodePos, _Count}} + || {_Key, {SubNodePos, _Reds}} <- NodeList, SubNodePos >= NodePos]; kv_node -> ok @@ -282,7 +309,7 @@ write_node(Bt, NodeType, NodeList) -> begin {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}), {LastKey, _} = lists:last(ANodeList), - {LastKey, {Pointer, count(NodeType, ANodeList)}} + {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList)}} end || ANodeList <- NodeListList @@ -362,93 +389,172 @@ modify_kvnode(Bt, [{Key, Value} | RestKVs], [{ActionType, ActionKey, ActionValue end end. + +collect_node(Bt, {P, R}, KeyStart, KeyEnd) -> + case get_node(Bt, P) of + {kp_node, NodeList} -> + collect_kp_node(Bt, NodeList, KeyStart, KeyEnd); + {kv_node, KVs} -> + GTEKeyStartKVs = + case KeyStart of + nil -> + KVs; + _ -> + lists:dropwhile( + fun({Key,_}) -> + less(Bt, Key, KeyStart) + end, KVs) + end, + KVs2 = + case KeyEnd of + nil -> + GTEKeyStartKVs; + _ -> + lists:dropwhile( + fun({Key,_}) -> + less(Bt, KeyEnd, Key) + end, lists:reverse(GTEKeyStartKVs)) + end, + case length(KVs2) == length(KVs) of + true -> % got full node, return the already calculated reduction + {[], [{nil, {P, R}}]}; + false -> % otherwise return the keyvalues for later reduction + {KVs2, []} + end + end. + + +collect_kp_node(Bt, NodeList, KeyStart, KeyEnd) -> + Nodes = + case KeyStart of + nil -> + NodeList; + _ -> + lists:dropwhile( + fun({Key,_}) -> + less(Bt, Key, KeyStart) + end, NodeList) + end, + + case KeyEnd of + nil -> + case Nodes of + [] -> + {[], []}; + [{_, StartNodeInfo}|RestNodes] -> + {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd), + {DownKVs, DownNodes ++ RestNodes} + end; + _ -> + {GTEKeyEndNodes, LTKeyEndNodes} = lists:splitwith( + fun({Key,_}) -> + not less(Bt, Key, KeyEnd) + end, lists:reverse(Nodes)), + + {MatchingKVs, MatchingNodes} = + case lists:reverse(LTKeyEndNodes) of + [{_, StartNodeInfo}] -> + collect_node(Bt, StartNodeInfo, KeyStart, KeyEnd); + [{_, StartNodeInfo}|RestLTNodes] -> + % optimization, since we have more KP nodes in range, we don't need + % to provide the endkey when searching the start node, making + % collecting the node faster. + {DownKVs, DownNodes} = collect_node(Bt, StartNodeInfo, KeyStart, nil), + {DownKVs, DownNodes ++ RestLTNodes}; + [] -> + {[], []} + end, + + case lists:reverse(GTEKeyEndNodes) of + [{_, EndNodeInfo} | _] when LTKeyEndNodes == [] -> + collect_node(Bt, EndNodeInfo, KeyStart, KeyEnd); + [{_, EndNodeInfo} | _] -> + {KVs1, DownNodes1} = collect_node(Bt, EndNodeInfo, nil, KeyEnd), + {KVs1 ++ MatchingKVs, DownNodes1 ++ MatchingNodes}; + [] -> + {MatchingKVs, MatchingNodes} + end + end. + + adjust_dir(fwd, List) -> List; adjust_dir(rev, List) -> lists:reverse(List). -stream_node(Bt, Offset, PointerInfo, nil, Dir, Fun, Acc) -> - stream_node(Bt, Offset, PointerInfo, Dir, Fun, Acc); -stream_node(_Bt, _Offset, nil, _StartKey, _Dir, _Fun, Acc) -> +stream_node(Bt, Reds, PointerInfo, nil, Dir, Fun, Acc) -> + stream_node(Bt, Reds, PointerInfo, Dir, Fun, Acc); +stream_node(_Bt, _Reds, nil, _StartKey, _Dir, _Fun, Acc) -> {ok, Acc}; -stream_node(Bt, Offset, {Pointer, _Count}, StartKey, Dir, Fun, Acc) -> +stream_node(Bt, Reds, {Pointer, _Reds}, StartKey, Dir, Fun, Acc) -> {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of kp_node -> - stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc); + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc); kv_node -> - stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc) + stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, Dir, Fun, Acc) end. -stream_node(_Bt, _Offset, nil, _Dir, _Fun, Acc) -> +stream_node(_Bt, _Reds, nil, _Dir, _Fun, Acc) -> {ok, Acc}; -stream_node(Bt, Offset, {Pointer, _Count}, Dir, Fun, Acc) -> +stream_node(Bt, Reds, {Pointer, _Reds}, Dir, Fun, Acc) -> {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of kp_node -> - stream_kp_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc); + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), Dir, Fun, Acc); kv_node -> - stream_kv_node(Bt, Offset, adjust_dir(Dir, NodeList), Dir, Fun, Acc) + stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), Dir, Fun, Acc) end. -stream_kp_node(_Bt, _Offset, [], _Dir, _Fun, Acc) -> +stream_kp_node(_Bt, _Reds, [], _Dir, _Fun, Acc) -> {ok, Acc}; -stream_kp_node(Bt, Offset, [{_Key, {Pointer, Count}} | Rest], Dir, Fun, Acc) -> - case stream_node(Bt, Offset, {Pointer, Count}, Dir, Fun, Acc) of +stream_kp_node(Bt, Reds, [{_Key, {Pointer, Red}} | Rest], Dir, Fun, Acc) -> + case stream_node(Bt, Reds, {Pointer, Red}, Dir, Fun, Acc) of {ok, Acc2} -> - stream_kp_node(Bt, Offset + Count, Rest, Dir, Fun, Acc2); + stream_kp_node(Bt, [Red | Reds], Rest, Dir, Fun, Acc2); {stop, Acc2} -> {stop, Acc2} end. -drop_nodes(_Bt, Offset, _StartKey, []) -> - {Offset, []}; -drop_nodes(Bt, Offset, StartKey, [{NodeKey, {Pointer, Count}} | RestKPs]) -> +drop_nodes(_Bt, Reds, _StartKey, []) -> + {Reds, []}; +drop_nodes(Bt, Reds, StartKey, [{NodeKey, {Pointer, Red}} | RestKPs]) -> case less(Bt, NodeKey, StartKey) of - true -> drop_nodes(Bt, Offset + Count, StartKey, RestKPs); - false -> {Offset, [{NodeKey, {Pointer, Count}} | RestKPs]} + true -> drop_nodes(Bt, [Red | Reds], StartKey, RestKPs); + false -> {Reds, [{NodeKey, {Pointer, Reds}} | RestKPs]} end. -stream_kp_node(Bt, Offset, KPs, StartKey, Dir, Fun, Acc) -> - {NewOffset, NodesToStream} = +stream_kp_node(Bt, Reds, KPs, StartKey, Dir, Fun, Acc) -> + {NewReds, NodesToStream} = case Dir of fwd -> % drop all nodes sorting before the key - drop_nodes(Bt, Offset, StartKey, KPs); + drop_nodes(Bt, Reds, StartKey, KPs); rev -> % keep all nodes sorting before the key, AND the first node to sort after RevKPs = lists:reverse(KPs), case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of {_RevBefore, []} -> % everything sorts before it - {Offset, KPs}; + {Reds, KPs}; {RevBefore, [FirstAfter | Drop]} -> - {Offset + count(kp_node, Drop), [FirstAfter | lists:reverse(RevBefore)]} + {[Red || {_K,{_P,Red}} <- Drop] ++ Reds, + [FirstAfter | lists:reverse(RevBefore)]} end end, case NodesToStream of [] -> {ok, Acc}; [{_Key, PointerInfo} | Rest] -> - case stream_node(Bt, NewOffset, PointerInfo, StartKey, Dir, Fun, Acc) of + case stream_node(Bt, NewReds, PointerInfo, StartKey, Dir, Fun, Acc) of {ok, Acc2} -> - stream_kp_node(Bt, NewOffset, Rest, Dir, Fun, Acc2); + stream_kp_node(Bt, NewReds, Rest, Dir, Fun, Acc2); {stop, Acc2} -> {stop, Acc2} end end. -stream_kv_node(_Bt, _Offset, [], _Dir, _Fun, Acc) -> - {ok, Acc}; -stream_kv_node(Bt, Offset, [{K, V} | RestKVs], Dir, Fun, Acc) -> - case Fun(assemble(Bt, K, V), Offset, Acc) of - {ok, Acc2} -> - stream_kv_node(Bt, Offset + 1, RestKVs, Dir, Fun, Acc2); - {stop, Acc2} -> - {stop, Acc2} - end. - -stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) -> +stream_kv_node(Bt, Reds, KVs, StartKey, Dir, Fun, Acc) -> DropFun = case Dir of fwd -> @@ -456,11 +562,36 @@ stream_kv_node(Bt, Offset, KVs, StartKey, Dir, Fun, Acc) -> rev -> fun({Key, _}) -> less(Bt, StartKey, Key) end end, - % drop all nodes preceding the key - GTEKVs = lists:dropwhile(DropFun, KVs), - LenSkipped = length(KVs) - length(GTEKVs), - stream_kv_node(Bt, Offset + LenSkipped, GTEKVs, Dir, Fun, Acc). + {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs), + stream_kv_node2(Bt, Reds, LTKVs, GTEKVs, Dir, Fun, Acc). + +stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], Dir, Fun, Acc) -> + case Fun(assemble(Bt, K, V), {PrevKVs, Reds}, Acc) of + {ok, Acc2} -> + stream_kv_node2(Bt, Reds, [{K,V} | PrevKVs], RestKVs, Dir, Fun, Acc2); + {stop, Acc2} -> + {stop, Acc2} + end. +shuffle(List) -> +%% Determine the log n portion then randomize the list. + randomize(round(math:log(length(List)) + 0.5), List). + +randomize(1, List) -> + randomize(List); +randomize(T, List) -> + lists:foldl(fun(_E, Acc) -> + randomize(Acc) + end, randomize(List), lists:seq(1, (T - 1))). + +randomize(List) -> + D = lists:map(fun(A) -> + {random:uniform(), A} + end, List), + {_, D1} = lists:unzip(lists:keysort(1, D)), + D1. @@ -468,20 +599,58 @@ test()-> test(1000). test(N) -> - KeyValues = [{random:uniform(), random:uniform()} || _Seq <- lists:seq(1, N)], - test_btree(KeyValues), % randomly distributed - Sorted = lists:sort(KeyValues), + Sorted = [{Seq, random:uniform()} || Seq <- lists:seq(1, N)], test_btree(Sorted), % sorted regular - test_btree(lists:reverse(Sorted)). % sorted reverse + test_btree(lists:reverse(Sorted)), % sorted reverse + test_btree(shuffle(Sorted)). % randomly distributed test_btree(KeyValues) -> {ok, Fd} = couch_file:open("foo", [create,overwrite]), {ok, Btree} = open(nil, Fd), + ReduceFun = + fun(reduce, KVs) -> + length(KVs); + (combine, Reds) -> + lists:sum(Reds) + end, + Btree1 = set_options(Btree, [{reduce, ReduceFun}]), % first dump in all the values in one go - {ok, Btree10} = add_remove(Btree, KeyValues, []), + {ok, Btree10} = add_remove(Btree1, KeyValues, []), + + + Len = length(KeyValues), + + {ok, Len} = reduce(Btree10, nil, nil), + + % Count of all from start to Val1 + Val1 = Len div 3, + {ok, Val1} = reduce(Btree10, nil, Val1), + % Count of all from Val1 to end + CountVal1ToEnd = Len - Val1 + 1, + {ok, CountVal1ToEnd} = reduce(Btree10, Val1, nil), + + % Count of all from Val1 to Val2 + Val2 = 2*Len div 3, + CountValRange = Val2 - Val1 + 1, + {ok, CountValRange} = reduce(Btree10, Val1, Val2), + + % get the leading reduction as we foldl/r + {ok, true} = foldl(Btree10, Val1, fun(_X, LeadingReds, _Acc) -> + CountToStart = Val1 - 1, + CountToStart = final_reduce(Btree10, LeadingReds), + {stop, true} % change Acc to 'true' + end, + false), + {ok, true} = foldr(Btree10, Val1, fun(_X, LeadingReds, _Acc) -> + CountToEnd = Len - Val1, + CountToEnd = final_reduce(Btree10, LeadingReds), + {stop, true} % change Acc to 'true' + end, + false), + ok = test_keys(Btree10, KeyValues), % remove everything -- cgit v1.2.3