diff options
Diffstat (limited to 'apps/fabric/src/fabric_rpc.erl')
-rw-r--r-- | apps/fabric/src/fabric_rpc.erl | 388 |
1 files changed, 388 insertions, 0 deletions
diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl new file mode 100644 index 00000000..f56e3f68 --- /dev/null +++ b/apps/fabric/src/fabric_rpc.erl @@ -0,0 +1,388 @@ +-module(fabric_rpc). + +-export([get_db_info/1, get_doc_count/1, get_update_seq/1]). +-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). +-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). +-export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3, + set_revs_limit/3]). + +-include("fabric.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record (view_acc, { + db, + limit, + include_docs, + offset = nil, + total_rows, + reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, + group_level = 0 +}). + +%% rpc endpoints +%% call to with_db will supply your M:F with a #db{} and then remaining args + +all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> + {ok, Db} = couch_db:open(DbName, []), + #view_query_args{ + start_key = StartKey, + start_docid = StartDocId, + end_key = EndKey, + end_docid = EndDocId, + limit = Limit, + skip = Skip, + include_docs = IncludeDocs, + direction = Dir, + inclusive_end = Inclusive + } = QueryArgs, + {ok, Total} = couch_db:get_doc_count(Db), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + limit = Limit+Skip, + total_rows = Total + }, + EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, + Options = [ + {dir, Dir}, + {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, + {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} + ], + {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), + final_response(Total, Acc#view_acc.offset). + +changes(DbName, Args, StartSeq) -> + #changes_args{style=Style, dir=Dir} = Args, + case couch_db:open(DbName, []) of + {ok, Db} -> + Enum = fun changes_enumerator/2, + Opts = [{dir,Dir}], + Acc0 = {Db, StartSeq, Args}, + try + {ok, {_, LastSeq, _}} = + couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0), + rexi:reply({complete, LastSeq}) + after + couch_db:close(Db) + end; + Error -> + rexi:reply(Error) + end. + +map_view(DbName, DDoc, ViewName, QueryArgs) -> + {ok, Db} = couch_db:open(DbName, []), + #view_query_args{ + limit = Limit, + skip = Skip, + keys = Keys, + include_docs = IncludeDocs, + stale = Stale, + view_type = ViewType + } = QueryArgs, + MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, + Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc), + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), + {ok, Total} = couch_view:get_row_count(View), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + limit = Limit+Skip, + total_rows = Total, + reduce_fun = fun couch_view:reduce_to_count/1 + }, + case Keys of + nil -> + Options = couch_httpd_view:make_key_options(QueryArgs), + {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); + _ -> + Acc = lists:foldl(fun(Key, AccIn) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options = couch_httpd_view:make_key_options(KeyArgs), + {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, + Options), + Out + end, Acc0, Keys) + end, + final_response(Total, Acc#view_acc.offset). + +reduce_view(DbName, Group0, ViewName, QueryArgs) -> + {ok, Db} = couch_db:open(DbName, []), + #view_query_args{ + group_level = GroupLevel, + limit = Limit, + skip = Skip, + keys = Keys, + stale = Stale + } = QueryArgs, + GroupFun = group_rows_fun(GroupLevel), + MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group( + Pid, MinSeq), + {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), + ReduceView = {reduce, NthRed, Lang, View}, + Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, + case Keys of + nil -> + Options0 = couch_httpd_view:make_key_options(QueryArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); + _ -> + lists:map(fun(Key) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options0 = couch_httpd_view:make_key_options(KeyArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) + end, Keys) + end, + rexi:reply(complete). + +create_db(DbName, Options, Doc) -> + mem3_util:write_db_doc(Doc), + rexi:reply(case couch_server:create(DbName, Options) of + {ok, _} -> + ok; + Error -> + Error + end). + +delete_db(DbName, Options, DocId) -> + mem3_util:delete_db_doc(DocId), + rexi:reply(couch_server:delete(DbName, Options)). + +get_db_info(DbName) -> + with_db(DbName, [], {couch_db, get_db_info, []}). + +get_doc_count(DbName) -> + with_db(DbName, [], {couch_db, get_doc_count, []}). + +get_update_seq(DbName) -> + with_db(DbName, [], {couch_db, get_update_seq, []}). + +set_security(DbName, SecObj, Options) -> + with_db(DbName, Options, {couch_db, set_security, [SecObj]}). + +set_revs_limit(DbName, Limit, Options) -> + with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). + +open_doc(DbName, DocId, Options) -> + with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). + +open_revs(DbName, Id, Revs, Options) -> + with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). + +get_missing_revs(DbName, IdRevsList) -> + % reimplement here so we get [] for Ids with no missing revs in response + rexi:reply(case couch_db:open(DbName, []) of + {ok, Db} -> + Ids = [Id1 || {Id1, _Revs} <- IdRevsList], + {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> + case FullDocInfoResult of + {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> + MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), + {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; + not_found -> + {Id, Revs, []} + end + end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; + Error -> + Error + end). + +update_docs(DbName, Docs0, Options) -> + case proplists:get_value(replicated_changes, Options) of + true -> + X = replicated_changes; + _ -> + X = interactive_edit + end, + Docs = make_att_readers(Docs0), + with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). + +group_info(DbName, Group0) -> + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + rexi:reply(couch_view_group:request_group_info(Pid)). + +reset_validation_funs(DbName) -> + case couch_db:open(DbName, []) of + {ok, #db{main_pid = Pid}} -> + gen_server:cast(Pid, {load_validation_funs, undefined}); + _ -> + ok + end. + +%% +%% internal +%% + +with_db(DbName, Options, {M,F,A}) -> + case couch_db:open(DbName, Options) of + {ok, Db} -> + rexi:reply(try + apply(M, F, [Db | A]) + catch Exception -> + Exception; + error:Reason -> + ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason, + erlang:get_stacktrace()]), + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + +view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> + % matches for _all_docs and translates #full_doc_info{} -> KV pair + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} -> + Id = FullDocInfo#full_doc_info.id, + Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, + view_fold({{Id,Id}, Value}, OffsetReds, Acc); + #doc_info{revs=[#rev_info{deleted=true}|_]} -> + {ok, Acc} + end; +view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> + % calculates the offset for this shard + #view_acc{reduce_fun=Reduce} = Acc, + Offset = Reduce(OffsetReds), + case rexi:sync_reply({total_and_offset, Total, Offset}) of + ok -> + view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); + stop -> + exit(normal); + timeout -> + exit(timeout) + end; +view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> + % we scanned through limit+skip local rows + {stop, Acc}; +view_fold({{Key,Id}, Value}, _Offset, Acc) -> + % the normal case + #view_acc{ + db = Db, + limit = Limit, + include_docs = IncludeDocs + } = Acc, + Doc = if not IncludeDocs -> undefined; true -> + case couch_db:open_doc(Db, Id, []) of + {not_found, deleted} -> + null; + {not_found, missing} -> + undefined; + {ok, Doc0} -> + couch_doc:to_json_obj(Doc0, []) + end + end, + case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + timeout -> + exit(timeout) + end. + +final_response(Total, nil) -> + case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> + rexi:reply(complete); + stop -> + ok; + timeout -> + exit(timeout) + end; +final_response(_Total, _Offset) -> + rexi:reply(complete). + +group_rows_fun(exact) -> + fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; +group_rows_fun(0) -> + fun(_A, _B) -> true end; +group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> + fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> + lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); + ({Key1,_}, {Key2,_}) -> + Key1 == Key2 + end. + +reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> + {stop, Acc}; +reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> + send(null, Red, Acc); +reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> + send(Key, Red, Acc); +reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> + send(lists:sublist(K, I), Red, Acc). + +send(Key, Value, #view_acc{limit=Limit} = Acc) -> + case rexi:sync_reply(#view_row{key=Key, value=Value}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + stop -> + exit(normal); + timeout -> + exit(timeout) + end. + +changes_enumerator(DocInfo, {Db, _Seq, Args}) -> + #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args, + #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} + = DocInfo, + case [Result || Result <- FilterFun(DocInfo), Result /= null] of + [] -> + {ok, {Db, Seq, Args}}; + Results -> + ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs), + Go = rexi:sync_reply(ChangesRow), + {Go, {Db, Seq, Args}} + end. + +changes_row(_, Seq, Id, Results, _, true, true) -> + #view_row{key=Seq, id=Id, value=Results, doc=deleted}; +changes_row(_, Seq, Id, Results, _, true, false) -> + #view_row{key=Seq, id=Id, value=Results, doc=deleted}; +changes_row(Db, Seq, Id, Results, Rev, false, true) -> + #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)}; +changes_row(_, Seq, Id, Results, _, false, false) -> + #view_row{key=Seq, id=Id, value=Results}. + +doc_member(Shard, Id, Rev) -> + case couch_db:open_doc_revs(Shard, Id, [Rev], []) of + {ok, [{ok,Doc}]} -> + couch_doc:to_json_obj(Doc, []); + Error -> + Error + end. + +possible_ancestors(_FullInfo, []) -> + []; +possible_ancestors(FullInfo, MissingRevs) -> + #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), + LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], + % Find the revs that are possible parents of this rev + lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> + % this leaf is a "possible ancenstor" of the missing + % revs if this LeafPos lessthan any of the missing revs + case lists:any(fun({MissingPos, _}) -> + LeafPos < MissingPos end, MissingRevs) of + true -> + [{LeafPos, LeafRevId} | Acc]; + false -> + Acc + end + end, [], LeafRevs). + +make_att_readers([]) -> + []; +make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> + % % go through the attachments looking for 'follows' in the data, + % % replace with function that reads the data from MIME stream. + Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], + [Doc#doc{atts = Atts} | make_att_readers(Rest)]. + +make_att_reader({follows, Parser}) -> + fun() -> + Parser ! {get_bytes, self()}, + receive {bytes, Bytes} -> Bytes end + end; +make_att_reader(Else) -> + Else. |