summaryrefslogtreecommitdiff
path: root/deps/fabric/src/fabric_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/fabric/src/fabric_rpc.erl')
-rw-r--r--deps/fabric/src/fabric_rpc.erl485
1 files changed, 485 insertions, 0 deletions
diff --git a/deps/fabric/src/fabric_rpc.erl b/deps/fabric/src/fabric_rpc.erl
new file mode 100644
index 00000000..3f25dfd7
--- /dev/null
+++ b/deps/fabric/src/fabric_rpc.erl
@@ -0,0 +1,485 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc).
+
+-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
+-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3,
+ update_docs/3]).
+-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
+-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3,
+ set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
+
+-include("fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record (view_acc, {
+ db,
+ limit,
+ include_docs,
+ conflicts,
+ doc_info = nil,
+ offset = nil,
+ total_rows,
+ reduce_fun = fun couch_db:enum_docs_reduce_to_count/1,
+ group_level = 0
+}).
+
+%% rpc endpoints
+%% call to with_db will supply your M:F with a #db{} and then remaining args
+
% Stream the _all_docs index of a single shard back to the coordinator.
% Handles only the keys=nil (range scan) case; keyed _all_docs requests go
% through a different path. Rows are pushed incrementally over rexi from
% view_fold/3, and final_response/2 closes the stream.
all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
    {ok, Db} = get_or_create_db(DbName, []),
    #view_query_args{
        start_key = StartKey,
        start_docid = StartDocId,
        end_key = EndKey,
        end_docid = EndDocId,
        limit = Limit,
        skip = Skip,
        include_docs = IncludeDocs,
        conflicts = Conflicts,
        direction = Dir,
        inclusive_end = Inclusive,
        extra = Extra
    } = QueryArgs,
    set_io_priority(DbName, Extra),
    {ok, Total} = couch_db:get_doc_count(Db),
    % scan limit+skip rows locally: the skip offset cannot be split across
    % shards, so the coordinator drops the skipped rows itself
    Acc0 = #view_acc{
        db = Db,
        include_docs = IncludeDocs,
        conflicts = Conflicts,
        limit = Limit+Skip,
        total_rows = Total
    },
    EndKeyType = if Inclusive -> end_key; true -> end_key_gt end,
    % a non-binary start/end key means "unset"; fall back to the docid bound
    Options = [
        {dir, Dir},
        {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end},
        {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
    ],
    {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
    final_response(Total, Acc#view_acc.offset).
+
% Stream the changes feed of one shard starting after StartSeq. Each row
% that survives the filter is pushed to the coordinator by
% changes_enumerator/2; a final {complete, LastSeq} closes this shard's feed.
changes(DbName, Args, StartSeq) ->
    erlang:put(io_priority, {interactive, DbName}),
    #changes_args{dir=Dir} = Args,
    case get_or_create_db(DbName, []) of
    {ok, Db} ->
        Enum = fun changes_enumerator/2,
        Opts = [{dir,Dir}],
        Acc0 = {Db, StartSeq, Args},
        try
            {ok, {_, LastSeq, _}} =
                couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
            rexi:reply({complete, LastSeq})
        after
            % always release the db handle, even if the fold crashes
            couch_db:close(Db)
        end;
    Error ->
        rexi:reply(Error)
    end.
+
% Run a map view against one shard, streaming rows to the coordinator via
% view_fold/3. The view group is requested at MinSeq (0 for stale requests,
% the current update seq otherwise); stale=update_after additionally kicks
% off a background index update once the group is acquired.
map_view(DbName, DDoc, ViewName, QueryArgs) ->
    {ok, Db} = get_or_create_db(DbName, []),
    #view_query_args{
        limit = Limit,
        skip = Skip,
        keys = Keys,
        include_docs = IncludeDocs,
        conflicts = Conflicts,
        stale = Stale,
        view_type = ViewType,
        extra = Extra
    } = QueryArgs,
    set_io_priority(DbName, Extra),
    {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
    Group0 = couch_view_group:design_doc_to_view_group(DDoc),
    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
    {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
    maybe_update_view_group(Pid, LastSeq, Stale),
    % monitor the index fd so this worker dies if the index file goes away
    erlang:monitor(process, Group#group.fd),
    View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType),
    {ok, Total} = couch_view:get_row_count(View),
    % scan limit+skip rows locally; the coordinator applies the skip
    Acc0 = #view_acc{
        db = Db,
        include_docs = IncludeDocs,
        conflicts = Conflicts,
        limit = Limit+Skip,
        total_rows = Total,
        reduce_fun = fun couch_view:reduce_to_count/1
    },
    case Keys of
    nil ->
        % plain range request: one fold over the requested key range
        Options = couch_httpd_view:make_key_options(QueryArgs),
        {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
    _ ->
        % keyed request: one fold per key, threading the accumulator so
        % the row limit applies across all keys combined
        Acc = lists:foldl(fun(Key, AccIn) ->
            KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
            Options = couch_httpd_view:make_key_options(KeyArgs),
            {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
                Options),
            Out
        end, Acc0, Keys)
    end,
    final_response(Total, Acc#view_acc.offset).
+
% Run a reduce view against one shard, streaming one row per group to the
% coordinator (see reduce_fold/3 and send/3) and finishing with `complete`.
reduce_view(DbName, Group0, ViewName, QueryArgs) ->
    % NOTE(review): this put is immediately superseded by set_io_priority/2
    % below, which always writes io_priority; looks redundant -- confirm.
    erlang:put(io_priority, {interactive, DbName}),
    {ok, Db} = get_or_create_db(DbName, []),
    #view_query_args{
        group_level = GroupLevel,
        limit = Limit,
        skip = Skip,
        keys = Keys,
        stale = Stale,
        extra = Extra
    } = QueryArgs,
    set_io_priority(DbName, Extra),
    GroupFun = group_rows_fun(GroupLevel),
    {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
    {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
    maybe_update_view_group(Pid, LastSeq, Stale),
    #group{views=Views, def_lang=Lang, fd=Fd} = Group,
    % monitor the index fd so this worker dies if the index file goes away
    erlang:monitor(process, Fd),
    {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
    ReduceView = {reduce, NthRed, Lang, View},
    % scan limit+skip groups locally; the coordinator applies the skip
    Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
    case Keys of
    nil ->
        % plain range request: one reduce fold over the whole range
        Options0 = couch_httpd_view:make_key_options(QueryArgs),
        Options = [{key_group_fun, GroupFun} | Options0],
        couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
    _ ->
        % keyed request: one single-key reduce fold per requested key
        lists:map(fun(Key) ->
            KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
            Options0 = couch_httpd_view:make_key_options(KeyArgs),
            Options = [{key_group_fun, GroupFun} | Options0],
            couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
        end, Keys)
    end,
    rexi:reply(complete).
+
% Determine which sequences a view request should use.
% Returns {LastSeq, MinSeq}: LastSeq is the shard's current update seq and
% MinSeq is the seq the view group must reach before we read from it.
% stale=ok and stale=update_after accept any index state (MinSeq = 0);
% otherwise the index must catch up to the current seq.
calculate_seqs(Db, Stale) when Stale == ok; Stale == update_after ->
    {couch_db:get_update_seq(Db), 0};
calculate_seqs(Db, _Stale) ->
    LastSeq = couch_db:get_update_seq(Db),
    {LastSeq, LastSeq}.
+
% After serving a stale=update_after request, kick off a background index
% update so subsequent requests see fresher results. All other staleness
% modes are a no-op.
maybe_update_view_group(GroupPid, LastSeq, Stale) ->
    case Stale of
        update_after ->
            couch_view_group:trigger_group_update(GroupPid, LastSeq);
        _ ->
            ok
    end.
+
% Create the shard file for DbName and ack the coordinator over rexi.
% A successful {ok, Db} is collapsed to a plain ok; errors (for example
% file_exists) are forwarded untouched.
create_db(DbName) ->
    Reply = case couch_server:create(DbName, []) of
        {ok, _} ->
            ok;
        Error ->
            Error
    end,
    rexi:reply(Reply).
+
% Persist a shard-map document and relay mem3's result to the coordinator.
% The db name argument is unused: mem3 knows which database holds the
% shard map.
create_shard_db_doc(_DbName, Doc) ->
    Result = mem3_util:write_db_doc(Doc),
    rexi:reply(Result).
+
% Delete the shard file for DbName and ack the coordinator over rexi.
% Replying is required for consistency with create_db/1 and the other rpc
% endpoints in this module: the coordinator waits on a rexi reply from
% every shard worker, so returning silently here would leave it hanging
% until its timeout fires.
delete_db(DbName) ->
    rexi:reply(couch_server:delete(DbName, [])).
+
% Remove a shard-map document and relay mem3's result to the coordinator.
% The db name argument is unused: mem3 knows which database holds the
% shard map.
delete_shard_db_doc(_DbName, DocId) ->
    Result = mem3_util:delete_db_doc(DocId),
    rexi:reply(Result).
+
% Simple rpc endpoints: each opens (or creates) the shard via with_db/3,
% applies the given couch_db call, and replies to the coordinator over rexi.

% Fetch the shard's info blob (doc counts, sizes, seqs, ...).
get_db_info(DbName) ->
    with_db(DbName, [], {couch_db, get_db_info, []}).

% Count of non-deleted docs in this shard.
get_doc_count(DbName) ->
    with_db(DbName, [], {couch_db, get_doc_count, []}).

% Current update sequence of this shard.
get_update_seq(DbName) ->
    with_db(DbName, [], {couch_db, get_update_seq, []}).

% Replace the shard's _security object.
set_security(DbName, SecObj, Options) ->
    with_db(DbName, Options, {couch_db, set_security, [SecObj]}).

% Set the shard's revision pruning limit.
set_revs_limit(DbName, Limit, Options) ->
    with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).

% Open a single doc; Options is both the open options and the db options.
open_doc(DbName, DocId, Options) ->
    with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).

% Open specific revisions of a doc (replication read path).
open_revs(DbName, Id, Revs, Options) ->
    with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}).

% Back-compat entry point: delegate to get_missing_revs/3 with no options.
get_missing_revs(DbName, IdRevsList) ->
    get_missing_revs(DbName, IdRevsList, []).
+
% For each {Id, Revs} pair, report which of the requested revs are absent
% from this shard plus the leaf revs that could be their ancestors (used
% by the replicator to shrink the revision data it must send).
get_missing_revs(DbName, IdRevsList, Options) ->
    % reimplement here so we get [] for Ids with no missing revs in response
    set_io_priority(DbName, Options),
    rexi:reply(case get_or_create_db(DbName, Options) of
    {ok, Db} ->
        Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
        % couch_btree:lookup returns results positionally, so zipwith pairs
        % each requested Id with its #full_doc_info{} (or not_found)
        {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) ->
            case FullDocInfoResult of
            {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} ->
                MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs),
                {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)};
            not_found ->
                % unknown doc: everything the caller asked about is missing
                {Id, Revs, []}
            end
        end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
    Error ->
        Error
    end).
+
% Apply a batch of document writes to this shard. The replicated_changes
% option selects couch_db's replication merge semantics instead of the
% interactive-edit conflict check. Attachments whose data is a
% {follows, Parser} placeholder are rewired to read from the MIME parser
% before the write (see make_att_readers/1).
update_docs(DbName, Docs0, Options) ->
    UpdateType = case proplists:get_value(replicated_changes, Options) of
        true ->
            replicated_changes;
        _ ->
            interactive_edit
    end,
    Docs = make_att_readers(Docs0),
    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, UpdateType]}).
