summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-06-01 10:58:17 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-06-01 10:58:17 -0400
commitbd976a5a78ba88a25996fc7e94c22e8a8925ccec (patch)
tree4a3ee571dd3a937b166ee246473bce2b71b55885
parent9b7831dbbddc97b4134be2fa3b3ec2d2ebc9462b (diff)
add sync_reply, change msg format to be more like gen_server
-rw-r--r--src/fabric_create_db.erl12
-rw-r--r--src/fabric_delete_db.erl8
-rw-r--r--src/fabric_get_db_info.erl4
-rw-r--r--src/fabric_missing_revs.erl8
-rw-r--r--src/fabric_open_doc.erl6
-rw-r--r--src/fabric_open_revs.erl6
-rw-r--r--src/fabric_update_docs.erl6
-rw-r--r--src/fabric_util.erl14
8 files changed, 35 insertions, 29 deletions
diff --git a/src/fabric_create_db.erl b/src/fabric_create_db.erl
index 1e9bf256..21b093bf 100644
--- a/src/fabric_create_db.erl
+++ b/src/fabric_create_db.erl
@@ -37,26 +37,26 @@ send_create_calls(Fullmap, Options) ->
end, Fullmap).
%% @doc handle create messages from shards
-handle_create_msg(_, file_exists, _) ->
+handle_create_msg(file_exists, _, _) ->
{error, file_exists};
-handle_create_msg(_, {rexi_EXIT, _Reason}, {Complete, N, Parts}) ->
+handle_create_msg({rexi_EXIT, _Reason}, _, {Complete, N, Parts}) ->
{ok, {Complete, N-1, Parts}};
-handle_create_msg(_, {rexi_DOWN, _, _, _}, {Complete, _N, _Parts}) ->
+handle_create_msg({rexi_DOWN, _, _, _}, _, {Complete, _N, _Parts}) ->
if
Complete -> {stop, ok};
true -> {error, create_db_fubar}
end;
handle_create_msg(_, _, {true, 1, _Acc}) ->
{stop, ok};
-handle_create_msg({_, #shard{range=[Beg,_]}}, {ok, _}, {false, 1, PartResults0}) ->
+handle_create_msg({ok, _}, {_, #shard{range=[Beg,_]}}, {false, 1, PartResults0}) ->
PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
case is_complete(PartResults) of
true -> {stop, ok};
false -> {error, create_db_fubar}
end;
-handle_create_msg(_RefPart, {ok, _}, {true, N, Parts}) ->
+handle_create_msg({ok, _}, _RefPart, {true, N, Parts}) ->
{ok, {true, N-1, Parts}};
-handle_create_msg({_Ref, #shard{range=[Beg,_]}}, {ok, _}, {false, Rem, PartResults0}) ->
+handle_create_msg({ok, _}, {_Ref, #shard{range=[Beg,_]}}, {false, Rem, PartResults0}) ->
PartResults = lists:keyreplace(Beg, 1, PartResults0, {Beg, true}),
{ok, {is_complete(PartResults), Rem-1, PartResults}}.
diff --git a/src/fabric_delete_db.erl b/src/fabric_delete_db.erl
index cbcf0d5d..d21a1a06 100644
--- a/src/fabric_delete_db.erl
+++ b/src/fabric_delete_db.erl
@@ -33,18 +33,18 @@ send_delete_calls(Parts, Options) ->
{Ref, Part}
end, Parts).
-handle_delete_msg(_, not_found, {NotFound, N}) ->
+handle_delete_msg(not_found, _, {NotFound, N}) ->
{ok, {NotFound, N-1}};
-handle_delete_msg(_, {rexi_EXIT, _Reason}, {NotFound, N}) ->
+handle_delete_msg({rexi_EXIT, _Reason}, _, {NotFound, N}) ->
{ok, {NotFound, N-1}};
-handle_delete_msg(_, {rexi_DOWN, _, _, _}, _Acc) ->
+handle_delete_msg({rexi_DOWN, _, _, _}, _, _Acc) ->
{error, delete_db_fubar};
handle_delete_msg(_, _, {NotFound, 1}) ->
if
NotFound -> {stop, not_found};
true -> {stop, ok}
end;
-handle_delete_msg(_, ok, {_NotFound, N}) ->
+handle_delete_msg(ok, _, {_NotFound, N}) ->
{ok, {false, N-1}}.
delete_fullmap(DbName) ->
diff --git a/src/fabric_get_db_info.erl b/src/fabric_get_db_info.erl
index 19e72e92..9242b569 100644
--- a/src/fabric_get_db_info.erl
+++ b/src/fabric_get_db_info.erl
@@ -32,7 +32,7 @@ handle_info_msg(_, _, {false, 1, Infos0}) ->
end, [], Infos0)),
?LOG_ERROR("get_db_info error, missing shards: ~p", [MissingShards]),
{error, get_db_info};
-handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) ->
+handle_info_msg({ok, Info}, #shard{range=[Beg,_]}, {false, N, Infos0}) ->
case couch_util:get_value(Beg, Infos0) of
nil ->
Infos = lists:keyreplace(Beg, 1, Infos0, {Beg, Info}),
@@ -43,7 +43,7 @@ handle_info_msg(#shard{range=[Beg,_]}, {ok, Info}, {false, N, Infos0}) ->
_ ->
{ok, {false, N-1, Infos0}}
end;
-handle_info_msg(_, _Other, {Complete, N, Infos0}) ->
+handle_info_msg(_Other, _, {Complete, N, Infos0}) ->
{ok, {Complete, N-1, Infos0}}.
is_complete(List) ->
diff --git a/src/fabric_missing_revs.erl b/src/fabric_missing_revs.erl
index ee7ea421..ff756425 100644
--- a/src/fabric_missing_revs.erl
+++ b/src/fabric_missing_revs.erl
@@ -13,14 +13,14 @@ go(DbName, AllIdsRevs) ->
Acc0 = {length(Workers), ResultDict},
fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(_Worker, {rexi_EXIT, _, _, _}, Acc0) ->
+handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(_Worker, {ok, Results}, {1, D0}) ->
+handle_message({ok, Results}, _Worker, {1, D0}) ->
D = update_dict(D0, Results),
{stop, dict:fold(fun force_reply/3, [], D)};
-handle_message(_Worker, {ok, Results}, {WaitingCount, D0}) ->
+handle_message({ok, Results}, _Worker, {WaitingCount, D0}) ->
D = update_dict(D0, Results),
case dict:fold(fun maybe_reply/3, {stop, []}, D) of
continue ->
diff --git a/src/fabric_open_doc.erl b/src/fabric_open_doc.erl
index 3ea7d4ab..e2ea3023 100644
--- a/src/fabric_open_doc.erl
+++ b/src/fabric_open_doc.erl
@@ -18,11 +18,11 @@ go(DbName, Id, Options) ->
Error
end.
-handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(_Worker, {rexi_EXIT, _Reason}, Acc0) ->
+handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(_Worker, Reply, {WaitingCount, R, Replies}) ->
+handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
case merge_read_reply(make_key(Reply), Reply, Replies) of
{_, KeyCount} when KeyCount =:= R ->
{stop, Reply};
diff --git a/src/fabric_open_revs.erl b/src/fabric_open_revs.erl
index 6847a2e5..cc464203 100644
--- a/src/fabric_open_revs.erl
+++ b/src/fabric_open_revs.erl
@@ -15,11 +15,11 @@ go(DbName, Id, Revs, Options) ->
Else
end.
-handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(_Worker, {rexi_EXIT, _}, Acc0) ->
+handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(_Worker, Reply, {WaitingCount, R, Replies}) ->
+handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
case merge_read_reply(make_key(Reply), Reply, Replies) of
{_, KeyCount} when KeyCount =:= R ->
{stop, Reply};
diff --git a/src/fabric_update_docs.erl b/src/fabric_update_docs.erl
index 1281e40e..7a677e1d 100644
--- a/src/fabric_update_docs.erl
+++ b/src/fabric_update_docs.erl
@@ -19,11 +19,11 @@ go(DbName, AllDocs, Options) ->
Else
end.
-handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(_Worker, {rexi_EXIT, _}, Acc0) ->
+handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
skip_message(Acc0);
-handle_message(Worker, {ok, Replies}, Acc0) ->
+handle_message({ok, Replies}, Worker, Acc0) ->
{WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
Docs = couch_util:get_value(Worker, GroupedDocs),
DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index b6d68e09..dd1aaf0a 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -55,10 +55,16 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
false ->
% this was some non-matching message which we will ignore
{ok, Acc0};
- RefPart ->
- % call the Fun that understands the message
- %?debugFmt("~nAcc0: ~p~n", [Acc0]),
- Fun(RefPart, Msg, Acc0)
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
+ {Ref, From, Msg} ->
+ io:format("process sync_reply {~p,~p} ~p~n", [Ref, From, Msg]),
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
end;
{rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]),