diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 15:46:05 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 21:45:32 -0400 |
commit | ebac05f686b56791511cb9b599dfb5a742dcfc96 (patch) | |
tree | 00a789cd058f98fa014d927f094f9e6e9f91f6f2 /apps/fabric/src/fabric_rpc.erl | |
parent | 952a85381ff4b5b34426000b1dee73c9e74becdd (diff) |
use get-deps to pull down individual cloudant projects
Diffstat (limited to 'apps/fabric/src/fabric_rpc.erl')
-rw-r--r-- | apps/fabric/src/fabric_rpc.erl | 402 |
1 files changed, 0 insertions, 402 deletions
diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl deleted file mode 100644 index a7d555e0..00000000 --- a/apps/fabric/src/fabric_rpc.erl +++ /dev/null @@ -1,402 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_rpc). - --export([get_db_info/1, get_doc_count/1, get_update_seq/1]). --export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). --export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). --export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3, - set_revs_limit/3]). - --include("fabric.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record (view_acc, { - db, - limit, - include_docs, - offset = nil, - total_rows, - reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, - group_level = 0 -}). - -%% rpc endpoints -%% call to with_db will supply your M:F with a #db{} and then remaining args - -all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - start_key = StartKey, - start_docid = StartDocId, - end_key = EndKey, - end_docid = EndDocId, - limit = Limit, - skip = Skip, - include_docs = IncludeDocs, - direction = Dir, - inclusive_end = Inclusive - } = QueryArgs, - {ok, Total} = couch_db:get_doc_count(Db), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - limit = Limit+Skip, - total_rows = Total - }, - EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, - Options = [ - {dir, Dir}, - {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, - {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} - ], - {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), - final_response(Total, Acc#view_acc.offset). - -changes(DbName, Args, StartSeq) -> - #changes_args{style=Style, dir=Dir} = Args, - case couch_db:open(DbName, []) of - {ok, Db} -> - Enum = fun changes_enumerator/2, - Opts = [{dir,Dir}], - Acc0 = {Db, StartSeq, Args}, - try - {ok, {_, LastSeq, _}} = - couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0), - rexi:reply({complete, LastSeq}) - after - couch_db:close(Db) - end; - Error -> - rexi:reply(Error) - end. - -map_view(DbName, DDoc, ViewName, QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - limit = Limit, - skip = Skip, - keys = Keys, - include_docs = IncludeDocs, - stale = Stale, - view_type = ViewType - } = QueryArgs, - MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, - Group0 = couch_view_group:design_doc_to_view_group(DDoc), - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), - {ok, Total} = couch_view:get_row_count(View), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - limit = Limit+Skip, - total_rows = Total, - reduce_fun = fun couch_view:reduce_to_count/1 - }, - case Keys of - nil -> - Options = couch_httpd_view:make_key_options(QueryArgs), - {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); - _ -> - Acc = lists:foldl(fun(Key, AccIn) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options = couch_httpd_view:make_key_options(KeyArgs), - {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, - Options), - Out - end, Acc0, Keys) - end, - final_response(Total, Acc#view_acc.offset). - -reduce_view(DbName, Group0, ViewName, QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - group_level = GroupLevel, - limit = Limit, - skip = Skip, - keys = Keys, - stale = Stale - } = QueryArgs, - GroupFun = group_rows_fun(GroupLevel), - MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group( - Pid, MinSeq), - {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), - ReduceView = {reduce, NthRed, Lang, View}, - Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, - case Keys of - nil -> - Options0 = couch_httpd_view:make_key_options(QueryArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); - _ -> - lists:map(fun(Key) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options0 = couch_httpd_view:make_key_options(KeyArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) - end, Keys) - end, - rexi:reply(complete). - -create_db(DbName, Options, Doc) -> - mem3_util:write_db_doc(Doc), - rexi:reply(case couch_server:create(DbName, Options) of - {ok, _} -> - ok; - Error -> - Error - end). - -delete_db(DbName, Options, DocId) -> - mem3_util:delete_db_doc(DocId), - rexi:reply(couch_server:delete(DbName, Options)). - -get_db_info(DbName) -> - with_db(DbName, [], {couch_db, get_db_info, []}). - -get_doc_count(DbName) -> - with_db(DbName, [], {couch_db, get_doc_count, []}). - -get_update_seq(DbName) -> - with_db(DbName, [], {couch_db, get_update_seq, []}). - -set_security(DbName, SecObj, Options) -> - with_db(DbName, Options, {couch_db, set_security, [SecObj]}). - -set_revs_limit(DbName, Limit, Options) -> - with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). - -open_doc(DbName, DocId, Options) -> - with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). - -open_revs(DbName, Id, Revs, Options) -> - with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). - -get_missing_revs(DbName, IdRevsList) -> - % reimplement here so we get [] for Ids with no missing revs in response - rexi:reply(case couch_db:open(DbName, []) of - {ok, Db} -> - Ids = [Id1 || {Id1, _Revs} <- IdRevsList], - {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> - case FullDocInfoResult of - {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> - MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), - {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; - not_found -> - {Id, Revs, []} - end - end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; - Error -> - Error - end). - -update_docs(DbName, Docs0, Options) -> - case proplists:get_value(replicated_changes, Options) of - true -> - X = replicated_changes; - _ -> - X = interactive_edit - end, - Docs = make_att_readers(Docs0), - with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). - -group_info(DbName, Group0) -> - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - rexi:reply(couch_view_group:request_group_info(Pid)). - -reset_validation_funs(DbName) -> - case couch_db:open(DbName, []) of - {ok, #db{main_pid = Pid}} -> - gen_server:cast(Pid, {load_validation_funs, undefined}); - _ -> - ok - end. - -%% -%% internal -%% - -with_db(DbName, Options, {M,F,A}) -> - case couch_db:open(DbName, Options) of - {ok, Db} -> - rexi:reply(try - apply(M, F, [Db | A]) - catch Exception -> - Exception; - error:Reason -> - ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason, - erlang:get_stacktrace()]), - {error, Reason} - end); - Error -> - rexi:reply(Error) - end. - -view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> - % matches for _all_docs and translates #full_doc_info{} -> KV pair - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} -> - Id = FullDocInfo#full_doc_info.id, - Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, - view_fold({{Id,Id}, Value}, OffsetReds, Acc); - #doc_info{revs=[#rev_info{deleted=true}|_]} -> - {ok, Acc} - end; -view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> - % calculates the offset for this shard - #view_acc{reduce_fun=Reduce} = Acc, - Offset = Reduce(OffsetReds), - case rexi:sync_reply({total_and_offset, Total, Offset}) of - ok -> - view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); - stop -> - exit(normal); - timeout -> - exit(timeout) - end; -view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> - % we scanned through limit+skip local rows - {stop, Acc}; -view_fold({{Key,Id}, Value}, _Offset, Acc) -> - % the normal case - #view_acc{ - db = Db, - limit = Limit, - include_docs = IncludeDocs - } = Acc, - Doc = if not IncludeDocs -> undefined; true -> - case couch_db:open_doc(Db, Id, []) of - {not_found, deleted} -> - null; - {not_found, missing} -> - undefined; - {ok, Doc0} -> - couch_doc:to_json_obj(Doc0, []) - end - end, - case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - timeout -> - exit(timeout) - end. - -final_response(Total, nil) -> - case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> - rexi:reply(complete); - stop -> - ok; - timeout -> - exit(timeout) - end; -final_response(_Total, _Offset) -> - rexi:reply(complete). - -group_rows_fun(exact) -> - fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; -group_rows_fun(0) -> - fun(_A, _B) -> true end; -group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> - fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> - lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); - ({Key1,_}, {Key2,_}) -> - Key1 == Key2 - end. - -reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> - {stop, Acc}; -reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> - send(null, Red, Acc); -reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> - send(Key, Red, Acc); -reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> - send(lists:sublist(K, I), Red, Acc). - -send(Key, Value, #view_acc{limit=Limit} = Acc) -> - case rexi:sync_reply(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - stop -> - exit(normal); - timeout -> - exit(timeout) - end. - -changes_enumerator(DocInfo, {Db, _Seq, Args}) -> - #changes_args{include_docs=IncludeDocs, filter=Acc} = Args, - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} - = DocInfo, - case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of - [] -> - {ok, {Db, Seq, Args}}; - Results -> - ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs), - Go = rexi:sync_reply(ChangesRow), - {Go, {Db, Seq, Args}} - end. - -changes_row(_, Seq, Id, Results, _, true, true) -> - #view_row{key=Seq, id=Id, value=Results, doc=deleted}; -changes_row(_, Seq, Id, Results, _, true, false) -> - #view_row{key=Seq, id=Id, value=Results, doc=deleted}; -changes_row(Db, Seq, Id, Results, Rev, false, true) -> - #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)}; -changes_row(_, Seq, Id, Results, _, false, false) -> - #view_row{key=Seq, id=Id, value=Results}. - -doc_member(Shard, Id, Rev) -> - case couch_db:open_doc_revs(Shard, Id, [Rev], []) of - {ok, [{ok,Doc}]} -> - couch_doc:to_json_obj(Doc, []); - Error -> - Error - end. - -possible_ancestors(_FullInfo, []) -> - []; -possible_ancestors(FullInfo, MissingRevs) -> - #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), - LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], - % Find the revs that are possible parents of this rev - lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> - % this leaf is a "possible ancenstor" of the missing - % revs if this LeafPos lessthan any of the missing revs - case lists:any(fun({MissingPos, _}) -> - LeafPos < MissingPos end, MissingRevs) of - true -> - [{LeafPos, LeafRevId} | Acc]; - false -> - Acc - end - end, [], LeafRevs). - -make_att_readers([]) -> - []; -make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> - % % go through the attachments looking for 'follows' in the data, - % % replace with function that reads the data from MIME stream. - Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], - [Doc#doc{atts = Atts} | make_att_readers(Rest)]. - -make_att_reader({follows, Parser}) -> - fun() -> - Parser ! {get_bytes, self()}, - receive {bytes, Bytes} -> Bytes end - end; -make_att_reader(Else) -> - Else. |