+
% Report index status (update seq, disk size, compaction state, ...) for
% the view group on this shard, relayed to the coordinator over rexi.
group_info(DbName, Group0) ->
    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
    rexi:reply(couch_view_group:request_group_info(Pid)).
+
% Ask the shard's main db process to reload its validate_doc_update
% functions (e.g. after a design doc changed). Fire-and-forget cast;
% failures to open the shard are silently ignored.
reset_validation_funs(DbName) ->
    case get_or_create_db(DbName, []) of
    {ok, #db{main_pid = Pid}} ->
        gen_server:cast(Pid, {load_validation_funs, undefined});
    _ ->
        ok
    end.
+
+%%
+%% internal
+%%
+
% Open (or create) the shard, apply M:F([Db | A]) and ship the result back
% to the coordinator over rexi. Thrown terms are replied as-is; runtime
% errors are logged with a trimmed stacktrace and replied as {error, Reason}.
with_db(DbName, Options, {M,F,A}) ->
    set_io_priority(DbName, Options),
    case get_or_create_db(DbName, Options) of
    {ok, Db} ->
        rexi:reply(try
            apply(M, F, [Db | A])
        % a bare pattern in a catch clause matches class `throw` only
        catch Exception ->
            Exception;
        error:Reason ->
            twig:log(error, "rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
                clean_stack()]),
            {error, Reason}
        end);
    Error ->
        rexi:reply(Error)
    end.
