diff options
author | Brad Anderson <brad@cloudant.com> | 2010-08-17 22:04:15 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-27 16:52:26 -0400 |
commit | 3b64928720ce15c92374b031ff24b10022671261 (patch) | |
tree | 0535ad8bb4f6c3cfd148c62c8c0a0f24c1a7f589 /apps/fabric/src/fabric_util.erl | |
parent | 794d78eb02a20e6c545d24fa9649839ebe763636 (diff) |
split some rexi utilities out from fabric
Diffstat (limited to 'apps/fabric/src/fabric_util.erl')
-rw-r--r-- | apps/fabric/src/fabric_util.erl | 66 |
1 files changed, 5 insertions, 61 deletions
diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl index b9e30093..a3265010 100644 --- a/apps/fabric/src/fabric_util.erl +++ b/apps/fabric/src/fabric_util.erl @@ -14,12 +14,10 @@ -module(fabric_util). --export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6, - get_db/1]). +-export([submit_jobs/3, cleanup/1, recv/4, get_db/1]). -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). submit_jobs(Shards, EndPoint, ExtraArgs) -> lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> @@ -31,66 +29,12 @@ cleanup(Workers) -> [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. recv(Workers, Keypos, Fun, Acc0) -> - receive_loop(Workers, Keypos, Fun, Acc0). - -receive_loop(Workers, Keypos, Fun, Acc0) -> - case couch_config:get("fabric", "request_timeout", "60000") of - "infinity" -> - Timeout = infinity; - N -> - Timeout = list_to_integer(N) + Timeout = case couch_config:get("fabric", "request_timeout", "60000") of + "infinity" -> infinity; + N -> list_to_integer(N) end, - receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity). - -%% @doc set up the receive loop with an overall timeout --spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) -> - {ok, any()} | timeout | {error, any()}. -receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) -> - process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO); -receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> - TimeoutRef = erlang:make_ref(), - {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}), - try - process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) - after - timer:cancel(TRef) - end. + rexi_utils:recv(Workers, Keypos, Fun, Acc0, Timeout, infinity). -process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> - case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of - {ok, Acc} -> - process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); - {stop, Acc} -> - {ok, Acc}; - Error -> - Error - end. - -process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> - receive - {timeout, TimeoutRef} -> - timeout; - {Ref, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - % this was some non-matching message which we will ignore - {ok, Acc0}; - Worker -> - Fun(Msg, Worker, Acc0) - end; - {Ref, From, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - {ok, Acc0}; - Worker -> - Fun(Msg, {Worker, From}, Acc0) - end; - {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> - ?LOG_ERROR("rexi_DOWN ~p ~p", [ServerPid, Reason]), - Fun(Msg, nil, Acc0) - after PerMsgTO -> - timeout - end. get_db(DbName) -> Shards = mem3:shards(DbName), |