diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-01 10:58:17 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-06-01 10:58:17 -0400 |
commit | bd976a5a78ba88a25996fc7e94c22e8a8925ccec (patch) | |
tree | 4a3ee571dd3a937b166ee246473bce2b71b55885 | |
parent | 9b7831dbbddc97b4134be2fa3b3ec2d2ebc9462b (diff) |
add sync_reply, change msg format to be more like gen_server
-rw-r--r-- | src/fabric_create_db.erl | 12 | ||||
-rw-r--r-- | src/fabric_delete_db.erl | 8 | ||||
-rw-r--r-- | src/fabric_get_db_info.erl | 4 | ||||
-rw-r--r-- | src/fabric_missing_revs.erl | 8 | ||||
-rw-r--r-- | src/fabric_open_doc.erl | 6 | ||||
-rw-r--r-- | src/fabric_open_revs.erl | 6 | ||||
-rw-r--r-- | src/fabric_update_docs.erl | 6 | ||||
-rw-r--r-- | src/fabric_util.erl | 14 |
8 files changed, 35 insertions, 29 deletions
diff --git a/src/fabric_create_db.erl b/src/fabric_create_db.erl index 1e9bf256..21b093bf 100644 --- a/src/fabric_create_db.erl +++ b/src/fabric_create_db.erl @@ -37,26 +37,26 @@ send_create_calls(Fullmap, Options) -> end, Fullmap). %% @doc handle create messages from shards -handle_create_msg(_, file_exists, _) -> +handle_create_msg(file_exists, _, _) -> {error, file_exists}; -handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) -> +handle_create_msg({rexi_EXIT, _Reason}, _, {Complete, N, Parts}) -> {ok, {Complete, N-1, Parts}}; -handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) -> +handle_create_msg({rexi_DOWN, _, _, _}, _, {Complete, _N, _Parts}) -> if Complete -> {stop, ok}; true -> {error, create_db_fubar} end; handle_create_msg(_, _, {true, 1, _Acc}) -> {stop, ok}; -handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) -> +handle_create_msg({ok, _}, {_, #shard{range=[Beg,_]}}, {false, 1, PartResults0}) -> PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), case is_complete(PartResults) of true -> {stop, ok}; false -> {error, create_db_fubar} end; -handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) -> +handle_create_msg({ok, _}, _RefPart, {true, N, Parts}) -> {ok, {true, N-1, Parts}}; -handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) -> +handle_create_msg({ok, _}, {_Ref, #shard{range=[Beg,_]}}, {false, Rem, PartResults0}) -> PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}), {ok, {is_complete(PartResults), Rem-1, PartResults}}. diff --git a/src/fabric_delete_db.erl b/src/fabric_delete_db.erl index cbcf0d5d..d21a1a06 100644 --- a/src/fabric_delete_db.erl +++ b/src/fabric_delete_db.erl @@ -33,18 +33,18 @@ send_delete_calls(Parts, Options) -> {Ref, Part} end, Parts). -handle_delete_msg(_, not_found, {NotFound, N}) -> +handle_delete_msg(not_found, _, {NotFound, N}) -> {ok, {NotFound, N-1}}; -handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) -> +handle_delete_msg({rexi_EXIT, _Reason}, _, {NotFound, N}) -> {ok, {NotFound, N-1}}; -handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) -> +handle_delete_msg({rexi_DOWN, _, _, _}, _, _Acc) -> {error, delete_db_fubar}; handle_delete_msg(_, _, {NotFound, 1}) -> if NotFound -> {stop, not_found}; true -> {stop, ok} end; -handle_delete_msg(_, ok, {_NotFound, N}) -> +handle_delete_msg(ok, _, {_NotFound, N}) -> {ok, {false, N-1}}. delete_fullmap(DbName) -> diff --git a/src/fabric_get_db_info.erl b/src/fabric_get_db_info.erl index 19e72e92..9242b569 100644 --- a/src/fabric_get_db_info.erl +++ b/src/fabric_get_db_info.erl @@ -32,7 +32,7 @@ handle_info_msg(_, _, {false, 1, Infos0}) -> end, [], Infos0)), ?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]), {error, get_db_info}; -handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) -> +handle_info_msg({ok, Info}, #shard{range=[Beg,_]}, {false, N, Infos0}) -> case couch_util:get_value(Beg, Infos0) of nil -> Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}), @@ -43,7 +43,7 @@ handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) -> _ -> {ok, {false, N-1, Infos0}} end; -handle_info_msg(_, _Other, {Complete, N, Infos0}) -> +handle_info_msg(_Other, _, {Complete, N, Infos0}) -> {ok, {Complete, N-1, Infos0}}. is_complete(List) -> diff --git a/src/fabric_missing_revs.erl b/src/fabric_missing_revs.erl index ee7ea421..ff756425 100644 --- a/src/fabric_missing_revs.erl +++ b/src/fabric_missing_revs.erl @@ -13,14 +13,14 @@ go(DbName, AllIdsRevs) -> Acc0 = {length(Workers), ResultDict}, fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). -handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(_Worker, {rexi_EXIT, _, _, _}, Acc0) -> +handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(_Worker, {ok, Results}, {1, D0}) -> +handle_message({ok, Results}, _Worker, {1, D0}) -> D = update_dict(D0, Results), {stop, dict:fold(fun force_reply/3, [], D)}; -handle_message(_Worker, {ok, Results}, {WaitingCount, D0}) -> +handle_message({ok, Results}, _Worker, {WaitingCount, D0}) -> D = update_dict(D0, Results), case dict:fold(fun maybe_reply/3, {stop, []}, D) of continue -> diff --git a/src/fabric_open_doc.erl b/src/fabric_open_doc.erl index 3ea7d4ab..e2ea3023 100644 --- a/src/fabric_open_doc.erl +++ b/src/fabric_open_doc.erl @@ -18,11 +18,11 @@ go(DbName, Id, Options) -> Error end. -handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(_Worker, {rexi_EXIT, _Reason}, Acc0) -> +handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(_Worker, Reply, {WaitingCount, R, Replies}) -> +handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> case merge_read_reply(make_key(Reply), Reply, Replies) of {_, KeyCount} when KeyCount =:= R -> {stop, Reply}; diff --git a/src/fabric_open_revs.erl b/src/fabric_open_revs.erl index 6847a2e5..cc464203 100644 --- a/src/fabric_open_revs.erl +++ b/src/fabric_open_revs.erl @@ -15,11 +15,11 @@ go(DbName, Id, Revs, Options) -> Else end. -handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(_Worker, {rexi_EXIT, _}, Acc0) -> +handle_message({rexi_EXIT, _}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(_Worker, Reply, {WaitingCount, R, Replies}) -> +handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> case merge_read_reply(make_key(Reply), Reply, Replies) of {_, KeyCount} when KeyCount =:= R -> {stop, Reply}; diff --git a/src/fabric_update_docs.erl b/src/fabric_update_docs.erl index 1281e40e..7a677e1d 100644 --- a/src/fabric_update_docs.erl +++ b/src/fabric_update_docs.erl @@ -19,11 +19,11 @@ go(DbName, AllDocs, Options) -> Else end. -handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(_Worker, {rexi_EXIT, _}, Acc0) -> +handle_message({rexi_EXIT, _}, _Worker, Acc0) -> skip_message(Acc0); -handle_message(Worker, {ok, Replies}, Acc0) -> +handle_message({ok, Replies}, Worker, Acc0) -> {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, Docs = couch_util:get_value(Worker, GroupedDocs), DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), diff --git a/src/fabric_util.erl b/src/fabric_util.erl index b6d68e09..dd1aaf0a 100644 --- a/src/fabric_util.erl +++ b/src/fabric_util.erl @@ -55,10 +55,16 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> false -> % this was some non-matching message which we will ignore {ok, Acc0}; - RefPart -> - % call the Fun that understands the message - %?debugFmt("~nAcc0: ~p~n", [Acc0]), - Fun(RefPart, Msg, Acc0) + Worker -> + Fun(Msg, Worker, Acc0) + end; + {Ref, From, Msg} -> + io:format("process sync_reply {~p,~p} ~p~n", [Ref, From, Msg]), + case lists:keyfind(Ref, Keypos, RefList) of + false -> + {ok, Acc0}; + Worker -> + Fun(Msg, {Worker, From}, Acc0) end; {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]), |