+
% Open a shard file, implicitly recreating it if the file is missing.
% Auto-creation keeps the cluster limping along after a lost shard file;
% the warning log records that it happened.
get_or_create_db(DbName, Options) ->
    case couch_db:open_int(DbName, Options) of
    {not_found, no_db_file} ->
        twig:log(warn, "~p creating ~s", [?MODULE, DbName]),
        couch_server:create(DbName, Options);
    Else ->
        Else
    end.
+
% Fold callback threaded through couch_db:enum_docs (_all_docs) and
% couch_view:fold (map views). Streams one rexi message per row and stops
% once limit+skip rows have been emitted.

% _all_docs case: translate a #full_doc_info{} into the {{Id,Id}, Value}
% KV shape the generic clauses expect; deleted docs are skipped.
view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
    case couch_doc:to_doc_info(FullDocInfo) of
    #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
        Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
        view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI});
    #doc_info{revs=[#rev_info{deleted=true}|_]} ->
        {ok, Acc}
    end;
% first live row: derive this shard's offset from the btree reductions and
% tell the coordinator {total_and_offset, ...} before sending any rows
view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
    #view_acc{reduce_fun=Reduce} = Acc,
    Offset = Reduce(OffsetReds),
    case rexi:sync_reply({total_and_offset, Total, Offset}) of
    ok ->
        view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
    stop ->
        exit(normal);
    timeout ->
        exit(timeout)
    end;
