From 3b64928720ce15c92374b031ff24b10022671261 Mon Sep 17 00:00:00 2001 From: Brad Anderson Date: Tue, 17 Aug 2010 22:04:15 -0400 Subject: split some rexi utilities out from fabric --- apps/fabric/src/fabric_util.erl | 66 +++----------------------------- apps/fabric/src/fabric_view_all_docs.erl | 2 +- apps/fabric/src/fabric_view_changes.erl | 4 +- apps/fabric/src/fabric_view_map.erl | 5 +-- apps/fabric/src/fabric_view_reduce.erl | 2 +- apps/rexi/ebin/rexi.app | 7 +++- apps/rexi/src/rexi_utils.erl | 53 +++++++++++++++++++++++++ 7 files changed, 70 insertions(+), 69 deletions(-) create mode 100644 apps/rexi/src/rexi_utils.erl (limited to 'apps') diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl index b9e30093..a3265010 100644 --- a/apps/fabric/src/fabric_util.erl +++ b/apps/fabric/src/fabric_util.erl @@ -14,12 +14,10 @@ -module(fabric_util). --export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6, - get_db/1]). +-export([submit_jobs/3, cleanup/1, recv/4, get_db/1]). -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). submit_jobs(Shards, EndPoint, ExtraArgs) -> lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> @@ -31,66 +29,12 @@ cleanup(Workers) -> [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. recv(Workers, Keypos, Fun, Acc0) -> - receive_loop(Workers, Keypos, Fun, Acc0). - -receive_loop(Workers, Keypos, Fun, Acc0) -> - case couch_config:get("fabric", "request_timeout", "60000") of - "infinity" -> - Timeout = infinity; - N -> - Timeout = list_to_integer(N) + Timeout = case couch_config:get("fabric", "request_timeout", "60000") of + "infinity" -> infinity; + N -> list_to_integer(N) end, - receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity). - -%% @doc set up the receive loop with an overall timeout --spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) -> - {ok, any()} | timeout | {error, any()}. -receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) -> - process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO); -receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> - TimeoutRef = erlang:make_ref(), - {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}), - try - process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) - after - timer:cancel(TRef) - end. + rexi_utils:recv(Workers, Keypos, Fun, Acc0, Timeout, infinity). -process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> - case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of - {ok, Acc} -> - process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); - {stop, Acc} -> - {ok, Acc}; - Error -> - Error - end. - -process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> - receive - {timeout, TimeoutRef} -> - timeout; - {Ref, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - % this was some non-matching message which we will ignore - {ok, Acc0}; - Worker -> - Fun(Msg, Worker, Acc0) - end; - {Ref, From, Msg} -> - case lists:keyfind(Ref, Keypos, RefList) of - false -> - {ok, Acc0}; - Worker -> - Fun(Msg, {Worker, From}, Acc0) - end; - {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> - ?LOG_ERROR("rexi_DOWN ~p ~p", [ServerPid, Reason]), - Fun(Msg, nil, Acc0) - after PerMsgTO -> - timeout - end. get_db(DbName) -> Shards = mem3:shards(DbName), diff --git a/apps/fabric/src/fabric_view_all_docs.erl b/apps/fabric/src/fabric_view_all_docs.erl index 45ae3864..b3436171 100644 --- a/apps/fabric/src/fabric_view_all_docs.erl +++ b/apps/fabric/src/fabric_view_all_docs.erl @@ -37,7 +37,7 @@ go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> limit = Limit, user_acc = Acc0 }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, infinity, 5000) of {ok, NewState} -> {ok, NewState#collector.user_acc}; diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl index e4f44485..c7b6b7c4 100644 --- a/apps/fabric/src/fabric_view_changes.erl +++ b/apps/fabric/src/fabric_view_changes.erl @@ -103,7 +103,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> limit = ChangesArgs#changes_args.limit, rows = Seqs % store sequence positions instead }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, infinity, 5000) after fabric_util:cleanup(Workers) @@ -221,7 +221,7 @@ unpack_seqs(0, DbName) -> unpack_seqs("0", DbName) -> fabric_dict:init(mem3:shards(DbName), 0); - + unpack_seqs(Packed, DbName) -> {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?.*)", [{capture, [opaque], binary}]), diff --git a/apps/fabric/src/fabric_view_map.erl b/apps/fabric/src/fabric_view_map.erl index 91ed3179..e1210a84 100644 --- a/apps/fabric/src/fabric_view_map.erl +++ b/apps/fabric/src/fabric_view_map.erl @@ -42,7 +42,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) -> sorted = Args#view_query_args.sorted, user_acc = Acc0 }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, infinity, 1000 * 60 * 60) of {ok, NewState} -> {ok, NewState#collector.user_acc}; @@ -117,7 +117,7 @@ handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) -> {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), gen_server:reply(From, ok), {Go, St#collector{user_acc=Acc, limit=Limit-1}}; - + handle_message(#view_row{} = Row, {Worker, From}, State) -> #collector{ query_args = #view_query_args{direction=Dir}, @@ -149,4 +149,3 @@ merge_row(_, KeyDict, Row, Rows) -> dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict) end end, [Row], Rows). - diff --git a/apps/fabric/src/fabric_view_reduce.erl b/apps/fabric/src/fabric_view_reduce.erl index f401fc58..6ae564cc 100644 --- a/apps/fabric/src/fabric_view_reduce.erl +++ b/apps/fabric/src/fabric_view_reduce.erl @@ -49,7 +49,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) -> rows = dict:new(), user_acc = Acc0 }, - try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, infinity, 1000 * 60 * 60) of {ok, NewState} -> {ok, NewState#collector.user_acc}; diff --git a/apps/rexi/ebin/rexi.app b/apps/rexi/ebin/rexi.app index 620c8863..5c944b72 100644 --- a/apps/rexi/ebin/rexi.app +++ b/apps/rexi/ebin/rexi.app @@ -1,7 +1,12 @@ {application, rexi, [ {description, "Lightweight RPC server"}, {vsn, "1.2"}, - {modules, [rexi, rexi_app, rexi_sup, rexi_monitor, rexi_server]}, + {modules, [rexi, + rexi_app, + rexi_sup, + rexi_monitor, + rexi_server, + rexi_utils]}, {registered, [rexi_sup, rexi_server]}, {applications, [kernel, stdlib]}, {mod, {rexi_app,[]}} diff --git a/apps/rexi/src/rexi_utils.erl b/apps/rexi/src/rexi_utils.erl new file mode 100644 index 00000000..79183951 --- /dev/null +++ b/apps/rexi/src/rexi_utils.erl @@ -0,0 +1,53 @@ +-module(rexi_utils). + +-export([recv/6]). + +%% @doc set up the receive loop with an overall timeout +-spec recv([any()], integer(), function(), any(), timeout(), timeout()) -> + {ok, any()} | timeout | {error, any()}. +recv(Refs, Keypos, Fun, Acc0, infinity, PerMsgTO) -> + process_mailbox(Refs, Keypos, Fun, Acc0, nil, PerMsgTO); +recv(Refs, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> + TimeoutRef = erlang:make_ref(), + {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}), + try + process_mailbox(Refs, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) + after + timer:cancel(TRef) + end. + +process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of + {ok, Acc} -> + process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); + {stop, Acc} -> + {ok, Acc}; + Error -> + Error + end. + +process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + receive + {timeout, TimeoutRef} -> + timeout; + {Ref, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + % this was some non-matching message which we will ignore + {ok, Acc0}; + Worker -> + Fun(Msg, Worker, Acc0) + end; + {Ref, From, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + {ok, Acc0}; + Worker -> + Fun(Msg, {Worker, From}, Acc0) + end; + {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> + io:format("rexi_DOWN ~p ~p", [ServerPid, Reason]), + Fun(Msg, nil, Acc0) + after PerMsgTO -> + timeout + end. -- cgit v1.2.3