diff options
Diffstat (limited to 'deps/fabric/src/fabric_rpc.erl')
-rw-r--r-- | deps/fabric/src/fabric_rpc.erl | 485 |
1 files changed, 485 insertions, 0 deletions
diff --git a/deps/fabric/src/fabric_rpc.erl b/deps/fabric/src/fabric_rpc.erl new file mode 100644 index 00000000..3f25dfd7 --- /dev/null +++ b/deps/fabric/src/fabric_rpc.erl @@ -0,0 +1,485 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_rpc). + +-export([get_db_info/1, get_doc_count/1, get_update_seq/1]). +-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3, + update_docs/3]). +-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). +-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3, + set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]). + +-include("fabric.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record (view_acc, { + db, + limit, + include_docs, + conflicts, + doc_info = nil, + offset = nil, + total_rows, + reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, + group_level = 0 +}). + +%% rpc endpoints +%% call to with_db will supply your M:F with a #db{} and then remaining args + +all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> + {ok, Db} = get_or_create_db(DbName, []), + #view_query_args{ + start_key = StartKey, + start_docid = StartDocId, + end_key = EndKey, + end_docid = EndDocId, + limit = Limit, + skip = Skip, + include_docs = IncludeDocs, + conflicts = Conflicts, + direction = Dir, + inclusive_end = Inclusive, + extra = Extra + } = QueryArgs, + set_io_priority(DbName, Extra), + {ok, Total} = couch_db:get_doc_count(Db), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + conflicts = Conflicts, + limit = Limit+Skip, + total_rows = Total + }, + EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, + Options = [ + {dir, Dir}, + {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, + {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} + ], + {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), + final_response(Total, Acc#view_acc.offset). + +changes(DbName, Args, StartSeq) -> + erlang:put(io_priority, {interactive, DbName}), + #changes_args{dir=Dir} = Args, + case get_or_create_db(DbName, []) of + {ok, Db} -> + Enum = fun changes_enumerator/2, + Opts = [{dir,Dir}], + Acc0 = {Db, StartSeq, Args}, + try + {ok, {_, LastSeq, _}} = + couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0), + rexi:reply({complete, LastSeq}) + after + couch_db:close(Db) + end; + Error -> + rexi:reply(Error) + end. + +map_view(DbName, DDoc, ViewName, QueryArgs) -> + {ok, Db} = get_or_create_db(DbName, []), + #view_query_args{ + limit = Limit, + skip = Skip, + keys = Keys, + include_docs = IncludeDocs, + conflicts = Conflicts, + stale = Stale, + view_type = ViewType, + extra = Extra + } = QueryArgs, + set_io_priority(DbName, Extra), + {LastSeq, MinSeq} = calculate_seqs(Db, Stale), + Group0 = couch_view_group:design_doc_to_view_group(DDoc), + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + maybe_update_view_group(Pid, LastSeq, Stale), + erlang:monitor(process, Group#group.fd), + View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), + {ok, Total} = couch_view:get_row_count(View), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + conflicts = Conflicts, + limit = Limit+Skip, + total_rows = Total, + reduce_fun = fun couch_view:reduce_to_count/1 + }, + case Keys of + nil -> + Options = couch_httpd_view:make_key_options(QueryArgs), + {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); + _ -> + Acc = lists:foldl(fun(Key, AccIn) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options = couch_httpd_view:make_key_options(KeyArgs), + {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, + Options), + Out + end, Acc0, Keys) + end, + final_response(Total, Acc#view_acc.offset). + +reduce_view(DbName, Group0, ViewName, QueryArgs) -> + erlang:put(io_priority, {interactive, DbName}), + {ok, Db} = get_or_create_db(DbName, []), + #view_query_args{ + group_level = GroupLevel, + limit = Limit, + skip = Skip, + keys = Keys, + stale = Stale, + extra = Extra + } = QueryArgs, + set_io_priority(DbName, Extra), + GroupFun = group_rows_fun(GroupLevel), + {LastSeq, MinSeq} = calculate_seqs(Db, Stale), + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + maybe_update_view_group(Pid, LastSeq, Stale), + #group{views=Views, def_lang=Lang, fd=Fd} = Group, + erlang:monitor(process, Fd), + {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), + ReduceView = {reduce, NthRed, Lang, View}, + Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, + case Keys of + nil -> + Options0 = couch_httpd_view:make_key_options(QueryArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); + _ -> + lists:map(fun(Key) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options0 = couch_httpd_view:make_key_options(KeyArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) + end, Keys) + end, + rexi:reply(complete). + +calculate_seqs(Db, Stale) -> + LastSeq = couch_db:get_update_seq(Db), + if + Stale == ok orelse Stale == update_after -> + {LastSeq, 0}; + true -> + {LastSeq, LastSeq} + end. + +maybe_update_view_group(GroupPid, LastSeq, update_after) -> + couch_view_group:trigger_group_update(GroupPid, LastSeq); +maybe_update_view_group(_, _, _) -> + ok. + +create_db(DbName) -> + rexi:reply(case couch_server:create(DbName, []) of + {ok, _} -> + ok; + Error -> + Error + end). + +create_shard_db_doc(_, Doc) -> + rexi:reply(mem3_util:write_db_doc(Doc)). + +delete_db(DbName) -> + couch_server:delete(DbName, []). + +delete_shard_db_doc(_, DocId) -> + rexi:reply(mem3_util:delete_db_doc(DocId)). + +get_db_info(DbName) -> + with_db(DbName, [], {couch_db, get_db_info, []}). + +get_doc_count(DbName) -> + with_db(DbName, [], {couch_db, get_doc_count, []}). + +get_update_seq(DbName) -> + with_db(DbName, [], {couch_db, get_update_seq, []}). + +set_security(DbName, SecObj, Options) -> + with_db(DbName, Options, {couch_db, set_security, [SecObj]}). + +set_revs_limit(DbName, Limit, Options) -> + with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). + +open_doc(DbName, DocId, Options) -> + with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). + +open_revs(DbName, Id, Revs, Options) -> + with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). + +get_missing_revs(DbName, IdRevsList) -> + get_missing_revs(DbName, IdRevsList, []). + +get_missing_revs(DbName, IdRevsList, Options) -> + % reimplement here so we get [] for Ids with no missing revs in response + set_io_priority(DbName, Options), + rexi:reply(case get_or_create_db(DbName, Options) of + {ok, Db} -> + Ids = [Id1 || {Id1, _Revs} <- IdRevsList], + {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> + case FullDocInfoResult of + {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> + MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), + {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; + not_found -> + {Id, Revs, []} + end + end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; + Error -> + Error + end). + +update_docs(DbName, Docs0, Options) -> + case proplists:get_value(replicated_changes, Options) of + true -> + X = replicated_changes; + _ -> + X = interactive_edit + end, + Docs = make_att_readers(Docs0), + with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). + +group_info(DbName, Group0) -> + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + rexi:reply(couch_view_group:request_group_info(Pid)). + +reset_validation_funs(DbName) -> + case get_or_create_db(DbName, []) of + {ok, #db{main_pid = Pid}} -> + gen_server:cast(Pid, {load_validation_funs, undefined}); + _ -> + ok + end. + +%% +%% internal +%% + +with_db(DbName, Options, {M,F,A}) -> + set_io_priority(DbName, Options), + case get_or_create_db(DbName, Options) of + {ok, Db} -> + rexi:reply(try + apply(M, F, [Db | A]) + catch Exception -> + Exception; + error:Reason -> + twig:log(error, "rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason, + clean_stack()]), + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + +get_or_create_db(DbName, Options) -> + case couch_db:open_int(DbName, Options) of + {not_found, no_db_file} -> + twig:log(warn, "~p creating ~s", [?MODULE, DbName]), + couch_server:create(DbName, Options); + Else -> + Else + end. + +view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> + % matches for _all_docs and translates #full_doc_info{} -> KV pair + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI -> + Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, + view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI}); + #doc_info{revs=[#rev_info{deleted=true}|_]} -> + {ok, Acc} + end; +view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> + % calculates the offset for this shard + #view_acc{reduce_fun=Reduce} = Acc, + Offset = Reduce(OffsetReds), + case rexi:sync_reply({total_and_offset, Total, Offset}) of + ok -> + view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); + stop -> + exit(normal); + timeout -> + exit(timeout) + end; +view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> + % we scanned through limit+skip local rows + {stop, Acc}; +view_fold({{Key,Id}, Value}, _Offset, Acc) -> + % the normal case + #view_acc{ + db = Db, + doc_info = DocInfo, + limit = Limit, + conflicts = Conflicts, + include_docs = IncludeDocs + } = Acc, + case Value of {Props} -> + LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined); + _ -> + LinkedDocs = false + end, + if LinkedDocs -> + % we'll embed this at a higher level b/c the doc may be non-local + Doc = undefined; + IncludeDocs -> + IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end, + Options = if Conflicts -> [conflicts]; true -> [] end, + case couch_db:open_doc(Db, IdOrInfo, Options) of + {not_found, deleted} -> + Doc = null; + {not_found, missing} -> + Doc = undefined; + {ok, Doc0} -> + Doc = couch_doc:to_json_obj(Doc0, []) + end; + true -> + Doc = undefined + end, + case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + timeout -> + exit(timeout) + end. + +final_response(Total, nil) -> + case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> + rexi:reply(complete); + stop -> + ok; + timeout -> + exit(timeout) + end; +final_response(_Total, _Offset) -> + rexi:reply(complete). + +%% TODO: handle case of bogus group level +group_rows_fun(exact) -> + fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; +group_rows_fun(0) -> + fun(_A, _B) -> true end; +group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> + fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> + lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); + ({Key1,_}, {Key2,_}) -> + Key1 == Key2 + end. + +reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> + {stop, Acc}; +reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> + send(null, Red, Acc); +reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> + send(Key, Red, Acc); +reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> + send(lists:sublist(K, I), Red, Acc); +reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 -> + send(K, Red, Acc). + + +send(Key, Value, #view_acc{limit=Limit} = Acc) -> + case rexi:sync_reply(#view_row{key=Key, value=Value}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + stop -> + exit(normal); + timeout -> + exit(timeout) + end. + +changes_enumerator(DocInfo, {Db, _Seq, Args}) -> + #changes_args{ + include_docs = IncludeDocs, + filter = Acc, + conflicts = Conflicts + } = Args, + #doc_info{high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo, + case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of + [] -> + {ok, {Db, Seq, Args}}; + Results -> + Opts = if Conflicts -> [conflicts]; true -> [] end, + ChangesRow = changes_row(Db, DocInfo, Results, Del, IncludeDocs, Opts), + Go = rexi:sync_reply(ChangesRow), + {Go, {Db, Seq, Args}} + end. + +changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) -> + Doc = doc_member(Db, DI, Opts), + #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del}; +changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) -> + #change{key=Seq, id=Id, value=Results, deleted=true}; +changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) -> + #change{key=Seq, id=Id, value=Results}. + +doc_member(Shard, DocInfo, Opts) -> + case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of + {ok, Doc} -> + couch_doc:to_json_obj(Doc, []); + Error -> + Error + end. + +possible_ancestors(_FullInfo, []) -> + []; +possible_ancestors(FullInfo, MissingRevs) -> + #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), + LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], + % Find the revs that are possible parents of this rev + lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> + % this leaf is a "possible ancenstor" of the missing + % revs if this LeafPos lessthan any of the missing revs + case lists:any(fun({MissingPos, _}) -> + LeafPos < MissingPos end, MissingRevs) of + true -> + [{LeafPos, LeafRevId} | Acc]; + false -> + Acc + end + end, [], LeafRevs). + +make_att_readers([]) -> + []; +make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> + % % go through the attachments looking for 'follows' in the data, + % % replace with function that reads the data from MIME stream. + Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], + [Doc#doc{atts = Atts} | make_att_readers(Rest)]. + +make_att_reader({follows, Parser}) -> + fun() -> + Parser ! {get_bytes, self()}, + receive {bytes, Bytes} -> Bytes end + end; +make_att_reader(Else) -> + Else. + +clean_stack() -> + lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, + erlang:get_stacktrace()). + +set_io_priority(DbName, Options) -> + case lists:keyfind(io_priority, 1, Options) of + {io_priority, Pri} -> + erlang:put(io_priority, Pri); + false -> + erlang:put(io_priority, {interactive, DbName}) + end. |