view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
    % we scanned through limit+skip local rows
    {stop, Acc};
view_fold({{Key,Id}, Value}, _Offset, Acc) ->
    % the normal case: optionally open the doc, then stream the row
    #view_acc{
        db = Db,
        doc_info = DocInfo,
        limit = Limit,
        conflicts = Conflicts,
        include_docs = IncludeDocs
    } = Acc,
    % a value carrying an _id is a "linked document"; the target doc may
    % live on another shard, so the coordinator embeds it at a higher level
    case Value of {Props} ->
        LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined);
    _ ->
        LinkedDocs = false
    end,
    if LinkedDocs ->
        % we'll embed this at a higher level b/c the doc may be non-local
        Doc = undefined;
    IncludeDocs ->
        IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end,
        Options = if Conflicts -> [conflicts]; true -> [] end,
        case couch_db:open_doc(Db, IdOrInfo, Options) of
        {not_found, deleted} ->
            Doc = null;
        {not_found, missing} ->
            Doc = undefined;
        {ok, Doc0} ->
            Doc = couch_doc:to_json_obj(Doc0, [])
        end;
    true ->
        Doc = undefined
    end,
    case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
    ok ->
        {ok, Acc#view_acc{limit=Limit-1}};
    stop ->
        % fix: the coordinator may answer `stop` here just like it can at
        % the other sync_reply sites (offset clause above, send/3); without
        % this branch a stop crashed the worker with a case_clause instead
        % of a clean normal exit
        exit(normal);
    timeout ->
        exit(timeout)
    end.
+
% Terminate a view response. If no rows were ever sent the offset is still
% nil and the coordinator is still owed a total_and_offset message first;
% the offset equals Total because every local row sorted before the range.
final_response(Total, nil) ->
    case rexi:sync_reply({total_and_offset, Total, Total}) of
    ok ->
        rexi:reply(complete);
    stop ->
        ok;
    timeout ->
        exit(timeout)
    end;
final_response(_Total, _Offset) ->
    rexi:reply(complete).
+
%% TODO: handle case of bogus group level
% Build the key_group_fun that couch_view:fold_reduce uses to decide
% whether two {Key, DocId} btree keys fall into the same reduce group.
group_rows_fun(exact) ->
    % group_level=exact: rows group only on identical keys
    fun({KeyA, _}, {KeyB, _}) -> KeyA == KeyB end;
group_rows_fun(0) ->
    % group_level=0: everything collapses into a single group
    fun(_RowA, _RowB) -> true end;
group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
    % positive level: compare the first GroupLevel elements of array keys;
    % non-list keys fall back to exact comparison
    fun({[_|_] = KeyA, _}, {[_|_] = KeyB, _}) ->
        lists:sublist(KeyA, GroupLevel) == lists:sublist(KeyB, GroupLevel);
    ({KeyA, _}, {KeyB, _}) ->
        KeyA == KeyB
    end.
+
% Per-group callback for couch_view:fold_reduce. Clause order matters: the
% limit=0 check must come first so a saturated response stops before
% emitting another row. group_level=0 collapses every key to null; `exact`
% sends the full key; a positive integer level truncates array keys to
% that depth and passes non-array keys through unchanged.
reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
    {stop, Acc};
reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
    send(null, Red, Acc);
reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
    send(Key, Red, Acc);
reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
    send(lists:sublist(K, I), Red, Acc);
reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
    send(K, Red, Acc).
+
+
% Stream one reduce row to the coordinator and decrement the remaining row
% budget. `stop` means the coordinator has all the rows it needs, so this
% worker exits normally; `timeout` is escalated as an abnormal exit.
send(Key, Value, #view_acc{limit=Limit} = Acc) ->
    case rexi:sync_reply(#view_row{key=Key, value=Value}) of
    ok ->
        {ok, Acc#view_acc{limit=Limit-1}};
    stop ->
        exit(normal);
    timeout ->
        exit(timeout)
    end.
+
% Callback for couch_db:changes_since/5. Runs each #doc_info{} through the
% request's changes filter; rows that survive are pushed to the
% coordinator synchronously so it can apply backpressure (stop/timeout).
changes_enumerator(DocInfo, {Db, _OldSeq, Args}) ->
    #changes_args{
        include_docs = IncludeDocs,
        filter = Filter,
        conflicts = Conflicts
    } = Args,
    #doc_info{high_seq = Seq, revs = [#rev_info{deleted = Deleted} | _]} = DocInfo,
    % drop any null placeholders the filter may emit
    case [R || R <- couch_changes:filter(DocInfo, Filter), R /= null] of
    [] ->
        {ok, {Db, Seq, Args}};
    Results ->
        DocOptions = if Conflicts -> [conflicts]; true -> [] end,
        Row = changes_row(Db, DocInfo, Results, Deleted, IncludeDocs, DocOptions),
        {rexi:sync_reply(Row), {Db, Seq, Args}}
    end.
+
% Build the #change{} record for one filtered doc. Clause order: first
% include_docs=true (embed the doc body, or the open error - see
% doc_member/3), then deleted docs without bodies, then the plain row.
changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
    Doc = doc_member(Db, DI, Opts),
    #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del};
changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
    #change{key=Seq, id=Id, value=Results, deleted=true};
changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
    #change{key=Seq, id=Id, value=Results}.
+
% Open a doc (including deleted ones, via the `deleted` option) and render
% it as an EJSON object. On failure the error tuple itself is returned and
% ends up as the changes row's doc value.
doc_member(Shard, DocInfo, Opts) ->
    case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
    {ok, Doc} ->
        couch_doc:to_json_obj(Doc, []);
    Error ->
        Error
    end.
+
% Given a doc's #full_doc_info{} and the revs found missing from this
% shard, return the leaf revs that could be ancestors of those missing
% revs. A leaf qualifies when its position is below at least one missing
% position - equivalently, below the highest missing position.
possible_ancestors(_FullInfo, []) ->
    [];
possible_ancestors(FullInfo, MissingRevs) ->
    #doc_info{revs = RevsInfo} = couch_doc:to_doc_info(FullInfo),
    % hoist the comparison bound: LeafPos < some MissingPos iff
    % LeafPos < max(MissingPos); MissingRevs is non-empty here
    HighestMissing = lists:max([Pos || {Pos, _RevId} <- MissingRevs]),
    lists:foldl(fun(#rev_info{rev = {LeafPos, LeafRevId}}, Acc) ->
        case LeafPos < HighestMissing of
        true ->
            [{LeafPos, LeafRevId} | Acc];
        false ->
            Acc
        end
    end, [], RevsInfo).
+
% Walk a list of docs and rewire every attachment whose data is a
% {follows, Parser} placeholder to a thunk that pulls the bytes from the
% MIME stream on demand (see make_att_reader/1).
make_att_readers(Docs) ->
    lists:map(fun(#doc{atts = Atts0} = Doc) ->
        Atts = [Att#att{data = make_att_reader(Data)} ||
            #att{data = Data} = Att <- Atts0],
        Doc#doc{atts = Atts}
    end, Docs).
+
% Turn a {follows, Parser} attachment placeholder into a zero-arity thunk
% that requests the attachment bytes from the MIME parser process when
% invoked. Any other data term is passed through unchanged.
make_att_reader({follows, Parser}) ->
    fun() ->
        % NOTE(review): blocks until the parser responds; presumably the
        % parser process is supervised/linked elsewhere so a dead parser
        % cannot hang this forever -- verify at the call site.
        Parser ! {get_bytes, self()},
        receive {bytes, Bytes} -> Bytes end
    end;
make_att_reader(Data) ->
    Data.
+
% Trim a stacktrace for logging: argument lists (present in the top frame
% of some errors) are replaced by their arity so logs don't leak
% potentially huge terms.
% NOTE(review): erlang:get_stacktrace/0 is only valid inside a catch
% context and was removed in OTP 21; on modern OTP the trace must be
% captured in the catch pattern instead -- verify the target OTP release.
clean_stack() ->
    lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end,
        erlang:get_stacktrace()).
+
% Record this worker's io priority in the process dictionary, where the
% couch io layer reads it. An explicit {io_priority, Pri} option wins;
% otherwise the request is tagged as interactive traffic for DbName.
set_io_priority(DbName, Options) ->
    Priority = case lists:keyfind(io_priority, 1, Options) of
        {io_priority, Pri} ->
            Pri;
        false ->
            {interactive, DbName}
    end,
    erlang:put(io_priority, Priority).