diff options
Diffstat (limited to 'apps/couch/src/couch_btree.erl')
-rw-r--r-- | apps/couch/src/couch_btree.erl | 676 |
1 files changed, 676 insertions, 0 deletions
diff --git a/apps/couch/src/couch_btree.erl b/apps/couch/src/couch_btree.erl new file mode 100644 index 00000000..0e47bac7 --- /dev/null +++ b/apps/couch/src/couch_btree.erl @@ -0,0 +1,676 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_btree). + +-export([open/2, open/3, query_modify/4, add/2, add_remove/3]). +-export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]). +-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]). + +-define(CHUNK_THRESHOLD, 16#4ff). + +-record(btree, + {fd, + root, + extract_kv = fun({Key, Value}) -> {Key, Value} end, + assemble_kv = fun(Key, Value) -> {Key, Value} end, + less = fun(A, B) -> A < B end, + reduce = nil + }). + +extract(#btree{extract_kv=Extract}, Value) -> + Extract(Value). + +assemble(#btree{assemble_kv=Assemble}, Key, Value) -> + Assemble(Key, Value). + +less(#btree{less=Less}, A, B) -> + Less(A, B). + +% pass in 'nil' for State if a new Btree. +open(State, Fd) -> + {ok, #btree{root=State, fd=Fd}}. + +set_options(Bt, []) -> + Bt; +set_options(Bt, [{split, Extract}|Rest]) -> + set_options(Bt#btree{extract_kv=Extract}, Rest); +set_options(Bt, [{join, Assemble}|Rest]) -> + set_options(Bt#btree{assemble_kv=Assemble}, Rest); +set_options(Bt, [{less, Less}|Rest]) -> + set_options(Bt#btree{less=Less}, Rest); +set_options(Bt, [{reduce, Reduce}|Rest]) -> + set_options(Bt#btree{reduce=Reduce}, Rest). + +open(State, Fd, Options) -> + {ok, set_options(#btree{root=State, fd=Fd}, Options)}. + +get_state(#btree{root=Root}) -> + Root. + +final_reduce(#btree{reduce=Reduce}, Val) -> + final_reduce(Reduce, Val); +final_reduce(Reduce, {[], []}) -> + Reduce(reduce, []); +final_reduce(_Bt, {[], [Red]}) -> + Red; +final_reduce(Reduce, {[], Reductions}) -> + Reduce(rereduce, Reductions); +final_reduce(Reduce, {KVs, Reductions}) -> + Red = Reduce(reduce, KVs), + final_reduce(Reduce, {[], [Red | Reductions]}). + +fold_reduce(#btree{root=Root}=Bt, Fun, Acc, Options) -> + Dir = couch_util:get_value(dir, Options, fwd), + StartKey = couch_util:get_value(start_key, Options), + EndKey = couch_util:get_value(end_key, Options), + KeyGroupFun = couch_util:get_value(key_group_fun, Options, fun(_,_) -> true end), + {StartKey2, EndKey2} = + case Dir of + rev -> {EndKey, StartKey}; + fwd -> {StartKey, EndKey} + end, + try + {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = + reduce_stream_node(Bt, Dir, Root, StartKey2, EndKey2, undefined, [], [], + KeyGroupFun, Fun, Acc), + if GroupedKey2 == undefined -> + {ok, Acc2}; + true -> + case Fun(GroupedKey2, {GroupedKVsAcc2, GroupedRedsAcc2}, Acc2) of + {ok, Acc3} -> {ok, Acc3}; + {stop, Acc3} -> {ok, Acc3} + end + end + catch + throw:{stop, AccDone} -> {ok, AccDone} + end. + +full_reduce(#btree{root=nil,reduce=Reduce}) -> + {ok, Reduce(reduce, [])}; +full_reduce(#btree{root={_P, Red}}) -> + {ok, Red}. + +% wraps a 2 arity function with the proper 3 arity function +convert_fun_arity(Fun) when is_function(Fun, 2) -> + fun(KV, _Reds, AccIn) -> Fun(KV, AccIn) end; +convert_fun_arity(Fun) when is_function(Fun, 3) -> + Fun. % Already arity 3 + +make_key_in_end_range_function(#btree{less=Less}, fwd, Options) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + case couch_util:get_value(end_key, Options) of + undefined -> + fun(_Key) -> true end; + LastKey -> + fun(Key) -> not Less(LastKey, Key) end + end; + EndKey -> + fun(Key) -> Less(Key, EndKey) end + end; +make_key_in_end_range_function(#btree{less=Less}, rev, Options) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + case couch_util:get_value(end_key, Options) of + undefined -> + fun(_Key) -> true end; + LastKey -> + fun(Key) -> not Less(Key, LastKey) end + end; + EndKey -> + fun(Key) -> Less(EndKey, Key) end + end. + + +foldl(Bt, Fun, Acc) -> + fold(Bt, Fun, Acc, []). + +foldl(Bt, Fun, Acc, Options) -> + fold(Bt, Fun, Acc, Options). + + +fold(#btree{root=nil}, _Fun, Acc, _Options) -> + {ok, {[], []}, Acc}; +fold(#btree{root=Root}=Bt, Fun, Acc, Options) -> + Dir = couch_util:get_value(dir, Options, fwd), + InRange = make_key_in_end_range_function(Bt, Dir, Options), + Result = + case couch_util:get_value(start_key, Options) of + undefined -> + stream_node(Bt, [], Bt#btree.root, InRange, Dir, + convert_fun_arity(Fun), Acc); + StartKey -> + stream_node(Bt, [], Bt#btree.root, StartKey, InRange, Dir, + convert_fun_arity(Fun), Acc) + end, + case Result of + {ok, Acc2}-> + {_P, FullReduction} = Root, + {ok, {[], [FullReduction]}, Acc2}; + {stop, LastReduction, Acc2} -> + {ok, LastReduction, Acc2} + end. + +add(Bt, InsertKeyValues) -> + add_remove(Bt, InsertKeyValues, []). + +add_remove(Bt, InsertKeyValues, RemoveKeys) -> + {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys), + {ok, Bt2}. + +query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) -> + #btree{root=Root} = Bt, + InsertActions = lists:map( + fun(KeyValue) -> + {Key, Value} = extract(Bt, KeyValue), + {insert, Key, Value} + end, InsertValues), + RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys], + FetchActions = [{fetch, Key, nil} || Key <- LookupKeys], + SortFun = + fun({OpA, A, _}, {OpB, B, _}) -> + case A == B of + % A and B are equal, sort by op. + true -> op_order(OpA) < op_order(OpB); + false -> + less(Bt, A, B) + end + end, + Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])), + {ok, KeyPointers, QueryResults, Bt2} = modify_node(Bt, Root, Actions, []), + {ok, NewRoot, Bt3} = complete_root(Bt2, KeyPointers), + {ok, QueryResults, Bt3#btree{root=NewRoot}}. + +% for ordering different operatations with the same key. +% fetch < remove < insert +op_order(fetch) -> 1; +op_order(remove) -> 2; +op_order(insert) -> 3. + +lookup(#btree{root=Root, less=Less}=Bt, Keys) -> + SortedKeys = lists:sort(Less, Keys), + {ok, SortedResults} = lookup(Bt, Root, SortedKeys), + % We want to return the results in the same order as the keys were input + % but we may have changed the order when we sorted. So we need to put the + % order back into the results. + couch_util:reorder_results(Keys, SortedResults). + +lookup(_Bt, nil, Keys) -> + {ok, [{Key, not_found} || Key <- Keys]}; +lookup(Bt, {Pointer, _Reds}, Keys) -> + {NodeType, NodeList} = get_node(Bt, Pointer), + case NodeType of + kp_node -> + lookup_kpnode(Bt, list_to_tuple(NodeList), 1, Keys, []); + kv_node -> + lookup_kvnode(Bt, list_to_tuple(NodeList), 1, Keys, []) + end. + +lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> + {ok, lists:reverse(Output)}; +lookup_kpnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound -> + {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; +lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Output) -> + N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), FirstLookupKey), + {Key, PointerInfo} = element(N, NodeTuple), + SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end, + case lists:splitwith(SplitFun, LookupKeys) of + {[], GreaterQueries} -> + lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output); + {LessEqQueries, GreaterQueries} -> + {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries), + lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output)) + end. + + +lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> + {ok, lists:reverse(Output)}; +lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound -> + % keys not found + {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; +lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output) -> + N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), LookupKey), + {Key, Value} = element(N, NodeTuple), + case less(Bt, LookupKey, Key) of + true -> + % LookupKey is less than Key + lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output]); + false -> + case less(Bt, Key, LookupKey) of + true -> + % LookupKey is greater than Key + lookup_kvnode(Bt, NodeTuple, N+1, RestLookupKeys, [{LookupKey, not_found} | Output]); + false -> + % LookupKey is equal to Key + lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output]) + end + end. + + +complete_root(Bt, []) -> + {ok, nil, Bt}; +complete_root(Bt, [{_Key, PointerInfo}])-> + {ok, PointerInfo, Bt}; +complete_root(Bt, KPs) -> + {ok, ResultKeyPointers, Bt2} = write_node(Bt, kp_node, KPs), + complete_root(Bt2, ResultKeyPointers). + +%%%%%%%%%%%%% The chunkify function sucks! %%%%%%%%%%%%% +% It is inaccurate as it does not account for compression when blocks are +% written. Plus with the "case byte_size(term_to_binary(InList)) of" code +% it's probably really inefficient. + +chunkify(InList) -> + case byte_size(term_to_binary(InList)) of + Size when Size > ?CHUNK_THRESHOLD -> + NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1), + ChunkThreshold = Size div NumberOfChunksLikely, + chunkify(InList, ChunkThreshold, [], 0, []); + _Else -> + [InList] + end. + +chunkify([], _ChunkThreshold, [], 0, OutputChunks) -> + lists:reverse(OutputChunks); +chunkify([], _ChunkThreshold, OutList, _OutListSize, OutputChunks) -> + lists:reverse([lists:reverse(OutList) | OutputChunks]); +chunkify([InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) -> + case byte_size(term_to_binary(InElement)) of + Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] -> + chunkify(RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]); + Size -> + chunkify(RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks) + end. + +modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> + case RootPointerInfo of + nil -> + NodeType = kv_node, + NodeList = []; + {Pointer, _Reds} -> + {NodeType, NodeList} = get_node(Bt, Pointer) + end, + NodeTuple = list_to_tuple(NodeList), + + {ok, NewNodeList, QueryOutput2, Bt2} = + case NodeType of + kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput); + kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput) + end, + case NewNodeList of + [] -> % no nodes remain + {ok, [], QueryOutput2, Bt2}; + NodeList -> % nothing changed + {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple), + {ok, [{LastKey, RootPointerInfo}], QueryOutput2, Bt2}; + _Else2 -> + {ok, ResultList, Bt3} = write_node(Bt2, NodeType, NewNodeList), + {ok, ResultList, QueryOutput2, Bt3} + end. + +reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) -> + []; +reduce_node(#btree{reduce=R}, kp_node, NodeList) -> + R(rereduce, [Red || {_K, {_P, Red}} <- NodeList]); +reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) -> + R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]). + + +get_node(#btree{fd = Fd}, NodePos) -> + {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), + {NodeType, NodeList}. + +write_node(Bt, NodeType, NodeList) -> + % split up nodes into smaller sizes + NodeListList = chunkify(NodeList), + % now write out each chunk and return the KeyPointer pairs for those nodes + ResultList = [ + begin + {ok, Pointer} = couch_file:append_term(Bt#btree.fd, {NodeType, ANodeList}), + {LastKey, _} = lists:last(ANodeList), + {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList)}} + end + || + ANodeList <- NodeListList + ], + {ok, ResultList, Bt}. + +modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) -> + modify_node(Bt, nil, Actions, QueryOutput); +modify_kpnode(Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> + {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, + tuple_size(NodeTuple), [])), QueryOutput, Bt}; +modify_kpnode(Bt, NodeTuple, LowerBound, + [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) -> + Sz = tuple_size(NodeTuple), + N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey), + case N =:= Sz of + true -> + % perform remaining actions on last node + {_, PointerInfo} = element(Sz, NodeTuple), + {ok, ChildKPs, QueryOutput2, Bt2} = + modify_node(Bt, PointerInfo, Actions, QueryOutput), + NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, + Sz - 1, ChildKPs)), + {ok, NodeList, QueryOutput2, Bt2}; + false -> + {NodeKey, PointerInfo} = element(N, NodeTuple), + SplitFun = fun({_ActionType, ActionKey, _ActionValue}) -> + not less(Bt, NodeKey, ActionKey) + end, + {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions), + {ok, ChildKPs, QueryOutput2, Bt2} = + modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput), + ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple, + LowerBound, N - 1, ResultNode)), + modify_kpnode(Bt2, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2) + end. + +bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End -> + Tail; +bounded_tuple_to_revlist(Tuple, Start, End, Tail) -> + bounded_tuple_to_revlist(Tuple, Start+1, End, [element(Start, Tuple)|Tail]). + +bounded_tuple_to_list(Tuple, Start, End, Tail) -> + bounded_tuple_to_list2(Tuple, Start, End, [], Tail). + +bounded_tuple_to_list2(_Tuple, Start, End, Acc, Tail) when Start > End -> + lists:reverse(Acc, Tail); +bounded_tuple_to_list2(Tuple, Start, End, Acc, Tail) -> + bounded_tuple_to_list2(Tuple, Start + 1, End, [element(Start, Tuple) | Acc], Tail). + +find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End -> + End; +find_first_gteq(Bt, Tuple, Start, End, Key) -> + Mid = Start + ((End - Start) div 2), + {TupleKey, _} = element(Mid, Tuple), + case less(Bt, TupleKey, Key) of + true -> + find_first_gteq(Bt, Tuple, Mid+1, End, Key); + false -> + find_first_gteq(Bt, Tuple, Start, Mid, Key) + end. + +modify_kvnode(Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> + {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput, Bt}; +modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) -> + case ActionType of + insert -> + modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); + remove -> + % just drop the action + modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput); + fetch -> + % the key/value must not exist in the tree + modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) + end; +modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) -> + N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey), + {Key, Value} = element(N, NodeTuple), + ResultNode = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode), + case less(Bt, ActionKey, Key) of + true -> + case ActionType of + insert -> + % ActionKey is less than the Key, so insert + modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); + remove -> + % ActionKey is less than the Key, just drop the action + modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput); + fetch -> + % ActionKey is less than the Key, the key/value must not exist in the tree + modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) + end; + false -> + % ActionKey and Key are maybe equal. + case less(Bt, Key, ActionKey) of + false -> + case ActionType of + insert -> + modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); + remove -> + modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput); + fetch -> + % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node + % since an identical action key can follow it. + modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput]) + end; + true -> + modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput) + end + end. + + +reduce_stream_node(_Bt, _Dir, nil, _KeyStart, _KeyEnd, GroupedKey, GroupedKVsAcc, + GroupedRedsAcc, _KeyGroupFun, _Fun, Acc) -> + {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; +reduce_stream_node(Bt, Dir, {P, _R}, KeyStart, KeyEnd, GroupedKey, GroupedKVsAcc, + GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> + case get_node(Bt, P) of + {kp_node, NodeList} -> + reduce_stream_kp_node(Bt, Dir, NodeList, KeyStart, KeyEnd, GroupedKey, + GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc); + {kv_node, KVs} -> + reduce_stream_kv_node(Bt, Dir, KVs, KeyStart, KeyEnd, GroupedKey, + GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) + end. + +reduce_stream_kv_node(Bt, Dir, KVs, KeyStart, KeyEnd, + GroupedKey, GroupedKVsAcc, GroupedRedsAcc, + KeyGroupFun, Fun, Acc) -> + + GTEKeyStartKVs = + case KeyStart of + undefined -> + KVs; + _ -> + lists:dropwhile(fun({Key,_}) -> less(Bt, Key, KeyStart) end, KVs) + end, + KVs2 = + case KeyEnd of + undefined -> + GTEKeyStartKVs; + _ -> + lists:takewhile( + fun({Key,_}) -> + not less(Bt, KeyEnd, Key) + end, GTEKeyStartKVs) + end, + reduce_stream_kv_node2(Bt, adjust_dir(Dir, KVs2), GroupedKey, GroupedKVsAcc, GroupedRedsAcc, + KeyGroupFun, Fun, Acc). + + +reduce_stream_kv_node2(_Bt, [], GroupedKey, GroupedKVsAcc, GroupedRedsAcc, + _KeyGroupFun, _Fun, Acc) -> + {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; +reduce_stream_kv_node2(Bt, [{Key, Value}| RestKVs], GroupedKey, GroupedKVsAcc, + GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> + case GroupedKey of + undefined -> + reduce_stream_kv_node2(Bt, RestKVs, Key, + [assemble(Bt,Key,Value)], [], KeyGroupFun, Fun, Acc); + _ -> + + case KeyGroupFun(GroupedKey, Key) of + true -> + reduce_stream_kv_node2(Bt, RestKVs, GroupedKey, + [assemble(Bt,Key,Value)|GroupedKVsAcc], GroupedRedsAcc, KeyGroupFun, + Fun, Acc); + false -> + case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of + {ok, Acc2} -> + reduce_stream_kv_node2(Bt, RestKVs, Key, [assemble(Bt,Key,Value)], + [], KeyGroupFun, Fun, Acc2); + {stop, Acc2} -> + throw({stop, Acc2}) + end + end + end. + +reduce_stream_kp_node(Bt, Dir, NodeList, KeyStart, KeyEnd, + GroupedKey, GroupedKVsAcc, GroupedRedsAcc, + KeyGroupFun, Fun, Acc) -> + Nodes = + case KeyStart of + undefined -> + NodeList; + _ -> + lists:dropwhile( + fun({Key,_}) -> + less(Bt, Key, KeyStart) + end, NodeList) + end, + NodesInRange = + case KeyEnd of + undefined -> + Nodes; + _ -> + {InRange, MaybeInRange} = lists:splitwith( + fun({Key,_}) -> + less(Bt, Key, KeyEnd) + end, Nodes), + InRange ++ case MaybeInRange of [] -> []; [FirstMaybe|_] -> [FirstMaybe] end + end, + reduce_stream_kp_node2(Bt, Dir, adjust_dir(Dir, NodesInRange), KeyStart, KeyEnd, + GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc). + + +reduce_stream_kp_node2(Bt, Dir, [{_Key, NodeInfo} | RestNodeList], KeyStart, KeyEnd, + undefined, [], [], KeyGroupFun, Fun, Acc) -> + {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = + reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, KeyEnd, undefined, + [], [], KeyGroupFun, Fun, Acc), + reduce_stream_kp_node2(Bt, Dir, RestNodeList, KeyStart, KeyEnd, GroupedKey2, + GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2); +reduce_stream_kp_node2(Bt, Dir, NodeList, KeyStart, KeyEnd, + GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> + {Grouped0, Ungrouped0} = lists:splitwith(fun({Key,_}) -> + KeyGroupFun(GroupedKey, Key) end, NodeList), + {GroupedNodes, UngroupedNodes} = + case Grouped0 of + [] -> + {Grouped0, Ungrouped0}; + _ -> + [FirstGrouped | RestGrouped] = lists:reverse(Grouped0), + {RestGrouped, [FirstGrouped | Ungrouped0]} + end, + GroupedReds = [R || {_, {_,R}} <- GroupedNodes], + case UngroupedNodes of + [{_Key, NodeInfo}|RestNodes] -> + {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = + reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, KeyEnd, GroupedKey, + GroupedKVsAcc, GroupedReds ++ GroupedRedsAcc, KeyGroupFun, Fun, Acc), + reduce_stream_kp_node2(Bt, Dir, RestNodes, KeyStart, KeyEnd, GroupedKey2, + GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2); + [] -> + {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey} + end. + +adjust_dir(fwd, List) -> + List; +adjust_dir(rev, List) -> + lists:reverse(List). + +stream_node(Bt, Reds, {Pointer, _Reds}, StartKey, InRange, Dir, Fun, Acc) -> + {NodeType, NodeList} = get_node(Bt, Pointer), + case NodeType of + kp_node -> + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc); + kv_node -> + stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc) + end. + +stream_node(Bt, Reds, {Pointer, _Reds}, InRange, Dir, Fun, Acc) -> + {NodeType, NodeList} = get_node(Bt, Pointer), + case NodeType of + kp_node -> + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc); + kv_node -> + stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc) + end. + +stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_kp_node(Bt, Reds, [{_Key, {Pointer, Red}} | Rest], InRange, Dir, Fun, Acc) -> + case stream_node(Bt, Reds, {Pointer, Red}, InRange, Dir, Fun, Acc) of + {ok, Acc2} -> + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2); + {stop, LastReds, Acc2} -> + {stop, LastReds, Acc2} + end. + +drop_nodes(_Bt, Reds, _StartKey, []) -> + {Reds, []}; +drop_nodes(Bt, Reds, StartKey, [{NodeKey, {Pointer, Red}} | RestKPs]) -> + case less(Bt, NodeKey, StartKey) of + true -> drop_nodes(Bt, [Red | Reds], StartKey, RestKPs); + false -> {Reds, [{NodeKey, {Pointer, Red}} | RestKPs]} + end. + +stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) -> + {NewReds, NodesToStream} = + case Dir of + fwd -> + % drop all nodes sorting before the key + drop_nodes(Bt, Reds, StartKey, KPs); + rev -> + % keep all nodes sorting before the key, AND the first node to sort after + RevKPs = lists:reverse(KPs), + case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of + {_RevsBefore, []} -> + % everything sorts before it + {Reds, KPs}; + {RevBefore, [FirstAfter | Drop]} -> + {[Red || {_K,{_P,Red}} <- Drop] ++ Reds, + [FirstAfter | lists:reverse(RevBefore)]} + end + end, + case NodesToStream of + [] -> + {ok, Acc}; + [{_Key, {Pointer, Red}} | Rest] -> + case stream_node(Bt, NewReds, {Pointer, Red}, StartKey, InRange, Dir, Fun, Acc) of + {ok, Acc2} -> + stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2); + {stop, LastReds, Acc2} -> + {stop, LastReds, Acc2} + end + end. + +stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) -> + DropFun = + case Dir of + fwd -> + fun({Key, _}) -> less(Bt, Key, StartKey) end; + rev -> + fun({Key, _}) -> less(Bt, StartKey, Key) end + end, + {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs), + AssembleLTKVs = [assemble(Bt,K,V) || {K,V} <- LTKVs], + stream_kv_node2(Bt, Reds, AssembleLTKVs, GTEKVs, InRange, Dir, Fun, Acc). + +stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) -> + {ok, Acc}; +stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) -> + case InRange(K) of + false -> + {stop, {PrevKVs, Reds}, Acc}; + true -> + AssembledKV = assemble(Bt, K, V), + case Fun(AssembledKV, {PrevKVs, Reds}, Acc) of + {ok, Acc2} -> + stream_kv_node2(Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2); + {stop, Acc2} -> + {stop, {PrevKVs, Reds}, Acc2} + end + end. |