diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-05-28 13:15:38 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-05-28 13:18:57 -0400 |
commit | 450bc69ea891d9ad03c26c31c181c28ee5cbe8f9 (patch) | |
tree | c0a1b4db1fbd6354ef5c22eb832aad3f623d6962 | |
parent | 8dc112522f8baef4454f3c0ca74fbbd920fc451d (diff) |
implement fabric missing_revs, BugzID 10217
-rw-r--r-- | src/fabric.erl | 15 | ||||
-rw-r--r-- | src/fabric_doc.erl | 56 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 23 |
3 files changed, 88 insertions, 6 deletions
diff --git a/src/fabric.erl b/src/fabric.erl index 3a4575bd..26b379af 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -2,7 +2,8 @@ -export([all_databases/1, create_db/2, delete_db/2, get_db_info/2, db_path/2]). --export([open_doc/3, open_revs/4, update_doc/3, update_docs/3]). +-export([open_doc/3, open_revs/4, get_missing_revs/2]). +-export([update_doc/3, update_docs/3]). -include("../../couch/src/couch_db.hrl"). @@ -34,6 +35,10 @@ open_doc(DbName, Id, Options) -> open_revs(DbName, Id, Revs, Options) -> fabric_doc:open_revs(dbname(DbName), docid(Id), Revs, Options). +get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) -> + Sanitized = [idrevs(IdR) || IdR <- IdsRevs], + fabric_doc:get_missing_revs(dbname(DbName), Sanitized). + update_doc(DbName, Doc, Options) -> {ok, [Result]} = update_docs(DbName, [Doc], Options), Result. @@ -70,6 +75,14 @@ doc({_} = Doc) -> doc(Doc) -> erlang:error({illegal_doc_format, Doc}). +idrevs({Id, Revs}) when is_list(Revs) -> + {docid(Id), [rev(R) || R <- Revs]}. + +rev(Rev) when is_list(Rev); is_binary(Rev) -> + couch_doc:parse_rev(Rev); +rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> + Rev. + generate_customer_path("/", _Customer) -> ""; generate_customer_path("/favicon.ico", _Customer) -> diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl index 039b6aab..97e7cfb6 100644 --- a/src/fabric_doc.erl +++ b/src/fabric_doc.erl @@ -24,6 +24,15 @@ open_revs(DbName, Id, Revs, Options) -> Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0). +get_missing_revs(DbName, AllIdsRevs) -> + Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> + Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}), + Shard#shard{ref=Ref} + end, group_idrevs_by_shard(DbName, AllIdsRevs)), + ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]), + Acc0 = {length(Workers), ResultDict}, + fabric_util:recv(Workers, #shard.ref, fun handle_missing_revs/3, Acc0). + update_docs(DbName, AllDocs, Options) -> GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), @@ -39,9 +48,6 @@ update_docs(DbName, AllDocs, Options) -> Else end. -get_missing_revs(_DbName, _IdsRevs) -> - ok. - handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> skip_message(Acc0); handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) -> @@ -66,6 +72,23 @@ handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) -> handle_open_revs(_Worker, _Reply, {_WaitingCount, _R, _Replies}) -> {stop, not_implemented}. +handle_missing_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_missing_revs(_Worker, {rexi_EXIT, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_missing_revs(_Worker, {ok, Results}, {1, D0}) -> + D = update_dict(D0, Results), + {stop, dict:fold(fun force_missing_revs_reply/3, [], D)}; +handle_missing_revs(_Worker, {ok, Results}, {WaitingCount, D0}) -> + D = update_dict(D0, Results), + case dict:fold(fun maybe_missing_revs_reply/3, {stop, []}, D) of + continue -> + % still haven't heard about some Ids + {ok, {WaitingCount - 1, D}}; + {stop, FinalReply} -> + {stop, FinalReply} + end. + handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> skip_message(Acc0); handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) -> @@ -92,6 +115,23 @@ handle_update_docs(Worker, {ok, Replies}, Acc0) -> {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} end. +force_missing_revs_reply(Id, {nil,Revs}, Acc) -> + % never heard about this ID, assume it's missing + [{Id, Revs} | Acc]; +force_missing_revs_reply(_, [], Acc) -> + Acc; +force_missing_revs_reply(Id, Revs, Acc) -> + [{Id, Revs} | Acc]. + +maybe_missing_revs_reply(_, _, continue) -> + continue; +maybe_missing_revs_reply(_, {nil, _}, _) -> + continue; +maybe_missing_revs_reply(_, [], {stop, Acc}) -> + {stop, Acc}; +maybe_missing_revs_reply(Id, Revs, {stop, Acc}) -> + {stop, [{Id, Revs} | Acc]}. + force_update_reply(Doc, Replies, {W, Acc}) -> % TODO make a real decision here case Replies of @@ -129,6 +169,13 @@ group_docs_by_shard(DbName, Docs) -> end, D0, partitions:for_key(DbName,Id)) end, dict:new(), Docs)). +group_idrevs_by_shard(DbName, IdsRevs) -> + dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, {Id, Revs}, D1) + end, D0, partitions:for_key(DbName,Id)) + end, dict:new(), IdsRevs)). + skip_message({1, _R, Replies}) -> repair_read_quorum_failure(Replies); skip_message({WaitingCount, R, Replies}) -> @@ -142,6 +189,9 @@ merge_read_reply(Key, Reply, Replies) -> {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} end. +update_dict(D0, KVs) -> + lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs). + append_update_replies([], [], DocReplyDict) -> DocReplyDict; append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index c54ec247..75ce5e90 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -1,12 +1,31 @@ -module(fabric_rpc). --export([open_doc/3, open_doc/4, get_db_info/1, update_docs/3]). +-export([open_doc/3, open_doc/4, get_db_info/1, update_docs/3, + get_missing_revs/2]). +-include("../../couch/src/couch_db.hrl"). -include_lib("eunit/include/eunit.hrl"). + open_doc(DbName, DocId, Options) -> - io:format("~p ~p ~p ~p~n", [?MODULE, DbName, DocId, Options]), with_db(DbName, {couch_db, open_doc_int, [DocId, Options]}). +get_missing_revs(DbName, IdRevsList) -> + % reimplement here so we get [] for Ids with no missing revs in response + rexi:reply(case couch_db:open(DbName, []) of + {ok, Db} -> + Ids = [Id1 || {Id1, _Revs} <- IdRevsList], + {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> + case FullDocInfoResult of + {ok, #full_doc_info{rev_tree=RevisionTree}} -> + {Id, couch_key_tree:find_missing(RevisionTree, Revs)}; + not_found -> + {Id, Revs} + end + end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; + Error -> + Error + end). + %% rpc endpoints %% call to with_db will supply your M:F with a #db{} and then remaining args |